1 package com.github.davidmoten.rx.internal.operators;
2
3 import rx.Observable;
4 import rx.Observable.OnSubscribe;
5 import rx.Subscriber;
6 import rx.exceptions.Exceptions;
7 import rx.functions.Action0;
8
9 public final class OnSubscribeDoOnEmpty<T> implements OnSubscribe<T> {
10
11 private final Action0 onEmpty;
12 private Observable<T> observable;
13
14 public OnSubscribeDoOnEmpty(Observable<T> observable, Action0 onEmpty) {
15 this.observable = observable;
16 this.onEmpty = onEmpty;
17 }
18
19 @Override
20 public void call(final Subscriber<? super T> child) {
21 Subscriber<T> parent = createSubscriber(child, onEmpty);
22 observable.unsafeSubscribe(parent);
23 }
24
25 private static <T> Subscriber<T> createSubscriber(final Subscriber<? super T> child, final Action0 onEmpty) {
26 return new Subscriber<T>(child) {
27
28 private boolean isEmpty = true;
29
30 @Override
31 public void onCompleted() {
32 if (isEmpty) {
33 try {
34 onEmpty.call();
35 } catch (Throwable e) {
36 Exceptions.throwOrReport(e,this);
37 return;
38 }
39 if (!isUnsubscribed()) {
40 child.onCompleted();
41 }
42 } else {
43 child.onCompleted();
44 }
45 }
46
47 @Override
48 public void onError(Throwable e) {
49 child.onError(e);
50 }
51
52 @Override
53 public void onNext(T t) {
54 isEmpty = false;
55 child.onNext(t);
56 }
57 };
58 }
59
60 }