1 package org.davidmoten.kool.internal.operators.stream;
2
3 import org.davidmoten.kool.StreamIterator;
4 import org.davidmoten.kool.internal.util.RingBuffer;
5
6 public final class ReplayableStreamIterator<T> implements StreamIterator<T> {
7
8 private final StreamIterator<T> it;
9 private final RingBuffer<T> buffer;
10
11 public ReplayableStreamIterator(StreamIterator<T> it, int maxReplay) {
12 this.it = it;
13 this.buffer = new RingBuffer<T>(maxReplay);
14 }
15
16 @Override
17 public boolean hasNext() {
18 return !buffer.isEmpty() || it.hasNext();
19 }
20
21 @Override
22 public T next() {
23 if (buffer.isEmpty()) {
24 buffer.offer(it.next());
25 }
26 return buffer.poll();
27 }
28
29 @Override
30 public void dispose() {
31 it.dispose();
32 }
33
34 public void replay(int count) {
35 buffer.replay(count);
36 }
37
38 }