1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4
5 import rx.Observable;
6 import rx.Observable.OnSubscribe;
7 import rx.Subscriber;
8
9 public final class OnSubscribeCacheResetable<T> implements OnSubscribe<T> {
10
11 private final AtomicBoolean refresh = new AtomicBoolean(true);
12 private final Observable<T> source;
13 private volatile Observable<T> current;
14
15 public OnSubscribeCacheResetable(Observable<T> source) {
16 this.source = source;
17 this.current = source;
18 }
19
20 @Override
21 public void call(Subscriber<? super T> subscriber) {
22 if (refresh.compareAndSet(true, false)) {
23 current = source.cache();
24 }
25 current.unsafeSubscribe(subscriber);
26 }
27
28 public void reset() {
29 refresh.set(true);
30 }
31
32 }