1 package org.davidmoten.kool.internal.operators.stream; 2 3 import org.davidmoten.kool.StreamIterator; 4 import org.davidmoten.kool.internal.util.RingBuffer; 5 6 public final class ReplayableStreamIterator<T> implements StreamIterator<T> { 7 8 private final StreamIterator<T> it; 9 private final RingBuffer<T> buffer; 10 11 public ReplayableStreamIterator(StreamIterator<T> it, int maxReplay) { 12 this.it = it; 13 this.buffer = new RingBuffer<T>(maxReplay); 14 } 15 16 @Override 17 public boolean hasNext() { 18 return !buffer.isEmpty() || it.hasNext(); 19 } 20 21 @Override 22 public T next() { 23 if (buffer.isEmpty()) { 24 buffer.offer(it.next()); 25 } 26 return buffer.poll(); 27 } 28 29 @Override 30 public void dispose() { 31 it.dispose(); 32 } 33 34 public void replay(int count) { 35 buffer.replay(count); 36 } 37 38 }