1 package com.github.davidmoten.rx2.internal.flowable; 2 3 import io.reactivex.Flowable; 4 import org.reactivestreams.Subscriber; 5 6 import java.util.concurrent.atomic.AtomicBoolean; 7 8 public final class OnSubscribeCacheResettable<T>{ 9 10 private final AtomicBoolean refresh = new AtomicBoolean(true); 11 private final Flowable<T> source; 12 private volatile Flowable<T> current; 13 14 public OnSubscribeCacheResettable(Flowable<T> source) { 15 this.source = source; 16 this.current = source; 17 } 18 19 public void subscribe(final Subscriber<? super T> subscriber) { 20 if (refresh.compareAndSet(true, false)) { 21 current = source.cache(); 22 } 23 current.subscribe(subscriber); 24 } 25 26 public void reset() { 27 refresh.set(true); 28 } 29 30 }