1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import org.reactivestreams.Publisher;
4 import org.reactivestreams.Subscriber;
5 import org.reactivestreams.Subscription;
6
7 import com.github.davidmoten.guavamini.Preconditions;
8
9 import io.reactivex.Flowable;
10 import io.reactivex.FlowableSubscriber;
11 import io.reactivex.exceptions.Exceptions;
12 import io.reactivex.functions.Action;
13 import io.reactivex.internal.subscriptions.SubscriptionHelper;
14
15
16
17
18
19
20
21 public final class FlowableDoOnEmpty<T> extends Flowable<T> {
22
23 private final Publisher<T> source;
24 private final Action onEmpty;
25
26 public FlowableDoOnEmpty(Publisher<T> source, Action onEmpty) {
27 Preconditions.checkNotNull(source, "source cannot be null");
28 Preconditions.checkNotNull(onEmpty, "onEmpty cannot be null");
29 this.source = source;
30 this.onEmpty = onEmpty;
31 }
32
33 @Override
34 protected void subscribeActual(Subscriber<? super T> child) {
35 source.subscribe(new DoOnEmptySubscriber<T>(child, onEmpty));
36 }
37
38 private static final class DoOnEmptySubscriber<T> implements FlowableSubscriber<T>, Subscription {
39
40 private final Subscriber<? super T> child;
41 private final Action onEmpty;
42
43 private boolean done;
44 private boolean empty = true;
45 private Subscription parent;
46
47 DoOnEmptySubscriber(Subscriber<? super T> child, Action onEmpty) {
48 this.child = child;
49 this.onEmpty = onEmpty;
50 }
51
52 @Override
53 public void onSubscribe(Subscription parent) {
54 if (SubscriptionHelper.validate(this.parent, parent)) {
55 this.parent = parent;
56 child.onSubscribe(this);
57 }
58 }
59
60 @Override
61 public void onComplete() {
62 if (done) {
63 return;
64 }
65 if (empty) {
66 try {
67 onEmpty.run();
68 } catch (Throwable e) {
69 Exceptions.throwIfFatal(e);
70 onError(e);
71 return;
72 }
73 }
74 done = true;
75 child.onComplete();
76 }
77
78 @Override
79 public void onNext(T t) {
80 empty = false;
81 child.onNext(t);
82 }
83
84 @Override
85 public void onError(Throwable e) {
86 if (done) {
87 return;
88 }
89 done = true;
90 child.onError(e);
91 }
92
93 @Override
94 public void cancel() {
95 parent.cancel();
96 }
97
98 @Override
99 public void request(long n) {
100 parent.request(n);
101 }
102 }
103 }