1 package com.github.davidmoten.rx2.observable; 2 3 import com.github.davidmoten.rx2.internal.observable.OnSubscribeCacheResetable; 4 5 import io.reactivex.Observable; 6 import io.reactivex.Observer; 7 8 public final class CachedObservable<T> extends Observable<T> { 9 10 private final OnSubscribeCacheResetable<T> cache; 11 12 public CachedObservable(Observable<T> source) { 13 this(new OnSubscribeCacheResetable<T>(source)); 14 } 15 16 CachedObservable(OnSubscribeCacheResetable<T> cache) { 17 this.cache = cache; 18 } 19 20 public CachedObservable<T> reset() { 21 cache.reset(); 22 return this; 23 } 24 25 @Override 26 protected void subscribeActual(Observer<? super T> observer) { 27 cache.subscribe(observer); 28 } 29 30 }