View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   import java.util.concurrent.atomic.AtomicInteger;
7   import java.util.concurrent.atomic.AtomicLong;
8   import java.util.concurrent.atomic.AtomicReference;
9   
10  import org.reactivestreams.Publisher;
11  import org.reactivestreams.Subscriber;
12  import org.reactivestreams.Subscription;
13  
14  import com.github.davidmoten.guavamini.Preconditions;
15  import com.github.davidmoten.util.RingBuffer;
16  
17  import io.reactivex.Flowable;
18  import io.reactivex.internal.fuseable.SimplePlainQueue;
19  import io.reactivex.internal.queue.MpscLinkedQueue;
20  import io.reactivex.internal.subscriptions.SubscriptionHelper;
21  import io.reactivex.internal.util.BackpressureHelper;
22  
23  public final class FlowableMergeInterleave<T> extends Flowable<T> {
24  
25      private final int maxConcurrent;
26      private final Publisher<? extends Publisher<? extends T>> sources;
27      private final int batchSize;
28      private boolean delayErrors;
29  
30      public FlowableMergeInterleave(Publisher<? extends Publisher<? extends T>> sources,
31              int maxConcurrent, int batchSize, boolean delayErrors) {
32          this.sources = sources;
33          this.maxConcurrent = maxConcurrent;
34          this.batchSize = batchSize;
35          this.delayErrors = delayErrors;
36      }
37  
38      @Override
39      protected void subscribeActual(Subscriber<? super T> s) {
40          MergeInterleaveSubscription<T> subscription = new MergeInterleaveSubscription<T>(sources,
41                  maxConcurrent, batchSize, delayErrors, s);
42          s.onSubscribe(subscription);
43      }
44  
45      private static final class MergeInterleaveSubscription<T> extends AtomicInteger
46              implements Subscription, Subscriber<Publisher<? extends T>> {
47  
48          private static final long serialVersionUID = -6416801556759306113L;
49          private static final Object SOURCES_COMPLETE = new Object();
50          private final AtomicBoolean once = new AtomicBoolean();
51          private final Publisher<? extends Publisher<? extends T>> sources;
52          private final int maxConcurrent;
53          private final int batchSize;
54          private final boolean delayErrors;
55          private Subscriber<? super T> subscriber;
56          private Subscription subscription;
57          private volatile boolean cancelled;
58          private Throwable error;
59          private volatile boolean finished;
60          private final AtomicLong requested = new AtomicLong();
61          private long emitted;
62          private final RingBuffer<BatchFinished> batchFinished;
63  
64          // objects on queue can be Flowable, Subscriber,
65          private final SimplePlainQueue<Object> queue;
66          private final List<SourceSubscriber<T>> sourceSubscribers = new ArrayList<SourceSubscriber<T>>();
67          private boolean sourcesComplete;
68          private long sourcesCount;
69  
70          public MergeInterleaveSubscription(Publisher<? extends Publisher<? extends T>> sources,
71                  int maxConcurrent, int batchSize, boolean delayErrors,
72                  Subscriber<? super T> subscriber) {
73              this.sources = sources;
74              this.maxConcurrent = maxConcurrent;
75              this.batchSize = batchSize;
76              this.delayErrors = delayErrors;
77              this.subscriber = subscriber;
78              this.queue = new MpscLinkedQueue<Object>();
79              this.batchFinished = RingBuffer.create(maxConcurrent + 1);
80          }
81  
82          @Override
83          public void request(long n) {
84              if (SubscriptionHelper.validate(n)) {
85                  BackpressureHelper.add(requested, n);
86                  if (once.compareAndSet(false, true)) {
87                      sources.subscribe(this);
88                      subscription.request(maxConcurrent);
89                  }
90                  drain();
91              }
92          }
93  
94          @Override
95          public void cancel() {
96              this.cancelled = true;
97          }
98  
99          @Override
100         public void onSubscribe(Subscription s) {
101             this.subscription = s;
102             drain();
103         }
104 
105         @Override
106         public void onNext(Publisher<? extends T> f) {
107             sourcesCount++;
108             queue.offer(new SourceArrived<T>(f));
109             if (sourcesCount >= maxConcurrent) {
110                 drain();
111             }
112         }
113 
114         @Override
115         public void onError(Throwable t) {
116             error = t;
117             finished = true;
118             drain();
119         }
120 
121         @Override
122         public void onComplete() {
123             queue.offer(SOURCES_COMPLETE);
124             drain();
125         }
126 
127         private boolean tryCancelled() {
128             if (cancelled) {
129                 cleanup();
130                 return true;
131             } else {
132                 return false;
133             }
134         }
135 
136         @SuppressWarnings("unchecked")
137         private void drain() {
138             if (getAndIncrement() == 0) {
139                 int missed = 1;
140                 long e = emitted;
141                 long r = requested.get();
142                 while (true) {
143                     if (tryCancelled()) {
144                         return;
145                     }
146                     if (e == r) {
147                         r = requested.get();
148                     }
149                     while (e != r) {
150                         boolean d = finished;
151                         if (d && !delayErrors) {
152                             Throwable err = error;
153                             if (err != null) {
154                                 error = null;
155                                 cleanup();
156                                 subscriber.onError(err);
157                                 return;
158                             }
159                         }
160                         Object o = queue.poll();
161                         if (o == null) {
162                             if (d) {
163                                 Throwable err = error;
164                                 if (err != null) {
165                                     error = null;
166                                     cleanup();
167                                     subscriber.onError(err);
168                                 } else {
169                                     subscriber.onComplete();
170                                 }
171                                 return;
172                             } else {
173                                 break;
174                             }
175                         } else {
176                             if (o instanceof BatchFinished) {
177                                 handleBatchFinished((BatchFinished) o);
178                             } else if (o instanceof SourceArrived) {
179                                 handleSourceArrived((SourceArrived<T>) o);
180                             } else if (o instanceof SourceComplete) {
181                                 handleSourceComplete((SourceComplete<T>) o);
182                             } else if (o == SOURCES_COMPLETE) {
183                                 handleSourcesComplete();
184                             } else {
185                                 subscriber.onNext((T) o);
186                                 e++;
187                             }
188                         }
189                         if (tryCancelled()) {
190                             return;
191                         }
192                     }
193                     emitted = e;
194                     missed = addAndGet(-missed);
195                     if (missed == 0) {
196                         return;
197                     }
198                 }
199             }
200         }
201 
202         private void handleSourcesComplete() {
203             sourcesComplete = true;
204             if (sourceSubscribers.isEmpty()) {
205                 finished = true;
206             }
207         }
208 
209         private void handleBatchFinished(BatchFinished b) {
210             Preconditions.checkNotNull(b);
211             boolean ok = batchFinished.offer(b);
212             assert ok;
213             batchFinished.poll().requestMore();
214         }
215 
216         private void cleanup() {
217             subscription.cancel();
218             for (SourceSubscriber<T> s : sourceSubscribers) {
219                 s.cancel();
220             }
221             sourceSubscribers.clear();
222             queue.clear();
223             batchFinished.clear();
224         }
225 
226         private void handleSourceArrived(SourceArrived<T> event) {
227             SourceSubscriber<T> subscriber = new SourceSubscriber<T>(this);
228             sourceSubscribers.add(subscriber);
229             queue.offer(subscriber);
230             event.publisher.subscribe(subscriber);
231         }
232 
233         private void handleSourceComplete(SourceComplete<T> event) {
234             sourceSubscribers.remove(event.subscriber);
235             if (!sourcesComplete) {
236                 subscription.request(1);
237             } else if (sourceSubscribers.isEmpty() && sourcesComplete) {
238                 finished = true;
239             }
240         }
241 
242         public void sourceError(Throwable t) {
243             error = t;
244             finished = true;
245             drain();
246         }
247 
248         public void sourceComplete(SourceSubscriber<T> sourceSubscriber) {
249             queue.offer(new SourceComplete<T>(sourceSubscriber));
250             drain();
251         }
252 
253         public void sourceNext(T t, SourceSubscriber<T> sourceSubscriber) {
254             queue.offer(t);
255             if (sourceSubscriber != null) {
256                 queue.offer(sourceSubscriber);
257             }
258             drain();
259         }
260     }
261 
262     private final static class SourceSubscriber<T> implements Subscriber<T>, BatchFinished {
263 
264         private final MergeInterleaveSubscription<T> parent;
265         private AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();
266         private int count = 0;
267 
268         SourceSubscriber(MergeInterleaveSubscription<T> parent) {
269             this.parent = parent;
270         }
271 
272         @Override
273         public void onSubscribe(Subscription s) {
274             SubscriptionHelper.setOnce(subscription, s);
275         }
276 
277         @Override
278         public void onNext(T t) {
279             count++;
280             boolean batchFinished = count == parent.batchSize;
281             if (batchFinished) {
282                 count = 0;
283             }
284             parent.sourceNext(t, batchFinished ? this : null);
285         }
286 
287         @Override
288         public void onError(Throwable t) {
289             parent.sourceError(t);
290         }
291 
292         @Override
293         public void onComplete() {
294             parent.sourceComplete(this);
295         }
296 
297         @Override
298         public void requestMore() {
299             subscription.get().request(parent.batchSize);
300         }
301 
302         void cancel() {
303             while (true) {
304                 Subscription s = subscription.get();
305                 if (subscription.compareAndSet(s, SubscriptionHelper.CANCELLED)) {
306                     s.cancel();
307                     break;
308                 }
309             }
310         }
311     }
312 
313     private static final class SourceArrived<T> {
314         final Publisher<? extends T> publisher;
315 
316         SourceArrived(Publisher<? extends T> publisher) {
317             this.publisher = publisher;
318         }
319     }
320 
321     private static final class SourceComplete<T> {
322         final Subscriber<T> subscriber;
323 
324         SourceComplete(Subscriber<T> subscriber) {
325             this.subscriber = subscriber;
326         }
327     }
328 
329     private interface BatchFinished {
330         void requestMore();
331     }
332 
333 }