View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   
5   import rx.Observable;
6   import rx.Observable.OnSubscribe;
7   import rx.Subscriber;
8   
9   public final class OnSubscribeCacheResetable<T> implements OnSubscribe<T> {
10  
11      private final AtomicBoolean refresh = new AtomicBoolean(true);
12      private final Observable<T> source;
13      private volatile Observable<T> current;
14  
15      public OnSubscribeCacheResetable(Observable<T> source) {
16          this.source = source;
17          this.current = source;
18      }
19  
20      @Override
21      public void call(Subscriber<? super T> subscriber) {
22          if (refresh.compareAndSet(true, false)) {
23              current = source.cache();
24          }
25          current.unsafeSubscribe(subscriber);
26      }
27  
28      public void reset() {
29          refresh.set(true);
30      }
31  
32  }