View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import io.reactivex.Flowable;
4   import org.reactivestreams.Subscriber;
5   
6   import java.util.concurrent.atomic.AtomicBoolean;
7   
8   public final class OnSubscribeCacheResettable<T>{
9   
10      private final AtomicBoolean refresh = new AtomicBoolean(true);
11      private final Flowable<T> source;
12      private volatile Flowable<T> current;
13  
14      public OnSubscribeCacheResettable(Flowable<T> source) {
15          this.source = source;
16          this.current = source;
17      }
18  
19      public void subscribe(final Subscriber<? super T> subscriber) {
20          if (refresh.compareAndSet(true, false)) {
21              current = source.cache();
22          }
23          current.subscribe(subscriber);
24      }
25  
26      public void reset() {
27          refresh.set(true);
28      }
29  
30  }