1 package com.github.davidmoten.rx2.observable;
2
3 import com.github.davidmoten.rx2.internal.observable.OnSubscribeCacheResetable;
4
5 import io.reactivex.Observable;
6 import io.reactivex.Observer;
7
8 public final class CachedObservable<T> extends Observable<T> {
9
10 private final OnSubscribeCacheResetable<T> cache;
11
12 public CachedObservable(Observable<T> source) {
13 this(new OnSubscribeCacheResetable<T>(source));
14 }
15
16 CachedObservable(OnSubscribeCacheResetable<T> cache) {
17 this.cache = cache;
18 }
19
20 public CachedObservable<T> reset() {
21 cache.reset();
22 return this;
23 }
24
25 @Override
26 protected void subscribeActual(Observer<? super T> observer) {
27 cache.subscribe(observer);
28 }
29
30 }