1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import io.reactivex.Flowable;
4 import org.reactivestreams.Subscriber;
5
6 import java.util.concurrent.atomic.AtomicBoolean;
7
8 public final class OnSubscribeCacheResettable<T>{
9
10 private final AtomicBoolean refresh = new AtomicBoolean(true);
11 private final Flowable<T> source;
12 private volatile Flowable<T> current;
13
14 public OnSubscribeCacheResettable(Flowable<T> source) {
15 this.source = source;
16 this.current = source;
17 }
18
19 public void subscribe(final Subscriber<? super T> subscriber) {
20 if (refresh.compareAndSet(true, false)) {
21 current = source.cache();
22 }
23 current.subscribe(subscriber);
24 }
25
26 public void reset() {
27 refresh.set(true);
28 }
29
30 }