View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.ArrayList;
4   import java.util.Collection;
5   import java.util.Comparator;
6   import java.util.List;
7   import java.util.Queue;
8   import java.util.concurrent.atomic.AtomicLong;
9   
10  import rx.Observable;
11  import rx.Observable.OnSubscribe;
12  import rx.Producer;
13  import rx.Subscriber;
14  import rx.exceptions.CompositeException;
15  import rx.exceptions.MissingBackpressureException;
16  import rx.internal.operators.BackpressureUtils;
17  import rx.internal.operators.NotificationLite;
18  import rx.internal.util.RxRingBuffer;
19  import rx.internal.util.unsafe.MpscLinkedQueue;
20  
21  /**
22   * @author David Karnokd
23   *
24   * @param <T>
25   *            type of observable
26   */
27  public final class OrderedMerge<T> implements OnSubscribe<T> {
28      final List<Observable<T>> sources;
29      final Comparator<? super T> comparator;
30      final boolean delayErrors;
31  
32      public static <U extends Comparable<? super U>> Observable<U> create(
33              Collection<Observable<U>> sources) {
34          return create(sources, false);
35      }
36  
37      public static <U> Observable<U> create(Collection<Observable<U>> sources,
38              Comparator<? super U> comparator) {
39          return create(sources, comparator, false);
40      }
41  
42      public static <U extends Comparable<? super U>> Observable<U> create(
43              Collection<Observable<U>> sources, boolean delayErrors) {
44          return Observable.create(new OrderedMerge<U>(sources, new Comparator<U>() {
45              @Override
46              public int compare(U o1, U o2) {
47                  return o1.compareTo(o2);
48              }
49          }, delayErrors));
50      }
51  
52      public static <U> Observable<U> create(Collection<Observable<U>> sources,
53              Comparator<? super U> comparator, boolean delayErrors) {
54          return Observable.create(new OrderedMerge<U>(sources, comparator, delayErrors));
55      }
56  
57      private OrderedMerge(Collection<Observable<T>> sources,
58              Comparator<? super T> comparator, boolean delayErrors) {
59          this.sources = sources instanceof List ? (List<Observable<T>>) sources
60                  : new ArrayList<Observable<T>>(sources);
61          this.comparator = comparator;
62          this.delayErrors = delayErrors;
63      }
64  
65      @Override
66      public void call(Subscriber<? super T> child) {
67          @SuppressWarnings("unchecked")
68          SourceSubscriber<T>[] sources = new SourceSubscriber[this.sources.size()];
69          MergeProducer<T> mp = new MergeProducer<T>(sources, child, comparator, delayErrors);
70          for (int i = 0; i < sources.length; i++) {
71              if (child.isUnsubscribed()) {
72                  return;
73              }
74              SourceSubscriber<T> s = new SourceSubscriber<T>(mp);
75              sources[i] = s;
76              child.add(s);
77          }
78          mp.set(0); // release contents of the array
79          child.setProducer(mp);
80          int i = 0;
81          for (Observable<? extends T> source : this.sources) {
82              if (child.isUnsubscribed()) {
83                  return;
84              }
85              source.unsafeSubscribe(sources[i]);
86              i++;
87          }
88      }
89  
90      static final class MergeProducer<T> extends AtomicLong implements Producer {
91          /** */
92          private static final long serialVersionUID = -812969080497027108L;
93  
94          final NotificationLite<T> nl = NotificationLite.instance();
95  
96          final boolean delayErrors;
97          final Comparator<? super T> comparator;
98          @SuppressWarnings("rawtypes")
99          final SourceSubscriber[] sources;
100         final Subscriber<? super T> child;
101 
102         final Queue<Throwable> errors;
103 
104         boolean emitting;
105         boolean missed;
106 
107         @SuppressWarnings("rawtypes")
108         public MergeProducer(SourceSubscriber[] sources, Subscriber<? super T> child,
109                 Comparator<? super T> comparator, boolean delayErrors) {
110             this.sources = sources;
111             this.delayErrors = delayErrors;
112             this.errors = new MpscLinkedQueue<Throwable>();
113             this.child = child;
114             this.comparator = comparator;
115         }
116 
117         @Override
118         public void request(long n) {
119             BackpressureUtils.getAndAddRequest(this, n);
120             emit();
121         }
122 
123         public void error(Throwable ex) {
124             errors.offer(ex);
125             emit();
126         }
127 
128         public void emit() {
129             synchronized (this) {
130                 if (emitting) {
131                     missed = true;
132                     return;
133                 }
134                 emitting = true;
135             }
136             // lift into local variables, just in case
137             @SuppressWarnings("unchecked")
138             final SourceSubscriber<T>[] sources = this.sources;
139             final int n = sources.length;
140             final Subscriber<? super T> child = this.child;
141 
142             for (;;) {
143                 if (child.isUnsubscribed()) {
144                     return;
145                 }
146                 // eagerly check for errors
147                 if (!delayErrors && !errors.isEmpty()) {
148                     child.onError(errors.poll());
149                     return;
150                 }
151                 // the current requested
152                 long r = get();
153                 // aggregate total emissions
154                 long e = 0;
155                 // even without request, terminal events can be fired if the
156                 // state is right
157                 if (r == 0) {
158                     int doneCount = 0;
159                     // for each source
160                     for (SourceSubscriber<T> s : sources) {
161                         // if completed earlier
162                         if (s == null) {
163                             doneCount++;
164                         } else {
165                             // or just completed
166                             if (s.done && s.queue.isEmpty()) {
167                                 doneCount++;
168                             }
169                         }
170                     }
171                     // if all of them are completed
172                     if (doneCount == n) {
173                         reportErrorOrComplete(child);
174                         return;
175                     }
176                 }
177                 // until there is request
178                 while (r != 0L) {
179                     if (child.isUnsubscribed()) {
180                         return;
181                     }
182                     // eagerly check for errors
183                     if (!delayErrors && !errors.isEmpty()) {
184                         child.onError(errors.poll());
185                         return;
186                     }
187                     // indicates that every active source has at least one value
188                     boolean fullRow = true;
189                     // indicates that at least one value is available
190                     boolean hasAtLeastOne = false;
191                     // holds the smallest of the available values
192                     T minimum = null;
193                     // indicates which source's value is taken so it can be
194                     // polled/replenished
195                     int toPoll = -1;
196                     // number of completed sources
197                     int doneCount = 0;
198                     // for each source
199                     for (int i = 0; i < n; i++) {
200                         SourceSubscriber<T> s = sources[i];
201                         // terminated and emptied sources are ignored
202                         if (s == null) {
203                             doneCount++;
204                             continue;
205                         }
206                         // read the terminal indicator first
207                         boolean d = s.done;
208                         // peek into the queue
209                         Object o = s.queue.peek();
210                         // no value available
211                         if (o == null) {
212                             // because it terminated?
213                             if (d) {
214                                 sources[i] = null;
215                                 doneCount++;
216                                 continue;
217                             }
218                             // otherwise, indicate not all queues are ready
219                             fullRow = false;
220                             break;
221                         }
222                         // if we already found a value, compare it against the
223                         // current
224                         if (hasAtLeastOne) {
225                             T v = nl.getValue(o);
226                             int c = comparator.compare(minimum, v);
227                             if (c > 0) {
228                                 minimum = v;
229                                 toPoll = i;
230                             }
231                         } else {
232                             // this is the first value found
233                             minimum = nl.getValue(o);
234                             hasAtLeastOne = true;
235                             toPoll = i;
236                         }
237                     }
238                     // in case all of the sources completed
239                     if (doneCount == n) {
240                         reportErrorOrComplete(child);
241                         return;
242                     }
243                     // if there was a full row of available values
244                     if (fullRow) {
245                         // given the winner
246                         if (toPoll >= 0) {
247                             SourceSubscriber<T> s = sources[toPoll];
248                             // remove the winning value from its queue
249                             s.queue.poll();
250                             // request replenishment
251                             s.requestMore(1);
252                         }
253                         // emit the smallest
254                         child.onNext(minimum);
255                         // decrement the available request and increment the
256                         // emit count
257                         if (r != Long.MAX_VALUE) {
258                             r--;
259                             e++;
260                         }
261                     } else {
262                         // if some sources weren't ready, just quit
263                         break;
264                     }
265                 }
266 
267                 // if there was emission, adjust the downstream request amount
268                 if (e != 0L) {
269                     addAndGet(-e);
270                 }
271 
272                 synchronized (this) {
273                     if (!missed) {
274                         emitting = false;
275                         return;
276                     }
277                     missed = false;
278                 }
279             }
280         }
281 
282         void reportErrorOrComplete(Subscriber<? super T> child) {
283             if (delayErrors && !errors.isEmpty()) {
284                 if (errors.size() == 1) {
285                     child.onError(errors.poll());
286                 } else {
287                     child.onError(new CompositeException(errors));
288                 }
289             } else {
290                 child.onCompleted();
291             }
292         }
293     }
294 
295     static final class SourceSubscriber<T> extends Subscriber<T> {
296         final RxRingBuffer queue;
297         final MergeProducer<T> parent;
298         volatile boolean done;
299 
300         public SourceSubscriber(MergeProducer<T> parent) {
301             queue = RxRingBuffer.getSpscInstance();
302             this.parent = parent;
303         }
304 
305         @Override
306         public void onStart() {
307             add(queue);
308             request(RxRingBuffer.SIZE);
309         }
310 
311         public void requestMore(long n) {
312             request(n);
313         }
314 
315         @Override
316         public void onNext(T t) {
317             try {
318                 queue.onNext(parent.nl.next(t));
319             } catch (MissingBackpressureException mbe) {
320                 try {
321                     onError(mbe);
322                 } finally {
323                     unsubscribe();
324                 }
325                 return;
326             } catch (IllegalStateException ex) {
327                 if (!isUnsubscribed()) {
328                     try {
329                         onError(ex);
330                     } finally {
331                         unsubscribe();
332                     }
333                 }
334                 return;
335             }
336             parent.emit();
337         }
338 
339         @Override
340         public void onError(Throwable e) {
341             done = true;
342             parent.error(e);
343         }
344 
345         @Override
346         public void onCompleted() {
347             done = true;
348             parent.emit();
349         }
350     }
351 }