View Javadoc
1   package com.github.davidmoten.rx2.flowable;
2   
3   import java.util.ArrayList;
4   import java.util.Comparator;
5   import java.util.List;
6   import java.util.concurrent.Callable;
7   import java.util.concurrent.TimeUnit;
8   
9   import org.reactivestreams.Publisher;
10  
11  import com.github.davidmoten.guavamini.Preconditions;
12  import com.github.davidmoten.rx2.BiFunctions;
13  import com.github.davidmoten.rx2.Flowables;
14  import com.github.davidmoten.rx2.Functions;
15  import com.github.davidmoten.rx2.StateMachine;
16  import com.github.davidmoten.rx2.StateMachine2;
17  import com.github.davidmoten.rx2.Statistics;
18  import com.github.davidmoten.rx2.buffertofile.Options;
19  import com.github.davidmoten.rx2.internal.flowable.FlowableCollectWhile;
20  import com.github.davidmoten.rx2.internal.flowable.FlowableDoOnEmpty;
21  import com.github.davidmoten.rx2.internal.flowable.FlowableInsertMaybe;
22  import com.github.davidmoten.rx2.internal.flowable.FlowableInsertTimeout;
23  import com.github.davidmoten.rx2.internal.flowable.FlowableMapLast;
24  import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
25  import com.github.davidmoten.rx2.internal.flowable.FlowableMaxRequest;
26  import com.github.davidmoten.rx2.internal.flowable.FlowableMinRequest;
27  import com.github.davidmoten.rx2.internal.flowable.FlowableRepeatingTransform;
28  import com.github.davidmoten.rx2.internal.flowable.FlowableReverse;
29  import com.github.davidmoten.rx2.internal.flowable.FlowableWindowMinMax;
30  import com.github.davidmoten.rx2.internal.flowable.FlowableWindowMinMax.Metric;
31  import com.github.davidmoten.rx2.internal.flowable.TransformerStateMachine;
32  import com.github.davidmoten.rx2.util.Pair;
33  
34  import io.reactivex.BackpressureStrategy;
35  import io.reactivex.Flowable;
36  import io.reactivex.FlowableEmitter;
37  import io.reactivex.FlowableTransformer;
38  import io.reactivex.Maybe;
39  import io.reactivex.Notification;
40  import io.reactivex.Observable;
41  import io.reactivex.Scheduler;
42  import io.reactivex.functions.Action;
43  import io.reactivex.functions.BiFunction;
44  import io.reactivex.functions.BiPredicate;
45  import io.reactivex.functions.Function;
46  import io.reactivex.functions.Function3;
47  import io.reactivex.functions.Predicate;
48  import io.reactivex.schedulers.Schedulers;
49  
50  public final class Transformers {
51  
52      private Transformers() {
53          // prevent instantiation
54      }
55  
56      public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
57              Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
58              BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
59              BackpressureStrategy backpressureStrategy, int requestBatchSize) {
60          return TransformerStateMachine.create(initialState, transition, completion, backpressureStrategy,
61                  requestBatchSize);
62      }
63  
64      public static StateMachine.Builder stateMachine() {
65          return StateMachine.builder();
66      }
67  
68      public static StateMachine2.Builder stateMachine2() {
69          return StateMachine2.builder();
70      }
71  
72      /**
73       * Returns a transformer that when a stream is empty runs the given
74       * {@link Action}.
75       * 
76       * <p>
77       * <img src=
78       * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/doOnEmpty.png"
79       * alt="image">
80       * 
81       * @param action
82       *            to be called when the stream is determined to be empty.
83       * @param <T>
84       *            item type
85       * 
86       * @return a transformer that when a stream is empty runs the given action.
87       */
88      public static <T> FlowableTransformer<T, T> doOnEmpty(final Action action) {
89          return new FlowableTransformer<T, T>() {
90  
91              @Override
92              public Publisher<T> apply(Flowable<T> upstream) {
93                  return new FlowableDoOnEmpty<T>(upstream, action);
94              }
95          };
96      }
97  
98      @SuppressWarnings("unchecked")
99      public static <T> FlowableTransformer<T, T> reverse() {
100         return (FlowableTransformer<T, T>) ReverseHolder.INSTANCE;
101     }
102 
103     private static final class ReverseHolder {
104         static final FlowableTransformer<Object, Object> INSTANCE = new FlowableTransformer<Object, Object>() {
105 
106             @Override
107             public Publisher<Object> apply(Flowable<Object> upstream) {
108                 return FlowableReverse.reverse(upstream);
109             }
110 
111         };
112 
113     }
114 
115     public static <T> FlowableTransformer<T, T> mapLast(final Function<? super T, ? extends T> function) {
116         return new FlowableTransformer<T, T>() {
117 
118             @Override
119             public Publisher<T> apply(Flowable<T> upstream) {
120                 return new FlowableMapLast<T>(upstream, function);
121             }
122 
123         };
124 
125     }
126 
127     public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b, Function<? super A, K> aKey,
128             Function<? super B, K> bKey, BiFunction<? super A, ? super B, C> combiner, int requestSize) {
129         return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
130     }
131 
132     public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
133             final Function<? super A, K> aKey, final Function<? super B, K> bKey,
134             final BiFunction<? super A, ? super B, C> combiner, int requestSize) {
135         return new FlowableTransformer<A, C>() {
136 
137             @Override
138             public Publisher<C> apply(Flowable<A> upstream) {
139                 return Flowables.match(upstream, b, aKey, bKey, combiner);
140             }
141         };
142     }
143 
144     public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
145             final Function<? super A, K> aKey, final Function<? super B, K> bKey,
146             final BiFunction<? super A, ? super B, C> combiner) {
147         return matchWith(b, aKey, bKey, combiner, 128);
148     }
149 
150     public static Options.BuilderFlowable onBackpressureBufferToFile() {
151         return Options.builderFlowable();
152     }
153 
154     /**
155      * <p>
156      * Converts a stream of {@code Number} to a stream of {@link Statistics} about
157      * those numbers.
158      * 
159      * <p>
160      * <img src=
161      * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/collectStats.png"
162      * alt="image">
163      * 
164      * @param <T>
165      *            item type
166      * @return transformer that converts a stream of Number to a stream of
167      *         Statistics
168      */
169     @SuppressWarnings("unchecked")
170     public static <T extends Number> FlowableTransformer<T, Statistics> collectStats() {
171         return (FlowableTransformer<T, Statistics>) CollectStatsHolder.INSTANCE;
172     }
173 
174     private static final class CollectStatsHolder {
175         static final FlowableTransformer<Number, Statistics> INSTANCE = new FlowableTransformer<Number, Statistics>() {
176 
177             @Override
178             public Flowable<Statistics> apply(Flowable<Number> source) {
179                 return source.scan(Statistics.create(), BiFunctions.collectStats());
180             }
181         };
182     }
183 
184     public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
185             final Function<? super T, ? extends R> function) {
186         return new FlowableTransformer<T, Pair<T, Statistics>>() {
187 
188             @Override
189             public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
190                 return source.scan(Pair.create((T) null, Statistics.create()),
191                         new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
192                             @Override
193                             public Pair<T, Statistics> apply(Pair<T, Statistics> pair, T t) throws Exception {
194                                 return Pair.create(t, pair.b().add(function.apply(t)));
195                             }
196                         }).skip(1);
197             }
198         };
199     }
200 
201     /**
202      * Returns a transformer that emits collections of items with the collection
203      * boundaries determined by the given {@link BiPredicate}.
204      * 
205      * <p>
206      * <img src=
207      * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/collectWhile.png"
208      * alt="image">
209      * 
210      * @param collectionFactory
211      *            factory to create a new collection
212      * @param add
213      *            method to add an item to a collection
214      * @param condition
215      *            while true will continue to add to the current collection. Do not
216      *            modify the given collection!
217      * @param emitRemainder
218      *            whether to emit the remainder as a collection
219      * @param <T>
220      *            item type
221      * @param <R>
222      *            collection type
223      * @return transform that collects while some conditions is returned then starts
224      *         a new collection
225      */
226     public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
227             final BiFunction<? super R, ? super T, ? extends R> add, final BiPredicate<? super R, ? super T> condition,
228             final boolean emitRemainder) {
229         return new FlowableTransformer<T, R>() {
230 
231             @Override
232             public Publisher<R> apply(Flowable<T> source) {
233                 return new FlowableCollectWhile<T, R>(source, collectionFactory, add, condition, emitRemainder);
234             }
235         };
236     }
237 
238     public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
239             final BiFunction<? super R, ? super T, ? extends R> add,
240             final BiPredicate<? super R, ? super T> condition) {
241         return collectWhile(collectionFactory, add, condition, true);
242     }
243 
244     public static <T> FlowableTransformer<T, List<T>> toListWhile(
245             final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
246         return collectWhile(ListFactoryHolder.<T>factory(), ListFactoryHolder.<T>add(), condition, emitRemainder);
247     }
248 
249     public static <T> FlowableTransformer<T, List<T>> toListWhile(
250             final BiPredicate<? super List<T>, ? super T> condition) {
251         return toListWhile(condition, true);
252     }
253 
254     public static <T> FlowableTransformer<T, List<T>> bufferWhile(
255             final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
256         return toListWhile(condition, emitRemainder);
257     }
258 
259     public static <T> FlowableTransformer<T, List<T>> bufferWhile(
260             final BiPredicate<? super List<T>, ? super T> condition) {
261         return toListWhile(condition);
262     }
263 
264     private static final class ListFactoryHolder {
265 
266         private static final Callable<List<Object>> INSTANCE = new Callable<List<Object>>() {
267 
268             @Override
269             public List<Object> call() throws Exception {
270                 return new ArrayList<Object>();
271             }
272         };
273 
274         private static final BiFunction<List<Object>, Object, List<Object>> ADD = new BiFunction<List<Object>, Object, List<Object>>() {
275 
276             @Override
277             public List<Object> apply(List<Object> list, Object t) throws Exception {
278                 list.add(t);
279                 return list;
280             }
281         };
282 
283         @SuppressWarnings("unchecked")
284         static <T> Callable<List<T>> factory() {
285             return (Callable<List<T>>) (Callable<?>) INSTANCE;
286         };
287 
288         @SuppressWarnings("unchecked")
289         static <T> BiFunction<List<T>, T, List<T>> add() {
290             return (BiFunction<List<T>, T, List<T>>) (BiFunction<?, ?, ?>) ADD;
291         }
292 
293     }
294 
295     public static <T extends Comparable<T>> FlowableTransformer<T, T> windowMax(final int windowSize) {
296         return windowMax(windowSize, Transformers.<T>naturalComparator());
297     }
298 
299     public static <T> FlowableTransformer<T, T> windowMax(final int windowSize,
300             final Comparator<? super T> comparator) {
301         return new FlowableTransformer<T, T>() {
302             @Override
303             public Flowable<T> apply(Flowable<T> source) {
304                 return new FlowableWindowMinMax<T>(source, windowSize, comparator, Metric.MAX);
305             }
306         };
307     }
308 
309     public static <T extends Comparable<T>> FlowableTransformer<T, T> windowMin(final int windowSize) {
310         return windowMin(windowSize, Transformers.<T>naturalComparator());
311     }
312 
313     public static <T> FlowableTransformer<T, T> windowMin(final int windowSize,
314             final Comparator<? super T> comparator) {
315         return new FlowableTransformer<T, T>() {
316             @Override
317             public Flowable<T> apply(Flowable<T> source) {
318                 return new FlowableWindowMinMax<T>(source, windowSize, comparator, Metric.MIN);
319             }
320         };
321     }
322 
323     private static class NaturalComparatorHolder {
324         static final Comparator<Comparable<Object>> INSTANCE = new Comparator<Comparable<Object>>() {
325 
326             @Override
327             public int compare(Comparable<Object> o1, Comparable<Object> o2) {
328                 return o1.compareTo(o2);
329             }
330         };
331     }
332 
333     @SuppressWarnings("unchecked")
334     private static <T extends Comparable<T>> Comparator<T> naturalComparator() {
335         return (Comparator<T>) (Comparator<?>) NaturalComparatorHolder.INSTANCE;
336     }
337 
338     public static <T> FlowableTransformer<T, T> maxRequest(final long... maxRequest) {
339         return new FlowableTransformer<T, T>() {
340 
341             @Override
342             public Publisher<T> apply(Flowable<T> source) {
343                 return new FlowableMaxRequest<T>(source, maxRequest);
344             }
345         };
346     }
347 
348     public static <T> FlowableTransformer<T, T> minRequest(final int... minRequests) {
349         return new FlowableTransformer<T, T>() {
350 
351             @Override
352             public Publisher<T> apply(Flowable<T> source) {
353                 return new FlowableMinRequest<T>(source, minRequests);
354             }
355         };
356     }
357 
358     public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
359             final boolean constrainFirstRequestMin) {
360         Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
361         return new FlowableTransformer<T, T>() {
362 
363             @Override
364             public Publisher<T> apply(Flowable<T> source) {
365                 if (minRequest == maxRequest && constrainFirstRequestMin) {
366                     return source.rebatchRequests(minRequest);
367                 } else {
368                     return source
369                             .compose(Transformers.<T>minRequest(constrainFirstRequestMin ? minRequest : 1, minRequest))
370                             .compose(Transformers.<T>maxRequest(maxRequest));
371                 }
372             }
373         };
374     }
375 
376     public static <T> FlowableTransformer<T, T> rebatchRequests(int minRequest, long maxRequest) {
377         return rebatchRequests(minRequest, maxRequest, true);
378     }
379 
380     public static <T> Function<Flowable<T>, Flowable<T>> repeat(
381             final Function<? super Flowable<T>, ? extends Flowable<T>> transform, final int maxChained,
382             final long maxIterations, final Function<Observable<T>, Observable<?>> tester) {
383         Preconditions.checkArgument(maxChained > 0, "maxChained must be > 0");
384         Preconditions.checkArgument(maxIterations > 0, "maxIterations must be > 0");
385         Preconditions.checkNotNull(transform, "transform must not be null");
386         Preconditions.checkNotNull(tester, "tester must not be null");
387         return new Function<Flowable<T>, Flowable<T>>() {
388             @Override
389             public Flowable<T> apply(Flowable<T> source) {
390                 return new FlowableRepeatingTransform<T>(source, transform, maxChained, maxIterations, tester);
391             }
392         };
393     }
394 
395     public static <T> Function<Flowable<T>, Flowable<T>> reduce(
396             final Function<? super Flowable<T>, ? extends Flowable<T>> reducer, final int maxChained,
397             final long maxIterations) {
398         return repeat(reducer, maxChained, maxIterations, Transformers.<T>finishWhenSingle());
399     }
400 
401     @SuppressWarnings("unchecked")
402     private static <T> Function<Observable<T>, Observable<?>> finishWhenSingle() {
403         return (Function<Observable<T>, Observable<?>>) (Function<?, Observable<?>>) FINISH_WHEN_SINGLE;
404     }
405 
406     private static final Function<Observable<Object>, Observable<?>> FINISH_WHEN_SINGLE = new Function<Observable<Object>, Observable<?>>() {
407 
408         @Override
409         public Observable<?> apply(final Observable<Object> o) throws Exception {
410             return Observable.defer(new Callable<Observable<Object>>() {
411 
412                 final long[] count = new long[1];
413 
414                 @Override
415                 public Observable<Object> call() throws Exception {
416                     return o.materialize() //
417                             .flatMap(new Function<Notification<Object>, Observable<Notification<Object>>>() {
418                                 @Override
419                                 public Observable<Notification<Object>> apply(Notification<Object> x) throws Exception {
420                                     if (x.isOnNext()) {
421                                         count[0]++;
422                                         if (count[0] > 1) {
423                                             return Observable.just(x);
424                                         } else {
425                                             return Observable.empty();
426                                         }
427                                     } else if (x.isOnComplete()) {
428                                         if (count[0] <= 1) {
429                                             // complete the stream
430                                             return Observable.just(x);
431                                         } else {
432                                             // never complete
433                                             return Observable.never();
434                                         }
435                                     } else {
436                                         // is onError
437                                         return Observable.just(x);
438                                     }
439                                 }
440                             }) //
441                             .dematerialize(Functions.<Notification<Object>>identity());
442                 }
443             });
444         }
445     };
446 
447     public static <T> Function<Flowable<T>, Flowable<T>> reduce(
448             final Function<? super Flowable<T>, ? extends Flowable<T>> reducer, final int maxChained) {
449         return reduce(reducer, maxChained, Long.MAX_VALUE);
450     }
451 
452     public static <T, R> FlowableTransformer<T, R> flatMapInterleaved(
453             final Function<? super T, ? extends Publisher<? extends R>> mapper, final int maxConcurrency) {
454         return flatMapInterleaved(mapper, maxConcurrency, 128, false);
455     }
456 
457     public static <T, R> FlowableTransformer<T, R> flatMapInterleaved(
458             final Function<? super T, ? extends Publisher<? extends R>> mapper, final int maxConcurrency,
459             final int bufferSize, final boolean delayErrors) {
460         return new FlowableTransformer<T, R>() {
461             @Override
462             public Publisher<R> apply(Flowable<T> f) {
463                 return Flowables.mergeInterleaved(f.map(mapper), maxConcurrency, bufferSize, delayErrors);
464             }
465         };
466     }
467 
468     /**
469      * For every onNext emission from the source stream, the {@code valueToInsert}
470      * Maybe is subscribed to. If the Maybe emits before the next upstream emission
471      * then the result from the Maybe will be inserted into the stream. If the Maybe
472      * does not emit before the next upstream emission then it is cancelled (and no
473      * value is inserted).
474      * 
475      * @param valueToInsert
476      *            a Maybe is calculated from last source emission and subscribed to.
477      *            If succeeds before next source emission then result is inserted
478      *            into stream.
479      * @param <T>
480      *            stream element type
481      * @return source with operator insert applied
482      */
483     public static <T> FlowableTransformer<T, T> insert(
484             final Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
485         return new FlowableTransformer<T, T>() {
486 
487             @Override
488             public Publisher<T> apply(Flowable<T> source) {
489                 return new FlowableInsertMaybe<T>(source, valueToInsert);
490             }
491 
492         };
493     }
494     
495     public static <T> FlowableTransformer<T, T> insert(long timeout, TimeUnit unit, T value) {
496         return insert(Functions.constant(timeout), unit, value);
497     }
498     
499     public static <T> FlowableTransformer<T, T> insert(Function<? super T, ? extends Long> timeout, TimeUnit unit,
500             T value) {
501         return insert(timeout, unit, Functions.constant(value));
502     }
503     
504     public static <T> FlowableTransformer<T, T> insert(Function<? super T, ? extends Long> timeout, TimeUnit unit,
505             Function<? super T, ? extends T> value) {
506         return insert(timeout, unit, value, Schedulers.computation());
507     }
508     
509     public static <T> FlowableTransformer<T, T> insert(final Function<? super T, ? extends Long> timeout,
510             final TimeUnit unit, final Function<? super T, ? extends T> value, final Scheduler scheduler) {
511         return new FlowableTransformer<T, T>() {
512 
513             @Override
514             public Publisher<T> apply(Flowable<T> source) {
515                 return new FlowableInsertTimeout<T>(source, timeout, unit, value, scheduler);
516             }
517 
518         };
519     }
520     
521     public static <T> FlowableTransformer<T, T> insert(
522             final Maybe<? extends T> valueToInsert) {
523         return new FlowableTransformer<T, T>() {
524 
525             @Override
526             public Publisher<T> apply(Flowable<T> source) {
527                 return new FlowableInsertMaybe<T>(source, Functions.constant(valueToInsert));
528             }
529 
530         };
531     }
532 
533     private static final class MyOptional<T> {
534         private static final MyOptional<Object> EMPTY = new MyOptional<Object>(null);
535 
536         final T t;
537 
538         private MyOptional(T t) {
539             this.t = t;
540         }
541 
542         static <T> MyOptional<T> of(T t) {
543             Preconditions.checkNotNull(t);
544             return new MyOptional<T>(t);
545         }
546 
547         @SuppressWarnings("unchecked")
548         static <T> MyOptional<T> empty() {
549             return (MyOptional<T>) EMPTY;
550         }
551 
552         boolean isPresent() {
553             return t != null;
554         }
555 
556         T get() {
557             Preconditions.checkNotNull(t);
558             return t;
559         }
560 
561         private static final Function<Object, MyOptional<Object>> OF = new Function<Object, MyOptional<Object>>() {
562             @Override
563             public MyOptional<Object> apply(Object x) throws Exception {
564                 return MyOptional.of(x);
565             }
566         };
567 
568         @SuppressWarnings("unchecked")
569         static <T> Function<Object, MyOptional<T>> of() {
570             return (Function<Object, MyOptional<T>>) (Function<Object, ?>) OF;
571         }
572 
573         private static final BiFunction<List<Object>, MyOptional<Object>, List<Object>> ADD = new BiFunction<List<Object>, MyOptional<Object>, List<Object>>() {
574             @Override
575             public List<Object> apply(List<Object> list, MyOptional<Object> x) throws Exception {
576                 if (x.isPresent()) {
577                     list.add(x.get());
578                 }
579                 return list;
580             }
581         };
582 
583         @SuppressWarnings("unchecked")
584         static <T> BiFunction<List<T>, MyOptional<T>, List<T>> addIfPresent() {
585             return (BiFunction<List<T>, MyOptional<T>, List<T>>) (BiFunction<?, ?, ?>) ADD;
586         }
587 
588         // does not really belong in MyOptional but saves creating another class
589         private static final Predicate<List<Object>> LIST_HAS_ELEMENTS = new Predicate<List<Object>>() {
590             @Override
591             public boolean test(List<Object> list) throws Exception {
592                 return !list.isEmpty();
593             }
594         };
595 
596         // does not really belong in MyOptional but saves creating another class
597         @SuppressWarnings("unchecked")
598         static <T> Predicate<List<T>> listHasElements() {
599             return (Predicate<List<T>>) (Predicate<?>) LIST_HAS_ELEMENTS;
600         }
601     }
602 
603     /**
604      * Buffers the source {@link Flowable} into {@link List}s, emitting Lists when
605      * the size of a list reaches {@code maxSize} or if the elapsed time since last
606      * emission from the source reaches the given duration.
607      * {@link Schedulers#computation} is used for scheduling an inserted emission.
608      * 
609      * @param maxSize
610      *            max size of emitted lists
611      * @param duration
612      *            buffered list is emitted if the elapsed time since last emission
613      *            from the source reaches this duration
614      * @param unit
615      *            unit of {@code duration}
616      * @param <T>
617      *            type of the source stream items
618      * @return source with operator applied
619      */
620     public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize, final long duration,
621             final TimeUnit unit) {
622         return buffer(maxSize, Functions.constant(duration), unit);
623     }
624 
625     /**
626      * Buffers the source {@link Flowable} into {@link List}s, emitting Lists when the size of a
627      * list reaches {@code maxSize} or if the elapsed time since last emission from
628      * the source reaches the given duration.
629      * 
630      * @param maxSize
631      *            max size of emitted lists
632      * @param duration
633      *            buffered list is emitted if the elapsed time since last emission
634      *            from the source reaches this duration
635      * @param unit
636      *            unit of {@code duration}
637      * @param scheduler
638      *            scheduler to use to schedule emission of a buffer (as a list) if
639      *            the time since last emission from the source reaches duration
640      * @param <T>
641      *            type of the source stream items
642      * @return source with operator applied
643      */
644     public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize, final long duration,
645             final TimeUnit unit, final Scheduler scheduler) {
646         return buffer(maxSize, Functions.constant(duration), unit, scheduler);
647     }
648     
649     /**
650      * Buffers the source {@link Flowable} into {@link List}s, emitting Lists when
651      * the size of a list reaches {@code maxSize} or if the elapsed time since last
652      * emission from the source reaches the given duration. An emission on timeout
653      * is scheduled on {@link Schedulers#computation()}.
654      * 
655      * @param maxSize
656      *            max size of emitted lists
657      * @param duration
658      *            function that based on the last emission calculates the elapsed
659      *            time to be used before emitting a buffered list
660      * @param unit
661      *            unit of {@code duration}
662      * @param <T>
663      *            type of the source stream items
664      * @return source with operator applied
665      */
666     public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
667             final Function<? super T, ? extends Long> duration, final TimeUnit unit) {
668         return buffer(maxSize, duration, unit, Schedulers.computation());
669     }
670     
671     /**
672      * Buffers the source {@link Flowable} into {@link List}s, emitting Lists when
673      * the size of a list reaches {@code maxSize} or if the elapsed time since last
674      * emission from the source reaches the given duration.
675      * 
676      * @param maxSize
677      *            max size of emitted lists
678      * @param duration
679      *            function that based on the last emission calculates the elapsed
680      *            time to be used before emitting a buffered list
681      * @param unit
682      *            unit of {@code duration}
683      * @param scheduler
684      *            scheduler to use to schedule emission of a buffer (as a list) if
685      *            the time since last emission from the source reaches duration
686      * @param <T>
687      *            type of the source stream items
688      * @return source with operator applied
689      */
690     public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
691             final Function<? super T, ? extends Long> duration, final TimeUnit unit, final Scheduler scheduler) {
692 
693         final BiPredicate<List<T>, MyOptional<T>> condition = new BiPredicate<List<T>, MyOptional<T>>() {
694             @Override
695             public boolean test(List<T> list, MyOptional<T> x) throws Exception {
696                 return list.size() < maxSize && x.isPresent();
697             }
698         };
699         Function<MyOptional<T>, Long> timeout = new Function<MyOptional<T>, Long>() {
700             @Override
701             public Long apply(MyOptional<T> t) throws Exception {
702                 return duration.apply(t.get());
703             }
704         };
705         final FlowableTransformer<MyOptional<T>, MyOptional<T>> insert = insert(timeout, unit,
706                 Functions.constant(MyOptional.<T>empty()), scheduler);
707 
708         final FlowableTransformer<MyOptional<T>, List<T>> collectWhile = collectWhile( //
709                 // factory
710                 ListFactoryHolder.<T>factory(), //
711                 // add function
712                 MyOptional.<T>addIfPresent(), //
713                 // condition
714                 condition);
715 
716         return new FlowableTransformer<T, List<T>>() {
717             @Override
718             public Publisher<List<T>> apply(Flowable<T> source) {
719 
720                 return source //
721                         .map(MyOptional.<T>of()) //
722                         .compose(insert) //
723                         .compose(collectWhile)
724                         // need this filter because sometimes nothing gets added to list
725                         .filter(MyOptional.<T>listHasElements()); //
726             }
727         };
728     }
729 }