1 package com.github.davidmoten.rx.internal.operators; 2 3 import java.util.concurrent.atomic.AtomicBoolean; 4 5 import rx.Observable; 6 import rx.Observable.OnSubscribe; 7 import rx.Subscriber; 8 9 public final class OnSubscribeCacheResetable<T> implements OnSubscribe<T> { 10 11 private final AtomicBoolean refresh = new AtomicBoolean(true); 12 private final Observable<T> source; 13 private volatile Observable<T> current; 14 15 public OnSubscribeCacheResetable(Observable<T> source) { 16 this.source = source; 17 this.current = source; 18 } 19 20 @Override 21 public void call(Subscriber<? super T> subscriber) { 22 if (refresh.compareAndSet(true, false)) { 23 current = source.cache(); 24 } 25 current.unsafeSubscribe(subscriber); 26 } 27 28 public void reset() { 29 refresh.set(true); 30 } 31 32 }