View Javadoc
1   package com.github.davidmoten.rx2.observable;
2   
3   import com.github.davidmoten.rx2.internal.observable.OnSubscribeCacheResetable;
4   
5   import io.reactivex.Observable;
6   import io.reactivex.Observer;
7   
8   public final class CachedObservable<T> extends Observable<T> {
9   
10      private final OnSubscribeCacheResetable<T> cache;
11  
12      public CachedObservable(Observable<T> source) {
13          this(new OnSubscribeCacheResetable<T>(source));
14      }
15  
16      CachedObservable(OnSubscribeCacheResetable<T> cache) {
17          this.cache = cache;
18      }
19  
20      public CachedObservable<T> reset() {
21          cache.reset();
22          return this;
23      }
24  
25      @Override
26      protected void subscribeActual(Observer<? super T> observer) {
27          cache.subscribe(observer);
28      }
29  
30  }