View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.Callable;
4   import java.util.concurrent.atomic.AtomicInteger;
5   import java.util.concurrent.atomic.AtomicLong;
6   
7   import org.reactivestreams.Subscriber;
8   import org.reactivestreams.Subscription;
9   
10  import io.reactivex.Flowable;
11  import io.reactivex.FlowableSubscriber;
12  import io.reactivex.exceptions.Exceptions;
13  import io.reactivex.functions.BiFunction;
14  import io.reactivex.functions.BiPredicate;
15  import io.reactivex.internal.fuseable.ConditionalSubscriber;
16  import io.reactivex.internal.fuseable.SimplePlainQueue;
17  import io.reactivex.internal.queue.SpscLinkedArrayQueue;
18  import io.reactivex.internal.subscriptions.SubscriptionHelper;
19  import io.reactivex.internal.util.BackpressureHelper;
20  import io.reactivex.plugins.RxJavaPlugins;
21  
22  public final class FlowableCollectWhile<T, R> extends Flowable<R> {
23  
24      private final Flowable<T> source;
25      private final Callable<R> collectionFactory;
26      private final BiFunction<? super R, ? super T, ? extends R> add;
27      private final BiPredicate<? super R, ? super T> condition;
28      private final boolean emitRemainder;
29  
30      public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory,
31              BiFunction<? super R, ? super T, ? extends R> add,
32              BiPredicate<? super R, ? super T> condition, boolean emitRemainder) {
33          super();
34          this.source = source;
35          this.collectionFactory = collectionFactory;
36          this.add = add;
37          this.condition = condition;
38          this.emitRemainder = emitRemainder;
39      }
40  
41      @Override
42      protected void subscribeActual(Subscriber<? super R> child) {
43          CollectWhileSubscriber<T, R> subscriber = new CollectWhileSubscriber<T, R>(
44                  collectionFactory, add, condition, child, emitRemainder);
45          source.subscribe(subscriber);
46      }
47  
48      @SuppressWarnings("serial")
49      private static final class CollectWhileSubscriber<T, R> extends AtomicInteger
50              implements FlowableSubscriber<T>, Subscription, ConditionalSubscriber<T> {
51  
52          private final Callable<R> collectionFactory;
53          private final BiFunction<? super R, ? super T, ? extends R> add;
54          private final BiPredicate<? super R, ? super T> condition;
55          private final Subscriber<? super R> child;
56          private final boolean emitRemainder;
57          private final AtomicLong requested = new AtomicLong();
58          private final SimplePlainQueue<R> queue = new SpscLinkedArrayQueue<R>(16);
59  
60          private Subscription parent;
61          private R collection;
62          private volatile boolean done;
63          private Throwable error; // does not need to be volatile because is set
64                                   // before `done` and read after `done`
65  
66          private volatile boolean cancelled;
67  
68          CollectWhileSubscriber(Callable<R> collectionFactory,
69                  BiFunction<? super R, ? super T, ? extends R> add,
70                  BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child,
71                  boolean emitRemainder) {
72              this.collectionFactory = collectionFactory;
73              this.add = add;
74              this.condition = condition;
75              this.child = child;
76              this.emitRemainder = emitRemainder;
77          }
78  
79          @Override
80          public void onSubscribe(Subscription parent) {
81              if (SubscriptionHelper.validate(this.parent, parent)) {
82                  this.parent = parent;
83                  child.onSubscribe(this);
84              }
85          }
86  
87          @Override
88          public void onNext(T t) {
89              // this path taken by upstream if not enabled to call `tryOnNext`
90              if (!tryOnNext(t)) {
91                  parent.request(1);
92              }
93          }
94  
95          @Override
96          public boolean tryOnNext(T t) {
97              if (done) {
98                  return true;
99              }
100             if (collection == null && !collectionCreated()) {
101                 return true;
102             }
103             boolean collect;
104             try {
105                 collect = condition.test(collection, t);
106             } catch (Throwable e) {
107                 Exceptions.throwIfFatal(e);
108                 onError(e);
109                 return true;
110             }
111             if (!collect) {
112                 queue.offer(collection);
113                 if (!collectionCreated()) {
114                     return true;
115                 }
116             }
117             try {
118                 collection = add.apply(collection, t);
119                 if (collection == null) {
120                     throw new NullPointerException("add function should not return null");
121                 }
122             } catch (Throwable e) {
123                 Exceptions.throwIfFatal(e);
124                 onError(e);
125                 return true;
126             }
127             drain();
128             return !collect;
129         }
130 
131         public boolean collectionCreated() {
132             try {
133                 collection = collectionFactory.call();
134                 if (collection == null) {
135                     throw new NullPointerException("collectionFactory should not return null");
136                 }
137                 return true;
138             } catch (Throwable e) {
139                 Exceptions.throwIfFatal(e);
140                 onError(e);
141                 return false;
142             }
143         }
144 
145         @Override
146         public void onError(Throwable e) {
147             if (done) {
148                 RxJavaPlugins.onError(e);
149                 return;
150             }
151             //release for GC
152             collection = null;
153             // must set `error` before done because `error` is not volatile and
154             // `done` is
155             error = e;
156             done = true;
157             drain();
158         }
159 
160         @Override
161         public void onComplete() {
162             if (done) {
163                 return;
164             }
165             R col = collection;
166             if (col != null) {
167                 collection = null;
168                 // ensure that the remainder is emitted
169                 // if configured to
170                 if (emitRemainder) {
171                     queue.offer(col);
172                 }
173             }
174             done = true;
175             drain();
176         }
177 
178         private void drain() {
179             if (getAndIncrement() == 0) {
180                 int missed = 1;
181                 while (true) {
182                     long r = requested.get();
183                     long e = 0;
184                     while (e != r) {
185                         if (cancelled) {
186                             queue.clear();
187                             return;
188                         }
189                         //must read `done` before polling queue
190                         boolean d = done;
191                         R c = queue.poll();
192                         if (c == null) {
193                             if (d) {
194                                 // `error` must be read AFTER `done` for
195                                 // full visibility
196                                 Throwable err = error;
197                                 if (err != null) {
198                                     error = null;
199                                     child.onError(err);
200                                 } else {
201                                     child.onComplete();
202                                 }
203                                 return;
204                             } else {
205                                 // nothing to emit and not done
206                                 break;
207                             }
208                         } else {
209                             child.onNext(c);
210                             e++;
211                         }
212                     }
213                     if (e != 0L && r != Long.MAX_VALUE) {
214                         requested.addAndGet(-e);
215                     }
216                     missed = addAndGet(-missed);
217                     if (missed == 0) {
218                         return;
219                     }
220                 }
221             }
222 
223         }
224 
225         @Override
226         public void request(long n) {
227             if (SubscriptionHelper.validate(n)) {
228                 BackpressureHelper.add(requested, n);
229                 parent.request(n);
230                 drain();
231             }
232         }
233 
234         @Override
235         public void cancel() {
236             cancelled = true;
237             parent.cancel();
238         }
239 
240     }
241 }