View Javadoc
1   package com.github.davidmoten.rx2.internal.observable;
2   
3   import io.reactivex.Observable;
4   import io.reactivex.Observer;
5   
6   import java.util.concurrent.atomic.AtomicBoolean;
7   
8   public final class OnSubscribeCacheResetable<T>{
9   
10      private final AtomicBoolean refresh = new AtomicBoolean(true);
11      private final Observable<T> source;
12      private volatile Observable<T> current;
13  
14      public OnSubscribeCacheResetable(Observable<T> source) {
15          this.source = source;
16          this.current = source;
17      }
18  
19      public void subscribe(final Observer<? super T> observer) {
20          if (refresh.compareAndSet(true, false)) {
21              current = source.cache();
22          }
23          current.subscribe(observer);
24      }
25  
26      public void reset() {
27          refresh.set(true);
28      }
29  
30  }