View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import rx.Observable;
4   import rx.Observable.OnSubscribe;
5   import rx.Subscriber;
6   import rx.exceptions.Exceptions;
7   import rx.functions.Action0;
8   
9   public final class OnSubscribeDoOnEmpty<T> implements OnSubscribe<T> {
10  
11      private final Action0 onEmpty;
12      private Observable<T> observable;
13  
14      public OnSubscribeDoOnEmpty(Observable<T> observable, Action0 onEmpty) {
15          this.observable = observable;
16          this.onEmpty = onEmpty;
17      }
18  
19      @Override
20      public void call(final Subscriber<? super T> child) {
21          Subscriber<T> parent = createSubscriber(child, onEmpty);
22          observable.unsafeSubscribe(parent);
23      }
24  
25      private static <T> Subscriber<T> createSubscriber(final Subscriber<? super T> child, final Action0 onEmpty) {
26          return new Subscriber<T>(child) {
27  
28              private boolean isEmpty = true;
29  
30              @Override
31              public void onCompleted() {
32                  if (isEmpty) {
33                      try {
34                          onEmpty.call();
35                      } catch (Throwable e) {
36                          Exceptions.throwOrReport(e,this);
37                          return;
38                      }
39                      if (!isUnsubscribed()) {
40                          child.onCompleted();
41                      }
42                  } else {
43                      child.onCompleted();
44                  }
45              }
46  
47              @Override
48              public void onError(Throwable e) {
49                  child.onError(e);
50              }
51  
52              @Override
53              public void onNext(T t) {
54                  isEmpty = false;
55                  child.onNext(t);
56              }
57          };
58      }
59  
60  }