1 package com.github.davidmoten.rx.observables; 2 3 import com.github.davidmoten.rx.internal.operators.OnSubscribeCacheResetable; 4 5 import rx.Observable; 6 7 public class CachedObservable<T> extends Observable<T> { 8 9 private final OnSubscribeCacheResetable<T> cache; 10 11 public CachedObservable(Observable<T> source) { 12 this(new OnSubscribeCacheResetable<T>(source)); 13 } 14 15 CachedObservable(OnSubscribeCacheResetable<T> cache) { 16 super(cache); 17 this.cache = cache; 18 } 19 20 public CachedObservable<T> reset() { 21 cache.reset(); 22 return this; 23 } 24 25 }