1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.Callable;
4 import java.util.concurrent.atomic.AtomicInteger;
5 import java.util.concurrent.atomic.AtomicLong;
6
7 import org.reactivestreams.Subscriber;
8 import org.reactivestreams.Subscription;
9
10 import io.reactivex.Flowable;
11 import io.reactivex.FlowableSubscriber;
12 import io.reactivex.exceptions.Exceptions;
13 import io.reactivex.functions.BiFunction;
14 import io.reactivex.functions.BiPredicate;
15 import io.reactivex.internal.fuseable.ConditionalSubscriber;
16 import io.reactivex.internal.fuseable.SimplePlainQueue;
17 import io.reactivex.internal.queue.SpscLinkedArrayQueue;
18 import io.reactivex.internal.subscriptions.SubscriptionHelper;
19 import io.reactivex.internal.util.BackpressureHelper;
20 import io.reactivex.plugins.RxJavaPlugins;
21
22 public final class FlowableCollectWhile<T, R> extends Flowable<R> {
23
24 private final Flowable<T> source;
25 private final Callable<R> collectionFactory;
26 private final BiFunction<? super R, ? super T, ? extends R> add;
27 private final BiPredicate<? super R, ? super T> condition;
28 private final boolean emitRemainder;
29
30 public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory,
31 BiFunction<? super R, ? super T, ? extends R> add,
32 BiPredicate<? super R, ? super T> condition, boolean emitRemainder) {
33 super();
34 this.source = source;
35 this.collectionFactory = collectionFactory;
36 this.add = add;
37 this.condition = condition;
38 this.emitRemainder = emitRemainder;
39 }
40
41 @Override
42 protected void subscribeActual(Subscriber<? super R> child) {
43 CollectWhileSubscriber<T, R> subscriber = new CollectWhileSubscriber<T, R>(
44 collectionFactory, add, condition, child, emitRemainder);
45 source.subscribe(subscriber);
46 }
47
48 @SuppressWarnings("serial")
49 private static final class CollectWhileSubscriber<T, R> extends AtomicInteger
50 implements FlowableSubscriber<T>, Subscription, ConditionalSubscriber<T> {
51
52 private final Callable<R> collectionFactory;
53 private final BiFunction<? super R, ? super T, ? extends R> add;
54 private final BiPredicate<? super R, ? super T> condition;
55 private final Subscriber<? super R> child;
56 private final boolean emitRemainder;
57 private final AtomicLong requested = new AtomicLong();
58 private final SimplePlainQueue<R> queue = new SpscLinkedArrayQueue<R>(16);
59
60 private Subscription parent;
61 private R collection;
62 private volatile boolean done;
63 private Throwable error;
64
65
66 private volatile boolean cancelled;
67
68 CollectWhileSubscriber(Callable<R> collectionFactory,
69 BiFunction<? super R, ? super T, ? extends R> add,
70 BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child,
71 boolean emitRemainder) {
72 this.collectionFactory = collectionFactory;
73 this.add = add;
74 this.condition = condition;
75 this.child = child;
76 this.emitRemainder = emitRemainder;
77 }
78
79 @Override
80 public void onSubscribe(Subscription parent) {
81 if (SubscriptionHelper.validate(this.parent, parent)) {
82 this.parent = parent;
83 child.onSubscribe(this);
84 }
85 }
86
87 @Override
88 public void onNext(T t) {
89
90 if (!tryOnNext(t)) {
91 parent.request(1);
92 }
93 }
94
95 @Override
96 public boolean tryOnNext(T t) {
97 if (done) {
98 return true;
99 }
100 if (collection == null && !collectionCreated()) {
101 return true;
102 }
103 boolean collect;
104 try {
105 collect = condition.test(collection, t);
106 } catch (Throwable e) {
107 Exceptions.throwIfFatal(e);
108 onError(e);
109 return true;
110 }
111 if (!collect) {
112 queue.offer(collection);
113 if (!collectionCreated()) {
114 return true;
115 }
116 }
117 try {
118 collection = add.apply(collection, t);
119 if (collection == null) {
120 throw new NullPointerException("add function should not return null");
121 }
122 } catch (Throwable e) {
123 Exceptions.throwIfFatal(e);
124 onError(e);
125 return true;
126 }
127 drain();
128 return !collect;
129 }
130
131 public boolean collectionCreated() {
132 try {
133 collection = collectionFactory.call();
134 if (collection == null) {
135 throw new NullPointerException("collectionFactory should not return null");
136 }
137 return true;
138 } catch (Throwable e) {
139 Exceptions.throwIfFatal(e);
140 onError(e);
141 return false;
142 }
143 }
144
145 @Override
146 public void onError(Throwable e) {
147 if (done) {
148 RxJavaPlugins.onError(e);
149 return;
150 }
151
152 collection = null;
153
154
155 error = e;
156 done = true;
157 drain();
158 }
159
160 @Override
161 public void onComplete() {
162 if (done) {
163 return;
164 }
165 R col = collection;
166 if (col != null) {
167 collection = null;
168
169
170 if (emitRemainder) {
171 queue.offer(col);
172 }
173 }
174 done = true;
175 drain();
176 }
177
178 private void drain() {
179 if (getAndIncrement() == 0) {
180 int missed = 1;
181 while (true) {
182 long r = requested.get();
183 long e = 0;
184 while (e != r) {
185 if (cancelled) {
186 queue.clear();
187 return;
188 }
189
190 boolean d = done;
191 R c = queue.poll();
192 if (c == null) {
193 if (d) {
194
195
196 Throwable err = error;
197 if (err != null) {
198 error = null;
199 child.onError(err);
200 } else {
201 child.onComplete();
202 }
203 return;
204 } else {
205
206 break;
207 }
208 } else {
209 child.onNext(c);
210 e++;
211 }
212 }
213 if (e != 0L && r != Long.MAX_VALUE) {
214 requested.addAndGet(-e);
215 }
216 missed = addAndGet(-missed);
217 if (missed == 0) {
218 return;
219 }
220 }
221 }
222
223 }
224
225 @Override
226 public void request(long n) {
227 if (SubscriptionHelper.validate(n)) {
228 BackpressureHelper.add(requested, n);
229 parent.request(n);
230 drain();
231 }
232 }
233
234 @Override
235 public void cancel() {
236 cancelled = true;
237 parent.cancel();
238 }
239
240 }
241 }