1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.*;
4 import java.util.concurrent.atomic.*;
5
6 import rx.*;
7 import rx.Observable;
8 import rx.Observable.*;
9 import rx.exceptions.*;
10 import rx.functions.Func1;
11 import rx.internal.operators.*;
12 import rx.internal.util.atomic.SpscAtomicArrayQueue;
13 import rx.internal.util.unsafe.*;
14
15
16
17
18
19
20
/**
 * Transformer that groups the items of a source {@code Observable} into {@code List}
 * buffers, using a predicate evaluated on each item to decide where a buffer boundary lies.
 * <p>
 * The {@code after} flag selects which of the two nested subscribers does the work:
 * {@code BoundedAfterSubscriber} (the matching item is the last element of the emitted
 * buffer) or {@code BoundedBeforeSubscriber} (the matching item becomes the first element
 * of the next buffer).
 *
 * @param <T> the type of the upstream items
 */
public final class OperatorBufferPredicateBoundary<T> implements Transformer<T, List<T>> {

    /** Evaluated for every upstream item; a {@code true} result marks a buffer boundary. */
    final Func1<? super T, Boolean> predicate;

    /** Passed to the internal queue's constructor and used as the initial upstream request amount. */
    final int prefetch;

    /** Initial capacity handed to every {@code ArrayList} that is created as a buffer. */
    final int capacityHint;

    /** {@code true} selects {@code BoundedAfterSubscriber}, {@code false} selects {@code BoundedBeforeSubscriber}. */
    final boolean after;
30
/**
 * Validates the arguments and stores them for later use by {@code call}.
 *
 * @param predicate    boundary test applied to each upstream item; must not be {@code null}
 * @param prefetch     upstream prefetch amount; must be greater than zero
 * @param capacityHint initial capacity of each buffer list; must be greater than zero
 * @param after        {@code true} if the item matching the predicate ends the current buffer,
 *                     {@code false} if it starts the next one
 * @throws NullPointerException     if {@code predicate} is {@code null}
 * @throws IllegalArgumentException if {@code prefetch} or {@code capacityHint} is not positive
 */
public OperatorBufferPredicateBoundary(Func1<? super T, Boolean> predicate, int prefetch, int capacityHint, boolean after) {
    // The checks run in parameter order so the first invalid argument is the one reported.
    if (null == predicate) {
        throw new NullPointerException("predicate");
    }
    if (prefetch < 1) {
        throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
    }
    if (capacityHint < 1) {
        throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
    }

    this.after = after;
    this.capacityHint = capacityHint;
    this.prefetch = prefetch;
    this.predicate = predicate;
}
46
47 @Override
48 public Observable<List<T>> call(Observable<T> source) {
49 return source.lift(new Operator<List<T>, T>() {
50 @Override
51 public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
52 final BoundedSubscriber<T> parent = after
53 ? new BoundedAfterSubscriber<T>(child, capacityHint, predicate, prefetch)
54 : new BoundedBeforeSubscriber<T>(child, capacityHint, predicate, prefetch);
55
56 child.add(parent);
57 child.setProducer(new Producer() {
58 @Override
59 public void request(long n) {
60 parent.requestMore(n);
61 }
62 });
63
64 return parent;
65 }
66 });
67 }
68
69 static abstract class BoundedSubscriber<T> extends Subscriber<T> {
70 final Subscriber<? super List<T>> actual;
71
72 final int capacityHint;
73
74 final Func1<? super T, Boolean> predicate;
75
76 final Queue<Object> queue;
77
78 final AtomicLong requested;
79
80 final AtomicInteger wip;
81
82 final NotificationLite<T> nl;
83
84 final int limit;
85
86 List<T> buffer;
87
88 long upstreamConsumed;
89
90 volatile boolean done;
91 Throwable error;
92
93 public BoundedSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
94 Func1<? super T, Boolean> predicate, int prefetch) {
95 this.actual = actual;
96 this.capacityHint = capacityHint;
97 this.predicate = predicate;
98 Queue<Object> q;
99 if (UnsafeAccess.isUnsafeAvailable()) {
100 q = new SpscArrayQueue<Object>(prefetch);
101 } else {
102 q = new SpscAtomicArrayQueue<Object>(prefetch);
103 }
104 queue = q;
105 buffer = new ArrayList<T>(capacityHint);
106 requested = new AtomicLong();
107 wip = new AtomicInteger();
108 nl = NotificationLite.instance();
109 limit = prefetch - (prefetch >> 2);
110 if (prefetch == Integer.MAX_VALUE) {
111 request(Long.MAX_VALUE);
112 } else {
113 request(prefetch);
114 }
115 }
116
117 @Override
118 public void onNext(T t) {
119 if (!queue.offer(nl.next(t))) {
120 unsubscribe();
121 onError(new MissingBackpressureException());
122 } else {
123 drain();
124 }
125 }
126
127 @Override
128 public void onError(Throwable e) {
129 error = e;
130 done = true;
131 drain();
132 }
133
134 @Override
135 public void onCompleted() {
136 done = true;
137 drain();
138 }
139
140 void requestMore(long n) {
141 if (n > 0) {
142 BackpressureUtils.getAndAddRequest(requested, n);
143 drain();
144 } else
145 if (n < 0) {
146 throw new IllegalArgumentException("n >= 0 required but it was " + n);
147 }
148 }
149
150 abstract void drain();
151 }
152
/**
 * Boundary mode in which the item that satisfies the predicate is the LAST element of the
 * buffer that gets emitted: each polled item is appended to the current buffer first and
 * the predicate is evaluated afterwards.
 *
 * @param <T> the type of the upstream items
 */
static final class BoundedAfterSubscriber<T> extends BoundedSubscriber<T> {

    /** Passes all arguments unchanged to the {@code BoundedSubscriber} constructor. */
    public BoundedAfterSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
            Func1<? super T, Boolean> predicate, int prefetch) {
        super(actual, capacityHint, predicate, prefetch);
    }

    /**
     * Moves items from the queue into the current buffer and emits buffers downstream while
     * there is demand, then replenishes the upstream.
     * <p>
     * Every terminal path (unsubscription, error, completion, predicate failure) returns
     * without decrementing {@code wip}, so all later calls exit at the first check.
     */
    @Override
    void drain() {
        // Only the caller that moves wip from 0 to 1 runs the loop; any other caller just
        // leaves its increment behind as a "missed" notification for the running loop.
        if (wip.getAndIncrement() != 0) {
            return;
        }

        final Subscriber<? super List<T>> localSubscriber = actual;
        final Queue<Object> localQueue = queue;
        // Number of drain() calls this pass accounts for.
        int missed = 1;

        for (;;) {

            long localRequested = requested.get();
            long localEmission = 0L;     // buffers emitted in this pass
            long localConsumption = 0L;  // items polled from the queue in this pass
            List<T> localBuffer = buffer;

            // Emit at most as many buffers as the downstream has requested.
            while (localEmission != localRequested) {
                if (localSubscriber.isUnsubscribed()) {
                    return;
                }

                // done is read BEFORE polling: if the queue then turns out to be empty,
                // nothing more can arrive (assuming upstream emits nothing after its
                // terminal event).
                boolean mainDone = done;

                if (mainDone) {
                    Throwable exception = error;
                    if (exception != null) {
                        // Errors are delivered immediately; queued items and the
                        // partially filled buffer are dropped.
                        buffer = null;
                        localSubscriber.onError(exception);
                        return;
                    }
                }

                Object notification = localQueue.poll();
                boolean empty = notification == null;

                if (mainDone && empty) {
                    // Normal completion: flush whatever is left in the buffer. This is
                    // reached only inside the loop, i.e. while demand is outstanding.
                    buffer = null;
                    if (!localBuffer.isEmpty()) {
                        localSubscriber.onNext(localBuffer);
                    }
                    localSubscriber.onCompleted();
                    return;
                }
                if (empty) {
                    break;
                }

                T value = nl.getValue(notification);

                // "After" mode: the item joins the buffer before the predicate is consulted.
                localBuffer.add(value);
                localConsumption++;

                boolean emit;

                try {
                    emit = predicate.call(value);
                } catch (Throwable ex) {
                    unsubscribe();
                    buffer = null;
                    Exceptions.throwOrReport(ex, localSubscriber, value);
                    return;
                }

                if (emit) {
                    localSubscriber.onNext(localBuffer);
                    // The emitted list now belongs to the downstream; start a fresh one.
                    localBuffer = new ArrayList<T>(capacityHint);
                    buffer = localBuffer;

                    localEmission++;
                }
            }

            // Demand is exhausted (possibly zero from the start). Terminal events need no
            // demand, so an error, or a completion with nothing left to emit, is still
            // signalled here. A completed upstream with a non-empty buffer waits for the
            // next request.
            if (localEmission == localRequested) {
                if (localSubscriber.isUnsubscribed()) {
                    return;
                }

                boolean mainDone = done;

                if (mainDone) {
                    Throwable exception = error;
                    if (exception != null) {
                        buffer = null;
                        localSubscriber.onError(exception);
                        return;
                    } else
                    if (localQueue.isEmpty() && localBuffer.isEmpty()) {
                        buffer = null;
                        localSubscriber.onCompleted();
                        return;
                    }
                }
            }

            // Account for the buffers emitted in this pass against the downstream demand.
            if (localEmission != 0L) {
                BackpressureUtils.produced(requested, localEmission);
            }
            // Upstream replenishment is batched: consumed items accumulate in
            // upstreamConsumed and are re-requested once the total reaches limit.
            if (localConsumption != 0L) {
                long p = upstreamConsumed + localConsumption;
                if (p >= limit) {
                    upstreamConsumed = 0L;
                    request(p);
                } else {
                    upstreamConsumed = p;
                }
            }

            // Subtract the calls handled so far; a non-zero remainder means drain() was
            // called again in the meantime, so another pass is needed.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
275
/**
 * Boundary mode in which the item that satisfies the predicate is the FIRST element of the
 * next buffer: the predicate is evaluated first, the current buffer is emitted if it is
 * non-empty, and only then is the item appended to the (new) buffer.
 *
 * @param <T> the type of the upstream items
 */
static final class BoundedBeforeSubscriber<T> extends BoundedSubscriber<T> {
    /** Passes all arguments unchanged to the {@code BoundedSubscriber} constructor. */
    public BoundedBeforeSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
            Func1<? super T, Boolean> predicate, int prefetch) {
        super(actual, capacityHint, predicate, prefetch);
    }

    /**
     * Moves items from the queue into the current buffer and emits buffers downstream while
     * there is demand, then replenishes the upstream.
     * <p>
     * Every terminal path (unsubscription, error, completion, predicate failure) returns
     * without decrementing {@code wip}, so all later calls exit at the first check.
     */
    @Override
    void drain() {
        // Only the caller that moves wip from 0 to 1 runs the loop; any other caller just
        // leaves its increment behind as a "missed" notification for the running loop.
        if (wip.getAndIncrement() != 0) {
            return;
        }

        final Subscriber<? super List<T>> localSubscriber = actual;
        final Queue<Object> localQueue = queue;
        // Number of drain() calls this pass accounts for.
        int missed = 1;

        for (;;) {

            long localRequested = requested.get();
            long localEmission = 0L;     // buffers emitted in this pass
            long localConsumption = 0L;  // items polled from the queue in this pass
            List<T> localBuffer = buffer;

            // Emit at most as many buffers as the downstream has requested.
            while (localEmission != localRequested) {
                if (localSubscriber.isUnsubscribed()) {
                    return;
                }

                // done is read BEFORE polling: if the queue then turns out to be empty,
                // nothing more can arrive (assuming upstream emits nothing after its
                // terminal event).
                boolean mainDone = done;

                if (mainDone) {
                    Throwable exception = error;
                    if (exception != null) {
                        // Errors are delivered immediately; queued items and the
                        // partially filled buffer are dropped.
                        buffer = null;
                        localSubscriber.onError(exception);
                        return;
                    }
                }

                Object o = localQueue.poll();
                boolean empty = o == null;

                if (mainDone && empty) {
                    // Normal completion: flush whatever is left in the buffer. This is
                    // reached only inside the loop, i.e. while demand is outstanding.
                    buffer = null;
                    if (!localBuffer.isEmpty()) {
                        localSubscriber.onNext(localBuffer);
                    }
                    localSubscriber.onCompleted();
                    return;
                }
                if (empty) {
                    break;
                }

                T value = nl.getValue(o);

                boolean emit;

                try {
                    emit = predicate.call(value);
                } catch (Throwable ex) {
                    unsubscribe();
                    buffer = null;
                    Exceptions.throwOrReport(ex, localSubscriber, value);
                    return;
                }

                // "Before" mode: a matching item closes the previous buffer, but an empty
                // buffer is never emitted (e.g. when the very first item matches).
                if (emit && !localBuffer.isEmpty()) {
                    localSubscriber.onNext(localBuffer);
                    // The emitted list now belongs to the downstream; start a fresh one.
                    localBuffer = new ArrayList<T>(capacityHint);
                    buffer = localBuffer;

                    localEmission++;
                }

                // The item always ends up in the current (possibly just created) buffer.
                localBuffer.add(value);

                localConsumption++;
            }

            // Demand is exhausted (possibly zero from the start). Terminal events need no
            // demand, so an error, or a completion with nothing left to emit, is still
            // signalled here. A completed upstream with a non-empty buffer waits for the
            // next request.
            if (localEmission == localRequested) {
                if (localSubscriber.isUnsubscribed()) {
                    return;
                }

                boolean mainDone = done;

                if (mainDone) {
                    Throwable exception = error;
                    if (exception != null) {
                        buffer = null;
                        localSubscriber.onError(exception);
                        return;
                    } else
                    if (localQueue.isEmpty() && localBuffer.isEmpty()) {
                        buffer = null;
                        localSubscriber.onCompleted();
                        return;
                    }
                }
            }

            // Account for the buffers emitted in this pass against the downstream demand.
            if (localEmission != 0L) {
                BackpressureUtils.produced(requested, localEmission);
            }

            // Upstream replenishment is batched: consumed items accumulate in
            // upstreamConsumed and are re-requested once the total reaches limit.
            if (localConsumption != 0L) {
                long produced = upstreamConsumed + localConsumption;
                if (produced >= limit) {
                    upstreamConsumed = 0L;
                    request(produced);
                } else {
                    upstreamConsumed = produced;
                }
            }

            // Subtract the calls handled so far; a non-zero remainder means drain() was
            // called again in the meantime, so another pass is needed.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
399 }