View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import org.reactivestreams.Publisher;
4   import org.reactivestreams.Subscriber;
5   import org.reactivestreams.Subscription;
6   
7   import com.github.davidmoten.guavamini.Preconditions;
8   
9   import io.reactivex.Flowable;
10  import io.reactivex.FlowableSubscriber;
11  import io.reactivex.exceptions.Exceptions;
12  import io.reactivex.functions.Action;
13  import io.reactivex.internal.subscriptions.SubscriptionHelper;
14  
15  /**
16   * Calls a consumer just before completion if the stream was empty.
17   * 
18   * @param <T>
19   *            the value type
20   */
21  public final class FlowableDoOnEmpty<T> extends Flowable<T> {
22  
23      private final Publisher<T> source;
24      private final Action onEmpty;
25  
26      public FlowableDoOnEmpty(Publisher<T> source, Action onEmpty) {
27          Preconditions.checkNotNull(source, "source cannot be null");
28          Preconditions.checkNotNull(onEmpty, "onEmpty cannot be null");
29          this.source = source;
30          this.onEmpty = onEmpty;
31      }
32  
33      @Override
34      protected void subscribeActual(Subscriber<? super T> child) {
35          source.subscribe(new DoOnEmptySubscriber<T>(child, onEmpty));
36      }
37  
38      private static final class DoOnEmptySubscriber<T> implements FlowableSubscriber<T>, Subscription {
39  
40          private final Subscriber<? super T> child;
41          private final Action onEmpty;
42          // mutable state
43          private boolean done;
44          private boolean empty = true;
45          private Subscription parent;
46  
47          DoOnEmptySubscriber(Subscriber<? super T> child, Action onEmpty) {
48              this.child = child;
49              this.onEmpty = onEmpty;
50          }
51  
52          @Override
53          public void onSubscribe(Subscription parent) {
54              if (SubscriptionHelper.validate(this.parent, parent)) {
55                  this.parent = parent;
56                  child.onSubscribe(this);
57              }
58          }
59  
60          @Override
61          public void onComplete() {
62              if (done) {
63                  return;
64              }
65              if (empty) {
66                  try {
67                      onEmpty.run();
68                  } catch (Throwable e) {
69                      Exceptions.throwIfFatal(e);
70                      onError(e);
71                      return;
72                  }
73              }
74              done = true;
75              child.onComplete();
76          }
77  
78          @Override
79          public void onNext(T t) {
80              empty = false;
81              child.onNext(t);
82          }
83  
84          @Override
85          public void onError(Throwable e) {
86              if (done) {
87                  return;
88              }
89              done = true;
90              child.onError(e);
91          }
92  
93          @Override
94          public void cancel() {
95              parent.cancel();
96          }
97  
98          @Override
99          public void request(long n) {
100             parent.request(n);
101         }
102     }
103 }