1 package com.github.davidmoten.rx2.observable; 2 3 import io.reactivex.Observable; 4 5 public final class CloseableObservableWithReset<T> { 6 7 private final Observable<T> observable; 8 private final Runnable closeAction; 9 private final Runnable resetAction; 10 11 public CloseableObservableWithReset(Observable<T> observable, Runnable closeAction, Runnable resetAction) { 12 this.observable = observable; 13 this.closeAction = closeAction; 14 this.resetAction = resetAction; 15 } 16 17 public Observable<T> observable() { 18 return observable; 19 } 20 21 public void reset() { 22 resetAction.run(); 23 } 24 25 public void close() { 26 closeAction.run(); 27 } 28 29 }