View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   import java.util.concurrent.atomic.AtomicLong;
5   import java.util.concurrent.atomic.AtomicReference;
6   
7   import org.reactivestreams.Subscriber;
8   import org.reactivestreams.Subscription;
9   
10  import com.github.davidmoten.guavamini.Preconditions;
11  
12  import io.reactivex.Flowable;
13  import io.reactivex.FlowableSubscriber;
14  import io.reactivex.Maybe;
15  import io.reactivex.MaybeObserver;
16  import io.reactivex.disposables.Disposable;
17  import io.reactivex.exceptions.Exceptions;
18  import io.reactivex.functions.Function;
19  import io.reactivex.internal.disposables.DisposableHelper;
20  import io.reactivex.internal.fuseable.SimplePlainQueue;
21  import io.reactivex.internal.queue.MpscLinkedQueue;
22  import io.reactivex.internal.subscriptions.SubscriptionHelper;
23  import io.reactivex.internal.util.BackpressureHelper;
24  import io.reactivex.plugins.RxJavaPlugins;
25  
26  public final class FlowableInsertMaybe<T> extends Flowable<T> {
27  
28      private final Flowable<T> source;
29      private final Function<? super T, ? extends Maybe<? extends T>> valueToInsert;
30  
31      public FlowableInsertMaybe(Flowable<T> source, Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
32          Preconditions.checkNotNull(valueToInsert, "valueToInsert cannot be null");
33          this.source = source;
34          this.valueToInsert = valueToInsert;
35      }
36  
37      @Override
38      protected void subscribeActual(Subscriber<? super T> downstream) {
39          source.subscribe(new InsertSubscriber<T>(downstream, valueToInsert));
40      }
41  
42      static final class InsertSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
43  
44          private static final long serialVersionUID = -15415234346097063L;
45  
46          private final Subscriber<? super T> downstream;
47          private final Function<? super T, ? extends Maybe<? extends T>> valueToInsert;
48          private final SimplePlainQueue<T> queue;
49          private final AtomicLong requested;
50          private final AtomicLong inserted;
51  
52          // define as type Object so can set with a non-null sentinel object that does
53          // not gather a stacktrace. A further refinement would be to use just one
54          // AtomicReference for `error` and `done` (leaving it for now)
55          private final AtomicReference<Object> error;
56          private final AtomicReference<Disposable> valueToInsertObserver;
57  
58          private Subscription upstream;
59          private volatile boolean done;
60          private volatile boolean cancelled;
61  
62          // used to prevent emission of events after a terminal event
63          // does not need to be volatile
64          private boolean finished;
65  
66          InsertSubscriber(Subscriber<? super T> downstream,
67                  Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
68              this.downstream = downstream;
69              this.valueToInsert = valueToInsert;
70              this.queue = new MpscLinkedQueue<T>();
71              this.requested = new AtomicLong();
72              this.inserted = new AtomicLong();
73              this.error = new AtomicReference<Object>();
74              this.valueToInsertObserver = new AtomicReference<Disposable>();
75          }
76  
77          @Override
78          public void onSubscribe(Subscription upstream) {
79              if (SubscriptionHelper.validate(this.upstream, upstream)) {
80                  this.upstream = upstream;
81                  downstream.onSubscribe(this);
82              }
83          }
84  
85          @Override
86          public void onNext(T t) {
87              if (finished) {
88                  return;
89              }
90              queue.offer(t);
91              Maybe<? extends T> maybe;
92              try {
93                  maybe = valueToInsert.apply(t);
94              } catch (Throwable e) {
95                  Exceptions.throwIfFatal(e);
96                  // we cancel upstream ourselves because the
97                  // error did not originate from source
98                  upstream.cancel();
99                  onError(e);
100                 return;
101             }
102             ValueToInsertObserver<T> o = new ValueToInsertObserver<T>(this);
103             if (DisposableHelper.set(valueToInsertObserver, o)) {
104                 // note that at this point we have to cover o being disposed
105                 // from another thread so the Observer class needs
106                 // to handle dispose being called before/during onSubscribe
107                 maybe.subscribe(o);
108             }
109             drain();
110         }
111 
112         @Override
113         public void onError(Throwable e) {
114             if (finished) {
115                 RxJavaPlugins.onError(e);
116                 return;
117             }
118             finished = true;
119             if (error.compareAndSet(null, e)) {
120                 DisposableHelper.dispose(valueToInsertObserver);
121                 done = true;
122                 drain();
123             } else {
124                 RxJavaPlugins.onError(e);
125             }
126         }
127 
128         @Override
129         public void onComplete() {
130             if (finished) {
131                 return;
132             }
133             finished = true;
134             DisposableHelper.dispose(valueToInsertObserver);
135             done = true;
136             drain();
137         }
138 
139         private void drain() {
140             if (getAndIncrement() != 0) {
141                 return;
142             }
143             // note that this drain loop does not shortcut errors
144             int missed = 1;
145             while (true) {
146                 long r = requested.get();
147                 long e = 0;
148                 while (e != r) {
149                     if (cancelled) {
150                         DisposableHelper.dispose(valueToInsertObserver);
151                         queue.clear();
152                         return;
153                     }
154                     // must read `done` before polling queue
155                     boolean d = done;
156                     T t = queue.poll();
157                     if (t == null) {
158                         if (d) {
159                             Object err = error.get();
160                             if (err != null) {
161                                 // clear the exception so can be gc'd
162                                 // `this` is not a real error, it just prevents
163                                 // it getting set again in a race because the other
164                                 // setters which use CAS assume initial value of null
165                                 error.set(this);
166                                 DisposableHelper.dispose(valueToInsertObserver);
167                                 downstream.onError((Throwable) err);
168                             } else {
169                                 // don't need to dispose valueToInsertObserver because already done in
170                                 // onComplete
171                                 downstream.onComplete();
172                             }
173                             return;
174                         } else {
175                             // nothing to emit and not done
176                             break;
177                         }
178                     } else {
179                         downstream.onNext(t);
180                         e++;
181                     }
182                 }
183                 if (e != 0L && r != Long.MAX_VALUE) {
184                     requested.addAndGet(-e);
185                 }
186                 missed = addAndGet(-missed);
187                 if (missed == 0) {
188                     return;
189                 }
190             }
191         }
192 
193         @Override
194         public void request(long n) {
195             if (SubscriptionHelper.validate(n)) {
196                 BackpressureHelper.add(requested, n);
197                 // modify request to upstream to account for inserted values
198                 // use a CAS loop because request can be called from any thread
199                 while (true) {
200                     long ins = inserted.get();
201                     long d = Math.min(ins, n);
202                     if (inserted.compareAndSet(ins, ins - d)) {
203                         if (n - d > 0) {
204                             upstream.request(n - d);
205                         }
206                         break;
207                     }
208                 }
209                 drain();
210             }
211         }
212 
213         @Override
214         public void cancel() {
215             if (!cancelled) {
216                 cancelled = true;
217                 upstream.cancel();
218                 DisposableHelper.dispose(valueToInsertObserver);
219                 if (getAndIncrement() == 0) {
220                     // use the same access control to queue as drain method
221                     // because `clear` just calls `queue.poll()` repeatedly till nothing left on the
222                     // queue (ignoring the dequeued items).
223                     //
224                     // this is best endeavours, there still exists a race with onNext and drain
225                     // where items could be left on the queue after cancel
226                     queue.clear();
227                 }
228             }
229         }
230 
231         void insert(T t) {
232             inserted.incrementAndGet();
233             queue.offer(t);
234             drain();
235         }
236 
237         void insertError(Throwable e) {
238             if (error.compareAndSet(null, e)) {
239                 upstream.cancel();
240                 DisposableHelper.dispose(valueToInsertObserver);
241                 done = true;
242                 drain();
243             } else {
244                 RxJavaPlugins.onError(e);
245             }
246         }
247 
248     }
249 
250     static final class ValueToInsertObserver<T> extends AtomicReference<Disposable>
251             implements MaybeObserver<T>, Disposable {
252 
253         private static final long serialVersionUID = 41384726414575403L;
254 
255         private final InsertSubscriber<T> downstream;
256 
257         ValueToInsertObserver(InsertSubscriber<T> downstream) {
258             this.downstream = downstream;
259         }
260 
261         @Override
262         public void onSubscribe(Disposable upstream) {
263             // an AtomicReference is used to hold the upstream Disposable
264             // because this Observer can be disposed before onSubscribe
265             // is called (contrary to the normal contract).
266             DisposableHelper.setOnce(this, upstream);
267         }
268 
269         @Override
270         public void onSuccess(T t) {
271             lazySet(DisposableHelper.DISPOSED);
272             downstream.insert(t);
273         }
274 
275         @Override
276         public void onError(Throwable e) {
277             lazySet(DisposableHelper.DISPOSED);
278             downstream.insertError(e);
279         }
280 
281         @Override
282         public void onComplete() {
283             lazySet(DisposableHelper.DISPOSED);
284             // don't do anything else because no value to insert was reported
285         }
286 
287         @Override
288         public void dispose() {
289             DisposableHelper.dispose(this);
290         }
291 
292         @Override
293         public boolean isDisposed() {
294             return get() == DisposableHelper.DISPOSED;
295         }
296 
297     }
298 }