View Javadoc
1   package com.github.davidmoten.rx2.flowable;
2   
3   
4   import io.reactivex.Flowable;
5   import org.reactivestreams.Subscriber;
6   
7   import com.github.davidmoten.rx2.internal.flowable.OnSubscribeCacheResettable;
8   
9   public final class CachedFlowable<T> extends Flowable<T> {
10  
11      private final OnSubscribeCacheResettable<T> cache;
12  
13      public CachedFlowable(Flowable<T> source) {
14          this(new OnSubscribeCacheResettable<T>(source));
15      }
16  
17      CachedFlowable(OnSubscribeCacheResettable<T> cache) {
18          this.cache = cache;
19      }
20  
21      public CachedFlowable<T> reset() {
22          cache.reset();
23          return this;
24      }
25  
26      @Override
27      protected void subscribeActual(Subscriber<? super T> subscriber) {
28          cache.subscribe(subscriber);
29      }
30  }