1 package com.github.davidmoten.rx2.flowable; 2 3 4 import io.reactivex.Flowable; 5 import org.reactivestreams.Subscriber; 6 7 import com.github.davidmoten.rx2.internal.flowable.OnSubscribeCacheResettable; 8 9 public final class CachedFlowable<T> extends Flowable<T> { 10 11 private final OnSubscribeCacheResettable<T> cache; 12 13 public CachedFlowable(Flowable<T> source) { 14 this(new OnSubscribeCacheResettable<T>(source)); 15 } 16 17 CachedFlowable(OnSubscribeCacheResettable<T> cache) { 18 this.cache = cache; 19 } 20 21 public CachedFlowable<T> reset() { 22 cache.reset(); 23 return this; 24 } 25 26 @Override 27 protected void subscribeActual(Subscriber<? super T> subscriber) { 28 cache.subscribe(subscriber); 29 } 30 }