1 package com.github.davidmoten.rx2.flowable;
2
3
4 import io.reactivex.Flowable;
5 import org.reactivestreams.Subscriber;
6
7 import com.github.davidmoten.rx2.internal.flowable.OnSubscribeCacheResettable;
8
9 public final class CachedFlowable<T> extends Flowable<T> {
10
11 private final OnSubscribeCacheResettable<T> cache;
12
13 public CachedFlowable(Flowable<T> source) {
14 this(new OnSubscribeCacheResettable<T>(source));
15 }
16
17 CachedFlowable(OnSubscribeCacheResettable<T> cache) {
18 this.cache = cache;
19 }
20
21 public CachedFlowable<T> reset() {
22 cache.reset();
23 return this;
24 }
25
26 @Override
27 protected void subscribeActual(Subscriber<? super T> subscriber) {
28 cache.subscribe(subscriber);
29 }
30 }