1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicInteger;
4 import java.util.concurrent.atomic.AtomicLong;
5 import java.util.concurrent.atomic.AtomicReference;
6
7 import org.reactivestreams.Subscriber;
8 import org.reactivestreams.Subscription;
9
10 import com.github.davidmoten.guavamini.Preconditions;
11
12 import io.reactivex.Flowable;
13 import io.reactivex.FlowableSubscriber;
14 import io.reactivex.Observable;
15 import io.reactivex.Observer;
16 import io.reactivex.disposables.Disposable;
17 import io.reactivex.disposables.Disposables;
18 import io.reactivex.exceptions.Exceptions;
19 import io.reactivex.functions.Function;
20 import io.reactivex.internal.fuseable.SimplePlainQueue;
21 import io.reactivex.internal.queue.SpscLinkedArrayQueue;
22 import io.reactivex.internal.subscriptions.SubscriptionHelper;
23 import io.reactivex.internal.util.BackpressureHelper;
24 import io.reactivex.plugins.RxJavaPlugins;
25
26 public final class FlowableRepeatingTransform<T> extends Flowable<T> {
27
28 private final Flowable<T> source;
29 private final Function<? super Flowable<T>, ? extends Flowable<T>> transform;
30 private final int maxChained;
31 private final long maxIterations;
32 private final Function<Observable<T>, ? extends Observable<?>> tester;
33
34 public FlowableRepeatingTransform(Flowable<T> source,
35 Function<? super Flowable<T>, ? extends Flowable<T>> transform, int maxChained,
36 long maxIterations, Function<Observable<T>, Observable<?>> tester) {
37 Preconditions.checkArgument(maxChained > 0, "maxChained must be > 0");
38 Preconditions.checkArgument(maxIterations > 0, "maxIterations must be > 0");
39 Preconditions.checkNotNull(transform, "transform must not be null");
40 Preconditions.checkNotNull(tester, "tester must not be null");
41 this.source = source;
42 this.transform = transform;
43 this.maxChained = maxChained;
44 this.maxIterations = maxIterations;
45 this.tester = tester;
46 }
47
48 @Override
49 protected void subscribeActual(Subscriber<? super T> child) {
50
51 Flowable<T> f;
52 try {
53 f = transform.apply(source);
54 } catch (Exception e) {
55 Exceptions.throwIfFatal(e);
56 child.onSubscribe(SubscriptionHelper.CANCELLED);
57 child.onError(e);
58 return;
59 }
60 AtomicReference<Chain<T>> chainRef = new AtomicReference<Chain<T>>();
61 DestinationSerializedSubject<T> destination = new DestinationSerializedSubject<T>(child,
62 chainRef);
63 Chain<T> chain = new Chain<T>(transform, destination, maxIterations, maxChained, tester);
64 chainRef.set(chain);
65
66
67 destination.subscribe(child);
68 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, chain, tester);
69 chain.initialize(sub);
70 f.onTerminateDetach()
71 .subscribe(sub);
72 }
73
74 private static enum EventType {
75 TESTER_ADD, TESTER_DONE, TESTER_COMPLETE_OR_CANCEL, NEXT, ERROR, COMPLETE;
76 }
77
78 private static final class Event<T> {
79
80 final EventType eventType;
81 final ChainedReplaySubject<T> subject;
82 final Subscriber<? super T> subscriber;
83 final T t;
84 final Throwable error;
85
86 Event(EventType eventType, ChainedReplaySubject<T> subject,
87 Subscriber<? super T> subscriber, T t, Throwable error) {
88 this.eventType = eventType;
89 this.subject = subject;
90 this.subscriber = subscriber;
91 this.t = t;
92 this.error = error;
93 }
94 }
95
96 @SuppressWarnings("serial")
97 private static final class Chain<T> extends AtomicInteger implements Subscription {
98
99 private final Function<? super Flowable<T>, ? extends Flowable<T>> transform;
100 private final SimplePlainQueue<Event<T>> queue;
101 private final DestinationSerializedSubject<T> destination;
102 private final long maxIterations;
103 private final int maxChained;
104 private final Function<Observable<T>, ? extends Observable<?>> test;
105
106
107 private int iteration = 1;
108 private int length;
109 private ChainedReplaySubject<T> finalSubscriber;
110 private boolean destinationAttached;
111 private volatile boolean cancelled;
112
113 Chain(Function<? super Flowable<T>, ? extends Flowable<T>> transform,
114 DestinationSerializedSubject<T> destination, long maxIterations, int maxChained,
115 Function<Observable<T>, ? extends Observable<?>> test) {
116 this.transform = transform;
117 this.destination = destination;
118 this.maxIterations = maxIterations;
119 this.maxChained = maxChained;
120 this.test = test;
121 this.queue = new SpscLinkedArrayQueue<Event<T>>(16);
122 }
123
124 void initialize(ChainedReplaySubject<T> subject) {
125 finalSubscriber = subject;
126 if (maxIterations == 1) {
127 finalSubscriber.subscribe(destination);
128 destinationAttached = true;
129 }
130 }
131
132 void tryAddSubscriber(ChainedReplaySubject<T> subject) {
133 queue.offer(new Event<T>(EventType.TESTER_ADD, subject, null, null, null));
134 drain();
135 }
136
137 void done(ChainedReplaySubject<T> subject) {
138 queue.offer(new Event<T>(EventType.TESTER_DONE, subject, null, null, null));
139 drain();
140 }
141
142 void completeOrCancel(ChainedReplaySubject<T> subject) {
143 queue.offer(
144 new Event<T>(EventType.TESTER_COMPLETE_OR_CANCEL, subject, null, null, null));
145 drain();
146 }
147
148 public void onError(Subscriber<? super T> child, Throwable err) {
149 queue.offer(new Event<T>(EventType.ERROR, null, child, null, err));
150 drain();
151
152 }
153
154 public void onCompleted(Subscriber<? super T> child) {
155 queue.offer(new Event<T>(EventType.COMPLETE, null, child, null, null));
156 drain();
157
158 }
159
160 public void onNext(Subscriber<? super T> child, T t) {
161 queue.offer(new Event<T>(EventType.NEXT, null, child, t, null));
162 drain();
163 }
164
165 void drain() {
166 if (getAndIncrement() == 0) {
167 if (cancelled) {
168 finalSubscriber.cancel();
169 queue.clear();
170 return;
171 }
172 int missed = 1;
173 while (true) {
174 while (true) {
175 Event<T> v = queue.poll();
176 if (v == null) {
177 break;
178 } else if (v.eventType == EventType.TESTER_ADD) {
179 handleAdd(v);
180 } else if (v.eventType == EventType.TESTER_DONE) {
181 handleDone();
182 } else if (v.eventType == EventType.NEXT) {
183 v.subscriber.onNext(v.t);
184 } else if (v.eventType == EventType.COMPLETE) {
185 v.subscriber.onComplete();
186 } else if (v.eventType == EventType.ERROR) {
187 v.subscriber.onError(v.error);
188 } else {
189 handleCompleteOrCancel(v);
190 }
191 }
192 missed = addAndGet(-missed);
193 if (missed == 0) {
194 break;
195 }
196 }
197 }
198 }
199
200 private void handleAdd(Event<T> v) {
201 debug("ADD " + v.subject);
202 if (!destinationAttached && v.subject == finalSubscriber && length < maxChained
203 && !destinationAttached) {
204 if (iteration <= maxIterations - 1) {
205
206 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this,
207 test);
208 if (iteration == maxIterations - 1) {
209 sub.subscribe(destination);
210 debug(sub + "subscribed to by destination");
211 destinationAttached = true;
212 }
213 addToChain(sub);
214 finalSubscriber = sub;
215 iteration++;
216 length += 1;
217 }
218 }
219 }
220
221 private void handleDone() {
222 debug("DONE");
223 if (!destinationAttached) {
224 destinationAttached = true;
225 finalSubscriber.subscribe(destination);
226 }
227 }
228
229 private void handleCompleteOrCancel(Event<T> v) {
230 debug("COMPLETE/CANCEL " + v.subject);
231 if (destinationAttached) {
232 return;
233 }
234 if (v.subject == finalSubscriber) {
235
236
237 } else if (iteration < maxIterations - 1) {
238 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this, test);
239 addToChain(sub);
240 finalSubscriber = sub;
241 iteration++;
242 } else if (iteration == maxIterations - 1) {
243 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this, test);
244 destinationAttached = true;
245 sub.subscribe(destination);
246 addToChain(sub);
247 debug(sub + "subscribed to by destination");
248 finalSubscriber = sub;
249 iteration++;
250 } else {
251 length--;
252 }
253 }
254
255 private void addToChain(final Subscriber<T> sub) {
256 Flowable<T> f;
257 try {
258 f = transform.apply(finalSubscriber);
259 } catch (Exception e) {
260 Exceptions.throwIfFatal(e);
261 cancelWholeChain();
262 destination.onError(e);
263 return;
264 }
265 log("adding subscriber to " + finalSubscriber);
266 f.onTerminateDetach().subscribe(sub);
267 debug(finalSubscriber + " subscribed to by " + sub);
268 }
269
270 private void cancelWholeChain() {
271 cancelled = true;
272 drain();
273 }
274
275 @Override
276 public void request(long n) {
277
278 }
279
280 @Override
281 public void cancel() {
282 cancelled = true;
283 cancelWholeChain();
284 }
285
286 }
287
288 private static class DestinationSerializedSubject<T> extends Flowable<T>
289 implements FlowableSubscriber<T>, Subscription {
290
291 private final Subscriber<? super T> child;
292 private final AtomicReference<Chain<T>> chain;
293
294 private final AtomicInteger wip = new AtomicInteger();
295 private final AtomicReference<Subscription> parent = new AtomicReference<Subscription>();
296 private final AtomicLong requested = new AtomicLong();
297 private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
298 private final AtomicLong deferredRequests = new AtomicLong();
299
300 private Throwable error;
301 private volatile boolean done;
302 private volatile boolean cancelled;
303
304 DestinationSerializedSubject(Subscriber<? super T> child, AtomicReference<Chain<T>> chain) {
305 this.child = child;
306 this.chain = chain;
307 }
308
309 @Override
310 protected void subscribeActual(Subscriber<? super T> child) {
311 debug(this + " subscribed to by " + child);
312 child.onSubscribe(new MultiSubscription(this, chain.get()));
313
314
315 }
316
317 @Override
318 public void onSubscribe(Subscription pr) {
319 parent.set(pr);
320 long r = deferredRequests.getAndSet(-1);
321 if (r > 0L) {
322 debug(this + " requesting of parent " + r);
323 pr.request(r);
324 }
325 drain();
326 }
327
328 @Override
329 public void request(long n) {
330 debug(this + " request " + n);
331 if (SubscriptionHelper.validate(n)) {
332 BackpressureHelper.add(requested, n);
333 while (true) {
334 Subscription p = parent.get();
335 long d = deferredRequests.get();
336 if (d == -1) {
337
338 debug(this + " requesting from parent " + n);
339 p.request(n);
340 break;
341 } else {
342 long d2 = d + n;
343 if (d2 < 0) {
344 d2 = Long.MAX_VALUE;
345 }
346 if (deferredRequests.compareAndSet(d, d2)) {
347 break;
348 }
349 }
350 }
351 drain();
352 }
353 }
354
355 @Override
356 public void cancel() {
357 cancelled = true;
358 SubscriptionHelper.cancel(this.parent);
359 chain.get().cancel();
360 }
361
362 @Override
363 public void onNext(T t) {
364 queue.offer(t);
365 drain();
366 }
367
368 @Override
369 public void onError(Throwable e) {
370 error = e;
371 done = true;
372 drain();
373 }
374
375 @Override
376 public void onComplete() {
377 debug("final complete");
378 done = true;
379 drain();
380 }
381
382 private void drain() {
383
384
385 if (wip.getAndIncrement() == 0) {
386 int missed = 1;
387 while (true) {
388 long r = requested.get();
389 long e = 0;
390 boolean d = done;
391 while (e != r) {
392 if (cancelled) {
393 queue.clear();
394 return;
395 }
396 if (d && terminate()) {
397 return;
398 }
399 T t = queue.poll();
400 if (t == null) {
401 if (d) {
402 cancel();
403 child.onComplete();
404 return;
405 } else {
406 break;
407 }
408 } else {
409 child.onNext(t);
410 e++;
411 }
412 d = done;
413 }
414 if (d && terminate()) {
415 return;
416 }
417 if (e != 0 && r != Long.MAX_VALUE) {
418 r = requested.addAndGet(-e);
419 }
420 missed = wip.addAndGet(-missed);
421 if (missed == 0) {
422 return;
423 }
424 }
425 }
426 }
427
428 private boolean terminate() {
429
430 Throwable err = error;
431 if (err != null) {
432 queue.clear();
433 error = null;
434 cancel();
435 child.onError(err);
436 return true;
437 } else if (queue.isEmpty()) {
438 cancel();
439 child.onComplete();
440 return true;
441 } else {
442 return false;
443 }
444 }
445
446 }
447
448 private static final class Tester<T> extends Observable<T> implements Observer<T> {
449
450 private Observer<? super T> observer;
451
452 @Override
453 protected void subscribeActual(Observer<? super T> observer) {
454 observer.onSubscribe(Disposables.empty());
455 this.observer = observer;
456 }
457
458 @Override
459 public void onSubscribe(Disposable d) {
460 throw new RuntimeException("unexpected");
461 }
462
463 @Override
464 public void onNext(T t) {
465 observer.onNext(t);
466 }
467
468 @Override
469 public void onError(Throwable e) {
470 observer.onError(e);
471 }
472
473 @Override
474 public void onComplete() {
475 observer.onComplete();
476 }
477 }
478
479 private static final class TesterObserver<T> implements Observer<Object> {
480
481 private final Chain<T> chain;
482 private final ChainedReplaySubject<T> subject;
483
484 TesterObserver(Chain<T> chain, ChainedReplaySubject<T> subject) {
485 this.chain = chain;
486 this.subject = subject;
487 }
488
489 @Override
490 public void onSubscribe(Disposable d) {
491
492 }
493
494 @Override
495 public void onNext(Object t) {
496 debug(subject + " TestObserver emits add " + t);
497 chain.tryAddSubscriber(subject);
498 }
499
500 @Override
501 public void onError(Throwable e) {
502 chain.cancel();
503 subject.destination().onError(e);
504 }
505
506 @Override
507 public void onComplete() {
508 debug(subject + " TestObserver emits done");
509 chain.done(subject);
510 }
511 }
512
513
514
515
516
517
518
519
520
521 private static final class ChainedReplaySubject<T> extends Flowable<T>
522 implements FlowableSubscriber<T>, Subscription {
523
524
525 private final DestinationSerializedSubject<T> destination;
526 private final Chain<T> chain;
527
528
529 private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
530 private final AtomicLong requested = new AtomicLong();
531 private final AtomicReference<Requests<T>> requests = new AtomicReference<Requests<T>>(
532 new Requests<T>(null, 0, 0, null));
533 private final AtomicInteger wip = new AtomicInteger();
534 private final Tester<T> tester;
535
536
537 private volatile boolean done;
538
539 private Throwable error;
540 private volatile boolean cancelled;
541 private final Function<Observable<T>, ? extends Observable<?>> test;
542
543 static <T> ChainedReplaySubject<T> create(DestinationSerializedSubject<T> destination,
544 Chain<T> chain, Function<Observable<T>, ? extends Observable<?>> test) {
545 ChainedReplaySubject<T> c = new ChainedReplaySubject<T>(destination, chain, test);
546 c.init();
547 return c;
548 }
549
550 private ChainedReplaySubject(DestinationSerializedSubject<T> destination, Chain<T> chain,
551 Function<Observable<T>, ? extends Observable<?>> test) {
552 this.destination = destination;
553 this.chain = chain;
554 this.test = test;
555 this.tester = new Tester<T>();
556 }
557
558 private static final class Requests<T> {
559 final Subscription parent;
560 final long unreconciled;
561 final long deferred;
562 final Subscriber<? super T> child;
563
564 Requests(Subscription parent, long unreconciled, long deferred,
565 Subscriber<? super T> child) {
566 this.parent = parent;
567 this.unreconciled = unreconciled;
568 this.deferred = deferred;
569 this.child = child;
570 }
571 }
572
573 private void init() {
574 Observable<?> o;
575 try {
576 o = test.apply(tester);
577 } catch (Exception e) {
578
579 throw new RuntimeException(e);
580 }
581 o.subscribe(new TesterObserver<T>(chain, this));
582 }
583
584 DestinationSerializedSubject<T> destination() {
585 return destination;
586 }
587
588 @Override
589 public void onSubscribe(Subscription parent) {
590 while (true) {
591 Requests<T> r = requests.get();
592 Requests<T> r2;
593 if (r.deferred == 0) {
594 r2 = new Requests<T>(parent, r.unreconciled + 1, 0, r.child);
595 if (requests.compareAndSet(r, r2)) {
596 parent.request(1);
597 break;
598 }
599 } else {
600 r2 = new Requests<T>(parent, r.unreconciled, 0, r.child);
601 if (requests.compareAndSet(r, r2)) {
602 parent.request(r.deferred);
603 break;
604 }
605 }
606 }
607 drain();
608 }
609
610 @Override
611 protected void subscribeActual(Subscriber<? super T> child) {
612 debug(this + " subscribed with " + child);
613 while (true) {
614 Requests<T> r = requests.get();
615 Requests<T> r2 = new Requests<T>(r.parent, r.unreconciled, r.deferred, child);
616 if (requests.compareAndSet(r, r2)) {
617 break;
618 }
619 }
620 child.onSubscribe(this);
621 drain();
622 }
623
624 @Override
625 public void request(long n) {
626 debug(this + " request " + n);
627 if (SubscriptionHelper.validate(n)) {
628 BackpressureHelper.add(requested, n);
629 while (true) {
630 Requests<T> r = requests.get();
631 Requests<T> r2;
632 if (r.parent == null) {
633 long d = r.deferred + n;
634 if (d < 0) {
635 d = Long.MAX_VALUE;
636 }
637 r2 = new Requests<T>(r.parent, r.unreconciled, d, r.child);
638 if (requests.compareAndSet(r, r2)) {
639 break;
640 }
641 } else {
642 long x = n + r.deferred - r.unreconciled;
643 long u = Math.max(0, -x);
644 r2 = new Requests<T>(r.parent, u, 0, r.child);
645 if (requests.compareAndSet(r, r2)) {
646 if (x > 0) {
647 r.parent.request(x);
648 }
649 break;
650 }
651 }
652 }
653 drain();
654 }
655 }
656
657 @Override
658 public void onNext(T t) {
659 debug(this + " arrived " + t);
660 if (done) {
661 return;
662 }
663 queue.offer(t);
664 tester.onNext(t);
665 while (true) {
666 Requests<T> r = requests.get();
667 Requests<T> r2;
668 if (r.child == null) {
669 r2 = new Requests<T>(r.parent, r.unreconciled + 1, r.deferred, r.child);
670 if (requests.compareAndSet(r, r2)) {
671
672 r.parent.request(1);
673 break;
674 }
675 } else {
676 r2 = new Requests<T>(r.parent, r.unreconciled, 0, r.child);
677 if (requests.compareAndSet(r, r2)) {
678 break;
679 }
680 }
681 }
682 drain();
683 }
684
685 @Override
686 public void onComplete() {
687 debug(this + " complete");
688 if (done) {
689 return;
690 }
691 done = true;
692 cancelParent();
693 debug(this + " emits complete to tester");
694 tester.onComplete();
695 drain();
696 }
697
698 @Override
699 public void onError(Throwable t) {
700 debug(this + " error " + t);
701 if (done) {
702 RxJavaPlugins.onError(t);
703 return;
704 }
705 error = t;
706 done = true;
707 tester.onError(t);
708 drain();
709 }
710
711 private void drain() {
712
713
714 if (wip.getAndIncrement() == 0) {
715 int missed = 1;
716 while (true) {
717 long r = requested.get();
718 long e = 0;
719 boolean d = done;
720 while (e != r) {
721 if (cancelled) {
722 queue.clear();
723 return;
724 }
725 Subscriber<? super T> child = requests.get().child;
726 if (child == null) {
727 break;
728 }
729 Throwable err = error;
730 if (err != null) {
731 queue.clear();
732 error = null;
733 cancel();
734 chain.onError(child, err);
735 return;
736 }
737
738 T t = queue.poll();
739 if (t == null) {
740 if (d) {
741 cancel();
742 chain.onCompleted(child);
743 return;
744 } else {
745 break;
746 }
747 } else {
748 debug(this + " emitting " + t + " to " + requests.get().child + ":"
749 + requests.get().child.getClass().getSimpleName());
750 chain.onNext(child, t);
751 e++;
752 }
753 d = done;
754 }
755 if (d && queue.isEmpty() && terminate()) {
756 return;
757 }
758 if (e != 0 && r != Long.MAX_VALUE) {
759 r = requested.addAndGet(-e);
760 }
761 missed = wip.addAndGet(-missed);
762 if (missed == 0) {
763 return;
764 }
765 }
766 }
767 }
768
769 private boolean terminate() {
770 Subscriber<? super T> child = requests.get().child;
771 if (child != null) {
772 Throwable err = error;
773 if (err != null) {
774 queue.clear();
775 error = null;
776 cancel();
777 chain.onError(child, err);
778 return true;
779 } else {
780 cancel();
781 chain.onCompleted(child);
782 return true;
783 }
784 }
785 return false;
786 }
787
788 @Override
789 public void cancel() {
790 if (!cancelled) {
791 cancelled = true;
792 cancelParentTryToAddSubscriberToChain();
793 }
794 }
795
796 private void cancelParentTryToAddSubscriberToChain() {
797 cancelParent();
798 chain.completeOrCancel(this);
799 }
800
801 private void cancelParent() {
802 Subscription par = requests.get().parent;
803 if (par != null) {
804 par.cancel();
805 }
806 }
807
808 }
809
810 private static final class MultiSubscription implements Subscription {
811
812 private final Subscription primary;
813 private final Subscription secondary;
814
815 MultiSubscription(Subscription primary, Subscription secondary) {
816 this.primary = primary;
817 this.secondary = secondary;
818 }
819
820 @Override
821 public void request(long n) {
822 primary.request(n);
823 }
824
825 @Override
826 public void cancel() {
827 primary.cancel();
828 secondary.cancel();
829 }
830
831 }
832
833 static void debug(String message) {
834
835 }
836
837 static void log(String message) {
838
839 }
840
841 }