1 package com.github.davidmoten.rx2.internal.observable;
2
3 import io.reactivex.Observable;
4 import io.reactivex.Observer;
5
6 import java.util.concurrent.atomic.AtomicBoolean;
7
8 public final class OnSubscribeCacheResetable<T>{
9
10 private final AtomicBoolean refresh = new AtomicBoolean(true);
11 private final Observable<T> source;
12 private volatile Observable<T> current;
13
14 public OnSubscribeCacheResetable(Observable<T> source) {
15 this.source = source;
16 this.current = source;
17 }
18
19 public void subscribe(final Observer<? super T> observer) {
20 if (refresh.compareAndSet(true, false)) {
21 current = source.cache();
22 }
23 current.subscribe(observer);
24 }
25
26 public void reset() {
27 refresh.set(true);
28 }
29
30 }