View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   import java.util.concurrent.atomic.AtomicLong;
5   import java.util.concurrent.atomic.AtomicReference;
6   
7   import org.reactivestreams.Subscriber;
8   import org.reactivestreams.Subscription;
9   
10  import com.github.davidmoten.guavamini.Preconditions;
11  
12  import io.reactivex.Flowable;
13  import io.reactivex.FlowableSubscriber;
14  import io.reactivex.Observable;
15  import io.reactivex.Observer;
16  import io.reactivex.disposables.Disposable;
17  import io.reactivex.disposables.Disposables;
18  import io.reactivex.exceptions.Exceptions;
19  import io.reactivex.functions.Function;
20  import io.reactivex.internal.fuseable.SimplePlainQueue;
21  import io.reactivex.internal.queue.SpscLinkedArrayQueue;
22  import io.reactivex.internal.subscriptions.SubscriptionHelper;
23  import io.reactivex.internal.util.BackpressureHelper;
24  import io.reactivex.plugins.RxJavaPlugins;
25  
26  public final class FlowableRepeatingTransform<T> extends Flowable<T> {
27  
28      private final Flowable<T> source;
29      private final Function<? super Flowable<T>, ? extends Flowable<T>> transform;
30      private final int maxChained;
31      private final long maxIterations;
32      private final Function<Observable<T>, ? extends Observable<?>> tester;
33  
34      public FlowableRepeatingTransform(Flowable<T> source,
35              Function<? super Flowable<T>, ? extends Flowable<T>> transform, int maxChained,
36              long maxIterations, Function<Observable<T>, Observable<?>> tester) {
37          Preconditions.checkArgument(maxChained > 0, "maxChained must be > 0");
38          Preconditions.checkArgument(maxIterations > 0, "maxIterations must be > 0");
39          Preconditions.checkNotNull(transform, "transform must not be null");
40          Preconditions.checkNotNull(tester, "tester must not be null");
41          this.source = source;
42          this.transform = transform;
43          this.maxChained = maxChained;
44          this.maxIterations = maxIterations;
45          this.tester = tester;
46      }
47  
48      @Override
49      protected void subscribeActual(Subscriber<? super T> child) {
50  
51          Flowable<T> f;
52          try {
53              f = transform.apply(source);
54          } catch (Exception e) {
55              Exceptions.throwIfFatal(e);
56              child.onSubscribe(SubscriptionHelper.CANCELLED);
57              child.onError(e);
58              return;
59          }
60          AtomicReference<Chain<T>> chainRef = new AtomicReference<Chain<T>>();
61          DestinationSerializedSubject<T> destination = new DestinationSerializedSubject<T>(child,
62                  chainRef);
63          Chain<T> chain = new Chain<T>(transform, destination, maxIterations, maxChained, tester);
64          chainRef.set(chain);
65          // destination is not initially subscribed to the chain but will be when
66          // tester function result completes
67          destination.subscribe(child);
68          ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, chain, tester);
69          chain.initialize(sub);
70          f.onTerminateDetach() //
71                  .subscribe(sub);
72      }
73  
74      private static enum EventType {
75          TESTER_ADD, TESTER_DONE, TESTER_COMPLETE_OR_CANCEL, NEXT, ERROR, COMPLETE;
76      }
77  
78      private static final class Event<T> {
79  
80          final EventType eventType;
81          final ChainedReplaySubject<T> subject;
82          final Subscriber<? super T> subscriber;
83          final T t;
84          final Throwable error;
85  
86          Event(EventType eventType, ChainedReplaySubject<T> subject,
87                  Subscriber<? super T> subscriber, T t, Throwable error) {
88              this.eventType = eventType;
89              this.subject = subject;
90              this.subscriber = subscriber;
91              this.t = t;
92              this.error = error;
93          }
94      }
95  
96      @SuppressWarnings("serial")
97      private static final class Chain<T> extends AtomicInteger implements Subscription {
98  
99          private final Function<? super Flowable<T>, ? extends Flowable<T>> transform;
100         private final SimplePlainQueue<Event<T>> queue;
101         private final DestinationSerializedSubject<T> destination;
102         private final long maxIterations;
103         private final int maxChained;
104         private final Function<Observable<T>, ? extends Observable<?>> test;
105 
106         // state
107         private int iteration = 1;
108         private int length;
109         private ChainedReplaySubject<T> finalSubscriber;
110         private boolean destinationAttached;
111         private volatile boolean cancelled;
112 
113         Chain(Function<? super Flowable<T>, ? extends Flowable<T>> transform,
114                 DestinationSerializedSubject<T> destination, long maxIterations, int maxChained,
115                 Function<Observable<T>, ? extends Observable<?>> test) {
116             this.transform = transform;
117             this.destination = destination;
118             this.maxIterations = maxIterations;
119             this.maxChained = maxChained;
120             this.test = test;
121             this.queue = new SpscLinkedArrayQueue<Event<T>>(16);
122         }
123 
124         void initialize(ChainedReplaySubject<T> subject) {
125             finalSubscriber = subject;
126             if (maxIterations == 1) {
127                 finalSubscriber.subscribe(destination);
128                 destinationAttached = true;
129             }
130         }
131 
132         void tryAddSubscriber(ChainedReplaySubject<T> subject) {
133             queue.offer(new Event<T>(EventType.TESTER_ADD, subject, null, null, null));
134             drain();
135         }
136 
137         void done(ChainedReplaySubject<T> subject) {
138             queue.offer(new Event<T>(EventType.TESTER_DONE, subject, null, null, null));
139             drain();
140         }
141 
142         void completeOrCancel(ChainedReplaySubject<T> subject) {
143             queue.offer(
144                     new Event<T>(EventType.TESTER_COMPLETE_OR_CANCEL, subject, null, null, null));
145             drain();
146         }
147 
148         public void onError(Subscriber<? super T> child, Throwable err) {
149             queue.offer(new Event<T>(EventType.ERROR, null, child, null, err));
150             drain();
151 
152         }
153 
154         public void onCompleted(Subscriber<? super T> child) {
155             queue.offer(new Event<T>(EventType.COMPLETE, null, child, null, null));
156             drain();
157 
158         }
159 
160         public void onNext(Subscriber<? super T> child, T t) {
161             queue.offer(new Event<T>(EventType.NEXT, null, child, t, null));
162             drain();
163         }
164 
165         void drain() {
166             if (getAndIncrement() == 0) {
167                 if (cancelled) {
168                     finalSubscriber.cancel();
169                     queue.clear();
170                     return;
171                 }
172                 int missed = 1;
173                 while (true) {
174                     while (true) {
175                         Event<T> v = queue.poll();
176                         if (v == null) {
177                             break;
178                         } else if (v.eventType == EventType.TESTER_ADD) {
179                             handleAdd(v);
180                         } else if (v.eventType == EventType.TESTER_DONE) {
181                             handleDone();
182                         } else if (v.eventType == EventType.NEXT) {
183                             v.subscriber.onNext(v.t);
184                         } else if (v.eventType == EventType.COMPLETE) {
185                             v.subscriber.onComplete();
186                         } else if (v.eventType == EventType.ERROR) {
187                             v.subscriber.onError(v.error);
188                         } else {
189                             handleCompleteOrCancel(v);
190                         }
191                     }
192                     missed = addAndGet(-missed);
193                     if (missed == 0) {
194                         break;
195                     }
196                 }
197             }
198         }
199 
200         private void handleAdd(Event<T> v) {
201             debug("ADD " + v.subject);
202             if (!destinationAttached && v.subject == finalSubscriber && length < maxChained
203                     && !destinationAttached) {
204                 if (iteration <= maxIterations - 1) {
205                     // ok to add another subject to the chain
206                     ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this,
207                             test);
208                     if (iteration == maxIterations - 1) {
209                         sub.subscribe(destination);
210                         debug(sub + "subscribed to by destination");
211                         destinationAttached = true;
212                     }
213                     addToChain(sub);
214                     finalSubscriber = sub;
215                     iteration++;
216                     length += 1;
217                 }
218             }
219         }
220 
221         private void handleDone() {
222             debug("DONE");
223             if (!destinationAttached) {
224                 destinationAttached = true;
225                 finalSubscriber.subscribe(destination);
226             }
227         }
228 
229         private void handleCompleteOrCancel(Event<T> v) {
230             debug("COMPLETE/CANCEL " + v.subject);
231             if (destinationAttached) {
232                 return;
233             }
234             if (v.subject == finalSubscriber) {
235                 // TODO what to do here?
236                 // cancelWholeChain();
237             } else if (iteration < maxIterations - 1) {
238                 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this, test);
239                 addToChain(sub);
240                 finalSubscriber = sub;
241                 iteration++;
242             } else if (iteration == maxIterations - 1) {
243                 ChainedReplaySubject<T> sub = ChainedReplaySubject.create(destination, this, test);
244                 destinationAttached = true;
245                 sub.subscribe(destination);
246                 addToChain(sub);
247                 debug(sub + "subscribed to by destination");
248                 finalSubscriber = sub;
249                 iteration++;
250             } else {
251                 length--;
252             }
253         }
254 
255         private void addToChain(final Subscriber<T> sub) {
256             Flowable<T> f;
257             try {
258                 f = transform.apply(finalSubscriber);
259             } catch (Exception e) {
260                 Exceptions.throwIfFatal(e);
261                 cancelWholeChain();
262                 destination.onError(e);
263                 return;
264             }
265             log("adding subscriber to " + finalSubscriber);
266             f.onTerminateDetach().subscribe(sub);
267             debug(finalSubscriber + " subscribed to by " + sub);
268         }
269 
270         private void cancelWholeChain() {
271             cancelled = true;
272             drain();
273         }
274 
275         @Override
276         public void request(long n) {
277             // ignore, just want to be able to cancel
278         }
279 
280         @Override
281         public void cancel() {
282             cancelled = true;
283             cancelWholeChain();
284         }
285 
286     }
287 
288     private static class DestinationSerializedSubject<T> extends Flowable<T>
289             implements FlowableSubscriber<T>, Subscription {
290 
291         private final Subscriber<? super T> child;
292         private final AtomicReference<Chain<T>> chain;
293 
294         private final AtomicInteger wip = new AtomicInteger();
295         private final AtomicReference<Subscription> parent = new AtomicReference<Subscription>();
296         private final AtomicLong requested = new AtomicLong();
297         private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
298         private final AtomicLong deferredRequests = new AtomicLong();
299 
300         private Throwable error;
301         private volatile boolean done;
302         private volatile boolean cancelled;
303 
304         DestinationSerializedSubject(Subscriber<? super T> child, AtomicReference<Chain<T>> chain) {
305             this.child = child;
306             this.chain = chain;
307         }
308 
309         @Override
310         protected void subscribeActual(Subscriber<? super T> child) {
311             debug(this + " subscribed to by " + child);
312             child.onSubscribe(new MultiSubscription(this, chain.get()));
313             // don't need to drain because destination is always subscribed to
314             // this before this is subscribed to parent
315         }
316 
317         @Override
318         public void onSubscribe(Subscription pr) {
319             parent.set(pr);
320             long r = deferredRequests.getAndSet(-1);
321             if (r > 0L) {
322                 debug(this + " requesting of parent " + r);
323                 pr.request(r);
324             }
325             drain();
326         }
327 
328         @Override
329         public void request(long n) {
330             debug(this + " request " + n);
331             if (SubscriptionHelper.validate(n)) {
332                 BackpressureHelper.add(requested, n);
333                 while (true) {
334                     Subscription p = parent.get();
335                     long d = deferredRequests.get();
336                     if (d == -1) {
337                         // parent exists so can request of it
338                         debug(this + " requesting from parent " + n);
339                         p.request(n);
340                         break;
341                     } else {
342                         long d2 = d + n;
343                         if (d2 < 0) {
344                             d2 = Long.MAX_VALUE;
345                         }
346                         if (deferredRequests.compareAndSet(d, d2)) {
347                             break;
348                         }
349                     }
350                 }
351                 drain();
352             }
353         }
354 
355         @Override
356         public void cancel() {
357             cancelled = true;
358             SubscriptionHelper.cancel(this.parent);
359             chain.get().cancel();
360         }
361 
362         @Override
363         public void onNext(T t) {
364             queue.offer(t);
365             drain();
366         }
367 
368         @Override
369         public void onError(Throwable e) {
370             error = e;
371             done = true;
372             drain();
373         }
374 
375         @Override
376         public void onComplete() {
377             debug("final complete");
378             done = true;
379             drain();
380         }
381 
382         private void drain() {
383             // this is a pretty standard drain loop
384             // default is to shortcut errors (don't delay them)
385             if (wip.getAndIncrement() == 0) {
386                 int missed = 1;
387                 while (true) {
388                     long r = requested.get();
389                     long e = 0;
390                     boolean d = done;
391                     while (e != r) {
392                         if (cancelled) {
393                             queue.clear();
394                             return;
395                         }
396                         if (d && terminate()) {
397                             return;
398                         }
399                         T t = queue.poll();
400                         if (t == null) {
401                             if (d) {
402                                 cancel();
403                                 child.onComplete();
404                                 return;
405                             } else {
406                                 break;
407                             }
408                         } else {
409                             child.onNext(t);
410                             e++;
411                         }
412                         d = done;
413                     }
414                     if (d && terminate()) {
415                         return;
416                     }
417                     if (e != 0 && r != Long.MAX_VALUE) {
418                         r = requested.addAndGet(-e);
419                     }
420                     missed = wip.addAndGet(-missed);
421                     if (missed == 0) {
422                         return;
423                     }
424                 }
425             }
426         }
427 
428         private boolean terminate() {
429             // done is true at this point
430             Throwable err = error;
431             if (err != null) {
432                 queue.clear();
433                 error = null;
434                 cancel();
435                 child.onError(err);
436                 return true;
437             } else if (queue.isEmpty()) {
438                 cancel();
439                 child.onComplete();
440                 return true;
441             } else {
442                 return false;
443             }
444         }
445 
446     }
447 
448     private static final class Tester<T> extends Observable<T> implements Observer<T> {
449 
450         private Observer<? super T> observer;
451 
452         @Override
453         protected void subscribeActual(Observer<? super T> observer) {
454             observer.onSubscribe(Disposables.empty());
455             this.observer = observer;
456         }
457 
458         @Override
459         public void onSubscribe(Disposable d) {
460             throw new RuntimeException("unexpected");
461         }
462 
463         @Override
464         public void onNext(T t) {
465             observer.onNext(t);
466         }
467 
468         @Override
469         public void onError(Throwable e) {
470             observer.onError(e);
471         }
472 
473         @Override
474         public void onComplete() {
475             observer.onComplete();
476         }
477     }
478 
479     private static final class TesterObserver<T> implements Observer<Object> {
480 
481         private final Chain<T> chain;
482         private final ChainedReplaySubject<T> subject;
483 
484         TesterObserver(Chain<T> chain, ChainedReplaySubject<T> subject) {
485             this.chain = chain;
486             this.subject = subject;
487         }
488 
489         @Override
490         public void onSubscribe(Disposable d) {
491             // ignore
492         }
493 
494         @Override
495         public void onNext(Object t) {
496             debug(subject + " TestObserver emits add " + t);
497             chain.tryAddSubscriber(subject);
498         }
499 
500         @Override
501         public void onError(Throwable e) {
502             chain.cancel();
503             subject.destination().onError(e);
504         }
505 
506         @Override
507         public void onComplete() {
508             debug(subject + " TestObserver emits done");
509             chain.done(subject);
510         }
511     }
512 
513     /**
514      * Requests minimally of upstream and buffers until this subscriber itself
515      * is subscribed to. A maximum of {@code maxDepthConcurrent} subscribers can
516      * be chained together at any one time.
517      * 
518      * @param <T>
519      *            generic type
520      */
521     private static final class ChainedReplaySubject<T> extends Flowable<T>
522             implements FlowableSubscriber<T>, Subscription {
523 
524         // assigned in constructor
525         private final DestinationSerializedSubject<T> destination;
526         private final Chain<T> chain;
527 
528         // assigned here
529         private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
530         private final AtomicLong requested = new AtomicLong();
531         private final AtomicReference<Requests<T>> requests = new AtomicReference<Requests<T>>(
532                 new Requests<T>(null, 0, 0, null));
533         private final AtomicInteger wip = new AtomicInteger();
534         private final Tester<T> tester;
535 
536         // mutable
537         private volatile boolean done;
538         // visibility controlled by `done`
539         private Throwable error;
540         private volatile boolean cancelled;
541         private final Function<Observable<T>, ? extends Observable<?>> test;
542 
543         static <T> ChainedReplaySubject<T> create(DestinationSerializedSubject<T> destination,
544                 Chain<T> chain, Function<Observable<T>, ? extends Observable<?>> test) {
545             ChainedReplaySubject<T> c = new ChainedReplaySubject<T>(destination, chain, test);
546             c.init();
547             return c;
548         }
549 
550         private ChainedReplaySubject(DestinationSerializedSubject<T> destination, Chain<T> chain,
551                 Function<Observable<T>, ? extends Observable<?>> test) {
552             this.destination = destination;
553             this.chain = chain;
554             this.test = test;
555             this.tester = new Tester<T>();
556         }
557 
558         private static final class Requests<T> {
559             final Subscription parent;
560             final long unreconciled;
561             final long deferred;
562             final Subscriber<? super T> child;
563 
564             Requests(Subscription parent, long unreconciled, long deferred,
565                     Subscriber<? super T> child) {
566                 this.parent = parent;
567                 this.unreconciled = unreconciled;
568                 this.deferred = deferred;
569                 this.child = child;
570             }
571         }
572 
573         private void init() {
574             Observable<?> o;
575             try {
576                 o = test.apply(tester);
577             } catch (Exception e) {
578                 // TODO
579                 throw new RuntimeException(e);
580             }
581             o.subscribe(new TesterObserver<T>(chain, this));
582         }
583 
584         DestinationSerializedSubject<T> destination() {
585             return destination;
586         }
587 
588         @Override
589         public void onSubscribe(Subscription parent) {
590             while (true) {
591                 Requests<T> r = requests.get();
592                 Requests<T> r2;
593                 if (r.deferred == 0) {
594                     r2 = new Requests<T>(parent, r.unreconciled + 1, 0, r.child);
595                     if (requests.compareAndSet(r, r2)) {
596                         parent.request(1);
597                         break;
598                     }
599                 } else {
600                     r2 = new Requests<T>(parent, r.unreconciled, 0, r.child);
601                     if (requests.compareAndSet(r, r2)) {
602                         parent.request(r.deferred);
603                         break;
604                     }
605                 }
606             }
607             drain();
608         }
609 
610         @Override
611         protected void subscribeActual(Subscriber<? super T> child) {
612             debug(this + " subscribed with " + child);
613             while (true) {
614                 Requests<T> r = requests.get();
615                 Requests<T> r2 = new Requests<T>(r.parent, r.unreconciled, r.deferred, child);
616                 if (requests.compareAndSet(r, r2)) {
617                     break;
618                 }
619             }
620             child.onSubscribe(this);
621             drain();
622         }
623 
624         @Override
625         public void request(long n) {
626             debug(this + " request " + n);
627             if (SubscriptionHelper.validate(n)) {
628                 BackpressureHelper.add(requested, n);
629                 while (true) {
630                     Requests<T> r = requests.get();
631                     Requests<T> r2;
632                     if (r.parent == null) {
633                         long d = r.deferred + n;
634                         if (d < 0) {
635                             d = Long.MAX_VALUE;
636                         }
637                         r2 = new Requests<T>(r.parent, r.unreconciled, d, r.child);
638                         if (requests.compareAndSet(r, r2)) {
639                             break;
640                         }
641                     } else {
642                         long x = n + r.deferred - r.unreconciled;
643                         long u = Math.max(0, -x);
644                         r2 = new Requests<T>(r.parent, u, 0, r.child);
645                         if (requests.compareAndSet(r, r2)) {
646                             if (x > 0) {
647                                 r.parent.request(x);
648                             }
649                             break;
650                         }
651                     }
652                 }
653                 drain();
654             }
655         }
656 
657         @Override
658         public void onNext(T t) {
659             debug(this + " arrived " + t);
660             if (done) {
661                 return;
662             }
663             queue.offer(t);
664             tester.onNext(t);
665             while (true) {
666                 Requests<T> r = requests.get();
667                 Requests<T> r2;
668                 if (r.child == null) {
669                     r2 = new Requests<T>(r.parent, r.unreconciled + 1, r.deferred, r.child);
670                     if (requests.compareAndSet(r, r2)) {
671                         // make minimal request to keep upstream producing
672                         r.parent.request(1);
673                         break;
674                     }
675                 } else {
676                     r2 = new Requests<T>(r.parent, r.unreconciled, 0, r.child);
677                     if (requests.compareAndSet(r, r2)) {
678                         break;
679                     }
680                 }
681             }
682             drain();
683         }
684 
685         @Override
686         public void onComplete() {
687             debug(this + " complete");
688             if (done) {
689                 return;
690             }
691             done = true;
692             cancelParent();
693             debug(this + " emits complete to tester");
694             tester.onComplete();
695             drain();
696         }
697 
698         @Override
699         public void onError(Throwable t) {
700             debug(this + " error " + t);
701             if (done) {
702                 RxJavaPlugins.onError(t);
703                 return;
704             }
705             error = t;
706             done = true;
707             tester.onError(t);
708             drain();
709         }
710 
711         private void drain() {
712             // this is a pretty standard drain loop
713             // default is to shortcut errors (don't delay them)
714             if (wip.getAndIncrement() == 0) {
715                 int missed = 1;
716                 while (true) {
717                     long r = requested.get();
718                     long e = 0;
719                     boolean d = done;
720                     while (e != r) {
721                         if (cancelled) {
722                             queue.clear();
723                             return;
724                         }
725                         Subscriber<? super T> child = requests.get().child;
726                         if (child == null) {
727                             break;
728                         }
729                         Throwable err = error;
730                         if (err != null) {
731                             queue.clear();
732                             error = null;
733                             cancel();
734                             chain.onError(child, err);
735                             return;
736                         }
737 
738                         T t = queue.poll();
739                         if (t == null) {
740                             if (d) {
741                                 cancel();
742                                 chain.onCompleted(child);
743                                 return;
744                             } else {
745                                 break;
746                             }
747                         } else {
748                             debug(this + " emitting " + t + " to " + requests.get().child + ":"
749                                     + requests.get().child.getClass().getSimpleName());
750                             chain.onNext(child, t);
751                             e++;
752                         }
753                         d = done;
754                     }
755                     if (d && queue.isEmpty() && terminate()) {
756                         return;
757                     }
758                     if (e != 0 && r != Long.MAX_VALUE) {
759                         r = requested.addAndGet(-e);
760                     }
761                     missed = wip.addAndGet(-missed);
762                     if (missed == 0) {
763                         return;
764                     }
765                 }
766             }
767         }
768 
769         private boolean terminate() {
770             Subscriber<? super T> child = requests.get().child;
771             if (child != null) {
772                 Throwable err = error;
773                 if (err != null) {
774                     queue.clear();
775                     error = null;
776                     cancel();
777                     chain.onError(child, err);
778                     return true;
779                 } else {
780                     cancel();
781                     chain.onCompleted(child);
782                     return true;
783                 }
784             }
785             return false;
786         }
787 
788         @Override
789         public void cancel() {
790             if (!cancelled) {
791                 cancelled = true;
792                 cancelParentTryToAddSubscriberToChain();
793             }
794         }
795 
796         private void cancelParentTryToAddSubscriberToChain() {
797             cancelParent();
798             chain.completeOrCancel(this);
799         }
800 
801         private void cancelParent() {
802             Subscription par = requests.get().parent;
803             if (par != null) {
804                 par.cancel();
805             }
806         }
807 
808     }
809 
810     private static final class MultiSubscription implements Subscription {
811 
812         private final Subscription primary;
813         private final Subscription secondary;
814 
815         MultiSubscription(Subscription primary, Subscription secondary) {
816             this.primary = primary;
817             this.secondary = secondary;
818         }
819 
820         @Override
821         public void request(long n) {
822             primary.request(n);
823         }
824 
825         @Override
826         public void cancel() {
827             primary.cancel();
828             secondary.cancel();
829         }
830 
831     }
832 
833     static void debug(String message) {
834         // System.out.println(message);
835     }
836 
837     static void log(String message) {
838         // System.out.println(message);
839     }
840 
841 }