1 package org.davidmoten.kool.internal.operators.stream;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.NoSuchElementException;
6
7 import org.davidmoten.kool.Stream;
8 import org.davidmoten.kool.StreamIterator;
9 import org.davidmoten.kool.internal.util.RingBuffer;
10
11 import com.github.davidmoten.guavamini.Preconditions;
12
13 public class Buffer<T> implements Stream<List<T>> {
14
15 private final Stream<T> stream;
16 private final int size;
17 private final int step;
18 private final boolean copy;
19
20 public Buffer(Stream<T> stream, int size, int step, boolean copy) {
21 Preconditions.checkArgument(step > 0, "step must be greater than 0");
22 this.stream = stream;
23 this.size = size;
24 this.step = step;
25 this.copy = copy;
26 }
27
28 @Override
29 public StreamIterator<List<T>> iterator() {
30 return new StreamIterator<List<T>>() {
31
32 StreamIterator<T> it = stream.iteratorNullChecked();
33 RingBuffer<T> buffer = new RingBuffer<>(size);
34 boolean applyStep = false;
35
36 @Override
37 public boolean hasNext() {
38 loadNext();
39 return !buffer.isEmpty();
40 }
41
42 @Override
43 public List<T> next() {
44 loadNext();
45 if (buffer.isEmpty()) {
46 throw new NoSuchElementException();
47 } else {
48 applyStep = true;
49 if (copy) {
50 return new ArrayList<>(buffer);
51 } else {
52 return buffer;
53 }
54 }
55 }
56
57 @Override
58 public void dispose() {
59 it.dispose();
60 }
61
62 private void loadNext() {
63 if (applyStep) {
64 int n = Math.min(step, buffer.size());
65 for (int i = 0; i < n; i++) {
66 buffer.poll();
67 }
68 applyStep = false;
69 }
70 while (buffer.size() < size && it.hasNext()) {
71 buffer.add(it.nextNullChecked());
72 }
73 }
74 };
75 }
76
77 }