1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.atomic.AtomicBoolean;
6 import java.util.concurrent.atomic.AtomicInteger;
7 import java.util.concurrent.atomic.AtomicLong;
8 import java.util.concurrent.atomic.AtomicReference;
9
10 import org.reactivestreams.Publisher;
11 import org.reactivestreams.Subscriber;
12 import org.reactivestreams.Subscription;
13
14 import com.github.davidmoten.guavamini.Preconditions;
15 import com.github.davidmoten.util.RingBuffer;
16
17 import io.reactivex.Flowable;
18 import io.reactivex.internal.fuseable.SimplePlainQueue;
19 import io.reactivex.internal.queue.MpscLinkedQueue;
20 import io.reactivex.internal.subscriptions.SubscriptionHelper;
21 import io.reactivex.internal.util.BackpressureHelper;
22
23 public final class FlowableMergeInterleave<T> extends Flowable<T> {
24
25 private final int maxConcurrent;
26 private final Publisher<? extends Publisher<? extends T>> sources;
27 private final int batchSize;
28 private boolean delayErrors;
29
30 public FlowableMergeInterleave(Publisher<? extends Publisher<? extends T>> sources,
31 int maxConcurrent, int batchSize, boolean delayErrors) {
32 this.sources = sources;
33 this.maxConcurrent = maxConcurrent;
34 this.batchSize = batchSize;
35 this.delayErrors = delayErrors;
36 }
37
38 @Override
39 protected void subscribeActual(Subscriber<? super T> s) {
40 MergeInterleaveSubscription<T> subscription = new MergeInterleaveSubscription<T>(sources,
41 maxConcurrent, batchSize, delayErrors, s);
42 s.onSubscribe(subscription);
43 }
44
45 private static final class MergeInterleaveSubscription<T> extends AtomicInteger
46 implements Subscription, Subscriber<Publisher<? extends T>> {
47
48 private static final long serialVersionUID = -6416801556759306113L;
49 private static final Object SOURCES_COMPLETE = new Object();
50 private final AtomicBoolean once = new AtomicBoolean();
51 private final Publisher<? extends Publisher<? extends T>> sources;
52 private final int maxConcurrent;
53 private final int batchSize;
54 private final boolean delayErrors;
55 private Subscriber<? super T> subscriber;
56 private Subscription subscription;
57 private volatile boolean cancelled;
58 private Throwable error;
59 private volatile boolean finished;
60 private final AtomicLong requested = new AtomicLong();
61 private long emitted;
62 private final RingBuffer<BatchFinished> batchFinished;
63
64
65 private final SimplePlainQueue<Object> queue;
66 private final List<SourceSubscriber<T>> sourceSubscribers = new ArrayList<SourceSubscriber<T>>();
67 private boolean sourcesComplete;
68 private long sourcesCount;
69
70 public MergeInterleaveSubscription(Publisher<? extends Publisher<? extends T>> sources,
71 int maxConcurrent, int batchSize, boolean delayErrors,
72 Subscriber<? super T> subscriber) {
73 this.sources = sources;
74 this.maxConcurrent = maxConcurrent;
75 this.batchSize = batchSize;
76 this.delayErrors = delayErrors;
77 this.subscriber = subscriber;
78 this.queue = new MpscLinkedQueue<Object>();
79 this.batchFinished = RingBuffer.create(maxConcurrent + 1);
80 }
81
82 @Override
83 public void request(long n) {
84 if (SubscriptionHelper.validate(n)) {
85 BackpressureHelper.add(requested, n);
86 if (once.compareAndSet(false, true)) {
87 sources.subscribe(this);
88 subscription.request(maxConcurrent);
89 }
90 drain();
91 }
92 }
93
94 @Override
95 public void cancel() {
96 this.cancelled = true;
97 }
98
99 @Override
100 public void onSubscribe(Subscription s) {
101 this.subscription = s;
102 drain();
103 }
104
105 @Override
106 public void onNext(Publisher<? extends T> f) {
107 sourcesCount++;
108 queue.offer(new SourceArrived<T>(f));
109 if (sourcesCount >= maxConcurrent) {
110 drain();
111 }
112 }
113
114 @Override
115 public void onError(Throwable t) {
116 error = t;
117 finished = true;
118 drain();
119 }
120
121 @Override
122 public void onComplete() {
123 queue.offer(SOURCES_COMPLETE);
124 drain();
125 }
126
127 private boolean tryCancelled() {
128 if (cancelled) {
129 cleanup();
130 return true;
131 } else {
132 return false;
133 }
134 }
135
136 @SuppressWarnings("unchecked")
137 private void drain() {
138 if (getAndIncrement() == 0) {
139 int missed = 1;
140 long e = emitted;
141 long r = requested.get();
142 while (true) {
143 if (tryCancelled()) {
144 return;
145 }
146 if (e == r) {
147 r = requested.get();
148 }
149 while (e != r) {
150 boolean d = finished;
151 if (d && !delayErrors) {
152 Throwable err = error;
153 if (err != null) {
154 error = null;
155 cleanup();
156 subscriber.onError(err);
157 return;
158 }
159 }
160 Object o = queue.poll();
161 if (o == null) {
162 if (d) {
163 Throwable err = error;
164 if (err != null) {
165 error = null;
166 cleanup();
167 subscriber.onError(err);
168 } else {
169 subscriber.onComplete();
170 }
171 return;
172 } else {
173 break;
174 }
175 } else {
176 if (o instanceof BatchFinished) {
177 handleBatchFinished((BatchFinished) o);
178 } else if (o instanceof SourceArrived) {
179 handleSourceArrived((SourceArrived<T>) o);
180 } else if (o instanceof SourceComplete) {
181 handleSourceComplete((SourceComplete<T>) o);
182 } else if (o == SOURCES_COMPLETE) {
183 handleSourcesComplete();
184 } else {
185 subscriber.onNext((T) o);
186 e++;
187 }
188 }
189 if (tryCancelled()) {
190 return;
191 }
192 }
193 emitted = e;
194 missed = addAndGet(-missed);
195 if (missed == 0) {
196 return;
197 }
198 }
199 }
200 }
201
202 private void handleSourcesComplete() {
203 sourcesComplete = true;
204 if (sourceSubscribers.isEmpty()) {
205 finished = true;
206 }
207 }
208
209 private void handleBatchFinished(BatchFinished b) {
210 Preconditions.checkNotNull(b);
211 boolean ok = batchFinished.offer(b);
212 assert ok;
213 batchFinished.poll().requestMore();
214 }
215
216 private void cleanup() {
217 subscription.cancel();
218 for (SourceSubscriber<T> s : sourceSubscribers) {
219 s.cancel();
220 }
221 sourceSubscribers.clear();
222 queue.clear();
223 batchFinished.clear();
224 }
225
226 private void handleSourceArrived(SourceArrived<T> event) {
227 SourceSubscriber<T> subscriber = new SourceSubscriber<T>(this);
228 sourceSubscribers.add(subscriber);
229 queue.offer(subscriber);
230 event.publisher.subscribe(subscriber);
231 }
232
233 private void handleSourceComplete(SourceComplete<T> event) {
234 sourceSubscribers.remove(event.subscriber);
235 if (!sourcesComplete) {
236 subscription.request(1);
237 } else if (sourceSubscribers.isEmpty() && sourcesComplete) {
238 finished = true;
239 }
240 }
241
242 public void sourceError(Throwable t) {
243 error = t;
244 finished = true;
245 drain();
246 }
247
248 public void sourceComplete(SourceSubscriber<T> sourceSubscriber) {
249 queue.offer(new SourceComplete<T>(sourceSubscriber));
250 drain();
251 }
252
253 public void sourceNext(T t, SourceSubscriber<T> sourceSubscriber) {
254 queue.offer(t);
255 if (sourceSubscriber != null) {
256 queue.offer(sourceSubscriber);
257 }
258 drain();
259 }
260 }
261
262 private final static class SourceSubscriber<T> implements Subscriber<T>, BatchFinished {
263
264 private final MergeInterleaveSubscription<T> parent;
265 private AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();
266 private int count = 0;
267
268 SourceSubscriber(MergeInterleaveSubscription<T> parent) {
269 this.parent = parent;
270 }
271
272 @Override
273 public void onSubscribe(Subscription s) {
274 SubscriptionHelper.setOnce(subscription, s);
275 }
276
277 @Override
278 public void onNext(T t) {
279 count++;
280 boolean batchFinished = count == parent.batchSize;
281 if (batchFinished) {
282 count = 0;
283 }
284 parent.sourceNext(t, batchFinished ? this : null);
285 }
286
287 @Override
288 public void onError(Throwable t) {
289 parent.sourceError(t);
290 }
291
292 @Override
293 public void onComplete() {
294 parent.sourceComplete(this);
295 }
296
297 @Override
298 public void requestMore() {
299 subscription.get().request(parent.batchSize);
300 }
301
302 void cancel() {
303 while (true) {
304 Subscription s = subscription.get();
305 if (subscription.compareAndSet(s, SubscriptionHelper.CANCELLED)) {
306 s.cancel();
307 break;
308 }
309 }
310 }
311 }
312
313 private static final class SourceArrived<T> {
314 final Publisher<? extends T> publisher;
315
316 SourceArrived(Publisher<? extends T> publisher) {
317 this.publisher = publisher;
318 }
319 }
320
321 private static final class SourceComplete<T> {
322 final Subscriber<T> subscriber;
323
324 SourceComplete(Subscriber<T> subscriber) {
325 this.subscriber = subscriber;
326 }
327 }
328
329 private interface BatchFinished {
330 void requestMore();
331 }
332
333 }