1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Comparator;
6 import java.util.List;
7 import java.util.Queue;
8 import java.util.concurrent.atomic.AtomicLong;
9
10 import rx.Observable;
11 import rx.Observable.OnSubscribe;
12 import rx.Producer;
13 import rx.Subscriber;
14 import rx.exceptions.CompositeException;
15 import rx.exceptions.MissingBackpressureException;
16 import rx.internal.operators.BackpressureUtils;
17 import rx.internal.operators.NotificationLite;
18 import rx.internal.util.RxRingBuffer;
19 import rx.internal.util.unsafe.MpscLinkedQueue;
20
21
22
23
24
25
26
27 public final class OrderedMerge<T> implements OnSubscribe<T> {
28 final List<Observable<T>> sources;
29 final Comparator<? super T> comparator;
30 final boolean delayErrors;
31
32 public static <U extends Comparable<? super U>> Observable<U> create(
33 Collection<Observable<U>> sources) {
34 return create(sources, false);
35 }
36
37 public static <U> Observable<U> create(Collection<Observable<U>> sources,
38 Comparator<? super U> comparator) {
39 return create(sources, comparator, false);
40 }
41
42 public static <U extends Comparable<? super U>> Observable<U> create(
43 Collection<Observable<U>> sources, boolean delayErrors) {
44 return Observable.create(new OrderedMerge<U>(sources, new Comparator<U>() {
45 @Override
46 public int compare(U o1, U o2) {
47 return o1.compareTo(o2);
48 }
49 }, delayErrors));
50 }
51
52 public static <U> Observable<U> create(Collection<Observable<U>> sources,
53 Comparator<? super U> comparator, boolean delayErrors) {
54 return Observable.create(new OrderedMerge<U>(sources, comparator, delayErrors));
55 }
56
57 private OrderedMerge(Collection<Observable<T>> sources,
58 Comparator<? super T> comparator, boolean delayErrors) {
59 this.sources = sources instanceof List ? (List<Observable<T>>) sources
60 : new ArrayList<Observable<T>>(sources);
61 this.comparator = comparator;
62 this.delayErrors = delayErrors;
63 }
64
65 @Override
66 public void call(Subscriber<? super T> child) {
67 @SuppressWarnings("unchecked")
68 SourceSubscriber<T>[] sources = new SourceSubscriber[this.sources.size()];
69 MergeProducer<T> mp = new MergeProducer<T>(sources, child, comparator, delayErrors);
70 for (int i = 0; i < sources.length; i++) {
71 if (child.isUnsubscribed()) {
72 return;
73 }
74 SourceSubscriber<T> s = new SourceSubscriber<T>(mp);
75 sources[i] = s;
76 child.add(s);
77 }
78 mp.set(0);
79 child.setProducer(mp);
80 int i = 0;
81 for (Observable<? extends T> source : this.sources) {
82 if (child.isUnsubscribed()) {
83 return;
84 }
85 source.unsafeSubscribe(sources[i]);
86 i++;
87 }
88 }
89
90 static final class MergeProducer<T> extends AtomicLong implements Producer {
91
92 private static final long serialVersionUID = -812969080497027108L;
93
94 final NotificationLite<T> nl = NotificationLite.instance();
95
96 final boolean delayErrors;
97 final Comparator<? super T> comparator;
98 @SuppressWarnings("rawtypes")
99 final SourceSubscriber[] sources;
100 final Subscriber<? super T> child;
101
102 final Queue<Throwable> errors;
103
104 boolean emitting;
105 boolean missed;
106
107 @SuppressWarnings("rawtypes")
108 public MergeProducer(SourceSubscriber[] sources, Subscriber<? super T> child,
109 Comparator<? super T> comparator, boolean delayErrors) {
110 this.sources = sources;
111 this.delayErrors = delayErrors;
112 this.errors = new MpscLinkedQueue<Throwable>();
113 this.child = child;
114 this.comparator = comparator;
115 }
116
117 @Override
118 public void request(long n) {
119 BackpressureUtils.getAndAddRequest(this, n);
120 emit();
121 }
122
123 public void error(Throwable ex) {
124 errors.offer(ex);
125 emit();
126 }
127
128 public void emit() {
129 synchronized (this) {
130 if (emitting) {
131 missed = true;
132 return;
133 }
134 emitting = true;
135 }
136
137 @SuppressWarnings("unchecked")
138 final SourceSubscriber<T>[] sources = this.sources;
139 final int n = sources.length;
140 final Subscriber<? super T> child = this.child;
141
142 for (;;) {
143 if (child.isUnsubscribed()) {
144 return;
145 }
146
147 if (!delayErrors && !errors.isEmpty()) {
148 child.onError(errors.poll());
149 return;
150 }
151
152 long r = get();
153
154 long e = 0;
155
156
157 if (r == 0) {
158 int doneCount = 0;
159
160 for (SourceSubscriber<T> s : sources) {
161
162 if (s == null) {
163 doneCount++;
164 } else {
165
166 if (s.done && s.queue.isEmpty()) {
167 doneCount++;
168 }
169 }
170 }
171
172 if (doneCount == n) {
173 reportErrorOrComplete(child);
174 return;
175 }
176 }
177
178 while (r != 0L) {
179 if (child.isUnsubscribed()) {
180 return;
181 }
182
183 if (!delayErrors && !errors.isEmpty()) {
184 child.onError(errors.poll());
185 return;
186 }
187
188 boolean fullRow = true;
189
190 boolean hasAtLeastOne = false;
191
192 T minimum = null;
193
194
195 int toPoll = -1;
196
197 int doneCount = 0;
198
199 for (int i = 0; i < n; i++) {
200 SourceSubscriber<T> s = sources[i];
201
202 if (s == null) {
203 doneCount++;
204 continue;
205 }
206
207 boolean d = s.done;
208
209 Object o = s.queue.peek();
210
211 if (o == null) {
212
213 if (d) {
214 sources[i] = null;
215 doneCount++;
216 continue;
217 }
218
219 fullRow = false;
220 break;
221 }
222
223
224 if (hasAtLeastOne) {
225 T v = nl.getValue(o);
226 int c = comparator.compare(minimum, v);
227 if (c > 0) {
228 minimum = v;
229 toPoll = i;
230 }
231 } else {
232
233 minimum = nl.getValue(o);
234 hasAtLeastOne = true;
235 toPoll = i;
236 }
237 }
238
239 if (doneCount == n) {
240 reportErrorOrComplete(child);
241 return;
242 }
243
244 if (fullRow) {
245
246 if (toPoll >= 0) {
247 SourceSubscriber<T> s = sources[toPoll];
248
249 s.queue.poll();
250
251 s.requestMore(1);
252 }
253
254 child.onNext(minimum);
255
256
257 if (r != Long.MAX_VALUE) {
258 r--;
259 e++;
260 }
261 } else {
262
263 break;
264 }
265 }
266
267
268 if (e != 0L) {
269 addAndGet(-e);
270 }
271
272 synchronized (this) {
273 if (!missed) {
274 emitting = false;
275 return;
276 }
277 missed = false;
278 }
279 }
280 }
281
282 void reportErrorOrComplete(Subscriber<? super T> child) {
283 if (delayErrors && !errors.isEmpty()) {
284 if (errors.size() == 1) {
285 child.onError(errors.poll());
286 } else {
287 child.onError(new CompositeException(errors));
288 }
289 } else {
290 child.onCompleted();
291 }
292 }
293 }
294
295 static final class SourceSubscriber<T> extends Subscriber<T> {
296 final RxRingBuffer queue;
297 final MergeProducer<T> parent;
298 volatile boolean done;
299
300 public SourceSubscriber(MergeProducer<T> parent) {
301 queue = RxRingBuffer.getSpscInstance();
302 this.parent = parent;
303 }
304
305 @Override
306 public void onStart() {
307 add(queue);
308 request(RxRingBuffer.SIZE);
309 }
310
311 public void requestMore(long n) {
312 request(n);
313 }
314
315 @Override
316 public void onNext(T t) {
317 try {
318 queue.onNext(parent.nl.next(t));
319 } catch (MissingBackpressureException mbe) {
320 try {
321 onError(mbe);
322 } finally {
323 unsubscribe();
324 }
325 return;
326 } catch (IllegalStateException ex) {
327 if (!isUnsubscribed()) {
328 try {
329 onError(ex);
330 } finally {
331 unsubscribe();
332 }
333 }
334 return;
335 }
336 parent.emit();
337 }
338
339 @Override
340 public void onError(Throwable e) {
341 done = true;
342 parent.error(e);
343 }
344
345 @Override
346 public void onCompleted() {
347 done = true;
348 parent.emit();
349 }
350 }
351 }