1 package com.github.davidmoten.rx2.internal.observable; 2 3 import io.reactivex.Observable; 4 import io.reactivex.Observer; 5 6 import java.util.concurrent.atomic.AtomicBoolean; 7 8 public final class OnSubscribeCacheResetable<T>{ 9 10 private final AtomicBoolean refresh = new AtomicBoolean(true); 11 private final Observable<T> source; 12 private volatile Observable<T> current; 13 14 public OnSubscribeCacheResetable(Observable<T> source) { 15 this.source = source; 16 this.current = source; 17 } 18 19 public void subscribe(final Observer<? super T> observer) { 20 if (refresh.compareAndSet(true, false)) { 21 current = source.cache(); 22 } 23 current.subscribe(observer); 24 } 25 26 public void reset() { 27 refresh.set(true); 28 } 29 30 }