1 package com.github.davidmoten.rx.internal.operators; 2 3 import rx.Observable; 4 import rx.Observable.OnSubscribe; 5 import rx.Subscriber; 6 import rx.exceptions.Exceptions; 7 import rx.functions.Action0; 8 9 public final class OnSubscribeDoOnEmpty<T> implements OnSubscribe<T> { 10 11 private final Action0 onEmpty; 12 private Observable<T> observable; 13 14 public OnSubscribeDoOnEmpty(Observable<T> observable, Action0 onEmpty) { 15 this.observable = observable; 16 this.onEmpty = onEmpty; 17 } 18 19 @Override 20 public void call(final Subscriber<? super T> child) { 21 Subscriber<T> parent = createSubscriber(child, onEmpty); 22 observable.unsafeSubscribe(parent); 23 } 24 25 private static <T> Subscriber<T> createSubscriber(final Subscriber<? super T> child, final Action0 onEmpty) { 26 return new Subscriber<T>(child) { 27 28 private boolean isEmpty = true; 29 30 @Override 31 public void onCompleted() { 32 if (isEmpty) { 33 try { 34 onEmpty.call(); 35 } catch (Throwable e) { 36 Exceptions.throwOrReport(e,this); 37 return; 38 } 39 if (!isUnsubscribed()) { 40 child.onCompleted(); 41 } 42 } else { 43 child.onCompleted(); 44 } 45 } 46 47 @Override 48 public void onError(Throwable e) { 49 child.onError(e); 50 } 51 52 @Override 53 public void onNext(T t) { 54 isEmpty = false; 55 child.onNext(t); 56 } 57 }; 58 } 59 60 }