View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.*;
4   import java.util.concurrent.atomic.*;
5   
6   import rx.*;
7   import rx.Observable;
8   import rx.Observable.*;
9   import rx.exceptions.*;
10  import rx.functions.Func1;
11  import rx.internal.operators.*;
12  import rx.internal.util.atomic.SpscAtomicArrayQueue;
13  import rx.internal.util.unsafe.*;
14  
15  /**
16   * Buffers values into a continuous, non-overlapping Lists where the boundary is determined
17   * by a predicate returning true.
18   *
19   * @param <T> the source and List element type
20   */
21  public final class OperatorBufferPredicateBoundary<T> implements Transformer<T, List<T>> {
22  
23      final Func1<? super T, Boolean> predicate;
24      
25      final int prefetch;
26      
27      final int capacityHint;
28      
29      final boolean after;
30      
31      public OperatorBufferPredicateBoundary(Func1<? super T, Boolean> predicate, int prefetch, int capacityHint, boolean after) {
32          if (predicate == null) {
33              throw new NullPointerException("predicate");
34          }
35          if (prefetch <= 0) {
36              throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
37          }
38          if (capacityHint <= 0) {
39              throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
40          }
41          this.predicate = predicate;
42          this.prefetch = prefetch;
43          this.capacityHint = capacityHint;
44          this.after = after;
45      }
46  
47      @Override
48      public Observable<List<T>> call(Observable<T> source) {
49          return source.lift(new Operator<List<T>, T>() {
50              @Override
51              public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
52                  final BoundedSubscriber<T> parent = after 
53                          ? new BoundedAfterSubscriber<T>(child, capacityHint, predicate, prefetch)
54                          : new BoundedBeforeSubscriber<T>(child, capacityHint, predicate, prefetch);
55                          
56                  child.add(parent);
57                  child.setProducer(new Producer() {
58                      @Override
59                      public void request(long n) {
60                          parent.requestMore(n);
61                      }
62                  });
63                  
64                  return parent;
65              }
66          });
67      }
68      
69      static abstract class BoundedSubscriber<T> extends Subscriber<T> {
70          final Subscriber<? super List<T>> actual;
71          
72          final int capacityHint;
73          
74          final Func1<? super T, Boolean> predicate;
75          
76          final Queue<Object> queue;
77          
78          final AtomicLong requested;
79  
80          final AtomicInteger wip;
81          
82          final NotificationLite<T> nl;
83          
84          final int limit;
85  
86          List<T> buffer;
87          
88          long upstreamConsumed;
89          
90          volatile boolean done;
91          Throwable error;
92  
93          public BoundedSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
94                  Func1<? super T, Boolean> predicate, int prefetch) {
95              this.actual = actual;
96              this.capacityHint = capacityHint;
97              this.predicate = predicate;
98              Queue<Object> q;
99              if (UnsafeAccess.isUnsafeAvailable()) {
100                 q = new SpscArrayQueue<Object>(prefetch);
101             } else {
102                 q = new SpscAtomicArrayQueue<Object>(prefetch);
103             }
104             queue = q;
105             buffer = new ArrayList<T>(capacityHint);
106             requested = new AtomicLong();
107             wip = new AtomicInteger();
108             nl = NotificationLite.instance();
109             limit = prefetch - (prefetch >> 2);
110             if (prefetch == Integer.MAX_VALUE) {
111                 request(Long.MAX_VALUE);
112             } else {
113                 request(prefetch);
114             }
115         }
116 
117         @Override
118         public void onNext(T t) {
119             if (!queue.offer(nl.next(t))) {
120                 unsubscribe();
121                 onError(new MissingBackpressureException());
122             } else {
123                 drain();
124             }
125         }
126         
127         @Override
128         public void onError(Throwable e) {
129             error = e;
130             done = true;
131             drain();
132         }
133         
134         @Override
135         public void onCompleted() {
136             done = true;
137             drain();
138         }
139         
140         void requestMore(long n) {
141             if (n > 0) {
142                 BackpressureUtils.getAndAddRequest(requested, n);
143                 drain();
144             } else
145             if (n < 0) {
146                 throw new IllegalArgumentException("n >= 0 required but it was " + n);
147             }
148         }
149 
150         abstract void drain();
151     }
152     
153     static final class BoundedAfterSubscriber<T> extends BoundedSubscriber<T> {
154 
155         public BoundedAfterSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
156                 Func1<? super T, Boolean> predicate, int prefetch) {
157             super(actual, capacityHint, predicate, prefetch);
158         }
159         
160         @Override
161         void drain() {
162             if (wip.getAndIncrement() != 0) {
163                 return;
164             }
165             
166             final Subscriber<? super List<T>> localSubscriber = actual;
167             final Queue<Object> localQueue = queue;
168             int missed = 1;
169             
170             for (;;) {
171 
172                 long localRequested = requested.get();
173                 long localEmission = 0L;
174                 long localConsumption = 0L;
175                 List<T> localBuffer = buffer;
176                 
177                 while (localEmission != localRequested) {
178                     if (localSubscriber.isUnsubscribed()) {
179                         return;
180                     }
181                     
182                     boolean mainDone = done;
183                     
184                     if (mainDone) {
185                         Throwable exception = error;
186                         if (exception != null) {
187                             buffer = null;
188                             localSubscriber.onError(exception);
189                             return;
190                         }
191                     }
192                     
193                     Object notification = localQueue.poll();
194                     boolean empty = notification == null;
195                     
196                     if (mainDone && empty) {
197                         buffer = null;
198                         if (!localBuffer.isEmpty()) {
199                             localSubscriber.onNext(localBuffer);
200                         }
201                         localSubscriber.onCompleted();
202                         return;
203                     }
204                     if (empty) {
205                         break;
206                     }
207                     
208                     T value = nl.getValue(notification);
209                     
210                     localBuffer.add(value);
211                     localConsumption++;
212                     
213                     boolean emit;
214                     
215                     try {
216                         emit = predicate.call(value);
217                     } catch (Throwable ex) {
218                         unsubscribe();
219                         buffer = null;
220                         Exceptions.throwOrReport(ex, localSubscriber, value);
221                         return;
222                     }
223                     
224                     if (emit) {
225                         localSubscriber.onNext(localBuffer);
226                         localBuffer = new ArrayList<T>(capacityHint);
227                         buffer = localBuffer;
228                         
229                         localEmission++;
230                     }
231                 }
232                 
233                 if (localEmission == localRequested) {
234                     if (localSubscriber.isUnsubscribed()) {
235                         return;
236                     }
237                     
238                     boolean mainDone = done;
239 
240                     if (mainDone) {
241                         Throwable exception = error;
242                         if (exception != null) {
243                             buffer = null;
244                             localSubscriber.onError(exception);
245                             return;
246                         } else
247                         if (localQueue.isEmpty() && localBuffer.isEmpty()) {
248                             buffer = null;
249                             localSubscriber.onCompleted();
250                             return;
251                         }
252                     }
253                 }
254                 
255                 if (localEmission != 0L) {
256                     BackpressureUtils.produced(requested, localEmission);
257                 }
258                 if (localConsumption != 0L) {
259                     long p = upstreamConsumed + localConsumption;
260                     if (p >= limit) {
261                         upstreamConsumed = 0L;
262                         request(p);
263                     } else {
264                         upstreamConsumed = p;
265                     }
266                 }
267                 
268                 missed = wip.addAndGet(-missed);
269                 if (missed == 0) {
270                     break;
271                 }
272             }
273         }
274     }
275 
276     static final class BoundedBeforeSubscriber<T> extends BoundedSubscriber<T> {
277         public BoundedBeforeSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
278                 Func1<? super T, Boolean> predicate, int prefetch) {
279             super(actual, capacityHint, predicate, prefetch);
280         }
281 
282         @Override
283         void drain() {
284             if (wip.getAndIncrement() != 0) {
285                 return;
286             }
287             
288             final Subscriber<? super List<T>> localSubscriber = actual;
289             final Queue<Object> localQueue = queue;
290             int missed = 1;
291             
292             for (;;) {
293 
294                 long localRequested = requested.get();
295                 long localEmission = 0L;
296                 long localConsumption = 0L;
297                 List<T> localBuffer = buffer;
298                 
299                 while (localEmission != localRequested) {
300                     if (localSubscriber.isUnsubscribed()) {
301                         return;
302                     }
303                     
304                     boolean mainDone = done;
305                     
306                     if (mainDone) {
307                         Throwable exception = error;
308                         if (exception != null) {
309                             buffer = null;
310                             localSubscriber.onError(exception);
311                             return;
312                         }
313                     }
314                     
315                     Object o = localQueue.poll();
316                     boolean empty = o == null;
317                     
318                     if (mainDone && empty) {
319                         buffer = null;
320                         if (!localBuffer.isEmpty()) {
321                             localSubscriber.onNext(localBuffer);
322                         }
323                         localSubscriber.onCompleted();
324                         return;
325                     }
326                     if (empty) {
327                         break;
328                     }
329                     
330                     T value = nl.getValue(o);
331                     
332                     boolean emit;
333                     
334                     try {
335                         emit = predicate.call(value);
336                     } catch (Throwable ex) {
337                         unsubscribe();
338                         buffer = null;
339                         Exceptions.throwOrReport(ex, localSubscriber, value);
340                         return;
341                     }
342                     
343                     if (emit && !localBuffer.isEmpty()) {
344                         localSubscriber.onNext(localBuffer);
345                         localBuffer = new ArrayList<T>(capacityHint);
346                         buffer = localBuffer;
347                         
348                         localEmission++;
349                     }
350                     
351                     localBuffer.add(value);
352                     
353                     localConsumption++;
354                 }
355                 
356                 if (localEmission == localRequested) {
357                     if (localSubscriber.isUnsubscribed()) {
358                         return;
359                     }
360                     
361                     boolean mainDone = done;
362 
363                     if (mainDone) {
364                         Throwable exception = error;
365                         if (exception != null) {
366                             buffer = null;
367                             localSubscriber.onError(exception);
368                             return;
369                         } else
370                         if (localQueue.isEmpty() && localBuffer.isEmpty()) {
371                             buffer = null;
372                             localSubscriber.onCompleted();
373                             return;
374                         }
375                     }
376                 }
377                 
378                 if (localEmission != 0L) {
379                     BackpressureUtils.produced(requested, localEmission);
380                 }
381                 
382                 if (localConsumption != 0L) {
383                     long produced = upstreamConsumed + localConsumption;
384                     if (produced >= limit) {
385                         upstreamConsumed = 0L;
386                         request(produced);
387                     } else {
388                         upstreamConsumed = produced;
389                     }
390                 }
391                 
392                 missed = wip.addAndGet(-missed);
393                 if (missed == 0) {
394                     break;
395                 }
396             }
397         }
398     }
399 }