View Javadoc
1   package org.davidmoten.kool.internal.operators.stream;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.NoSuchElementException;
6   
7   import org.davidmoten.kool.Stream;
8   import org.davidmoten.kool.StreamIterator;
9   import org.davidmoten.kool.internal.util.RingBuffer;
10  
11  import com.github.davidmoten.guavamini.Preconditions;
12  
13  public class Buffer<T> implements Stream<List<T>> {
14  
15      private final Stream<T> stream;
16      private final int size;
17      private final int step;
18      private final boolean copy;
19  
20      public Buffer(Stream<T> stream, int size, int step, boolean copy) {
21          Preconditions.checkArgument(step > 0, "step must be greater than 0");
22          this.stream = stream;
23          this.size = size;
24          this.step = step;
25          this.copy = copy;
26      }
27  
28      @Override
29      public StreamIterator<List<T>> iterator() {
30          return new StreamIterator<List<T>>() {
31  
32              StreamIterator<T> it = stream.iteratorNullChecked();
33              RingBuffer<T> buffer = new RingBuffer<>(size);
34              boolean applyStep = false;
35  
36              @Override
37              public boolean hasNext() {
38                  loadNext();
39                  return !buffer.isEmpty();
40              }
41  
42              @Override
43              public List<T> next() {
44                  loadNext();
45                  if (buffer.isEmpty()) {
46                      throw new NoSuchElementException();
47                  } else {
48                      applyStep = true;
49                      if (copy) {
50                          return new ArrayList<>(buffer);
51                      } else {
52                          return buffer;
53                      }
54                  }
55              }
56  
57              @Override
58              public void dispose() {
59                  it.dispose();
60              }
61  
62              private void loadNext() {
63                  if (applyStep) {
64                      int n = Math.min(step, buffer.size());
65                      for (int i = 0; i < n; i++) {
66                          buffer.poll();
67                      }
68                      applyStep = false;
69                  }
70                  while (buffer.size() < size && it.hasNext()) {
71                      buffer.add(it.nextNullChecked());
72                  }
73              }
74          };
75      }
76  
77  }