1 package com.github.davidmoten.rx2.flowable;
2
3 import java.util.ArrayList;
4 import java.util.Comparator;
5 import java.util.List;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.TimeUnit;
8
9 import org.reactivestreams.Publisher;
10
11 import com.github.davidmoten.guavamini.Preconditions;
12 import com.github.davidmoten.rx2.BiFunctions;
13 import com.github.davidmoten.rx2.Flowables;
14 import com.github.davidmoten.rx2.Functions;
15 import com.github.davidmoten.rx2.StateMachine;
16 import com.github.davidmoten.rx2.StateMachine2;
17 import com.github.davidmoten.rx2.Statistics;
18 import com.github.davidmoten.rx2.buffertofile.Options;
19 import com.github.davidmoten.rx2.internal.flowable.FlowableCollectWhile;
20 import com.github.davidmoten.rx2.internal.flowable.FlowableDoOnEmpty;
21 import com.github.davidmoten.rx2.internal.flowable.FlowableInsertMaybe;
22 import com.github.davidmoten.rx2.internal.flowable.FlowableInsertTimeout;
23 import com.github.davidmoten.rx2.internal.flowable.FlowableMapLast;
24 import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
25 import com.github.davidmoten.rx2.internal.flowable.FlowableMaxRequest;
26 import com.github.davidmoten.rx2.internal.flowable.FlowableMinRequest;
27 import com.github.davidmoten.rx2.internal.flowable.FlowableRepeatingTransform;
28 import com.github.davidmoten.rx2.internal.flowable.FlowableReverse;
29 import com.github.davidmoten.rx2.internal.flowable.FlowableWindowMinMax;
30 import com.github.davidmoten.rx2.internal.flowable.FlowableWindowMinMax.Metric;
31 import com.github.davidmoten.rx2.internal.flowable.TransformerStateMachine;
32 import com.github.davidmoten.rx2.util.Pair;
33
34 import io.reactivex.BackpressureStrategy;
35 import io.reactivex.Flowable;
36 import io.reactivex.FlowableEmitter;
37 import io.reactivex.FlowableTransformer;
38 import io.reactivex.Maybe;
39 import io.reactivex.Notification;
40 import io.reactivex.Observable;
41 import io.reactivex.Scheduler;
42 import io.reactivex.functions.Action;
43 import io.reactivex.functions.BiFunction;
44 import io.reactivex.functions.BiPredicate;
45 import io.reactivex.functions.Function;
46 import io.reactivex.functions.Function3;
47 import io.reactivex.functions.Predicate;
48 import io.reactivex.schedulers.Schedulers;
49
50 public final class Transformers {
51
// Not instantiable: this class only exposes static members.
private Transformers() {

}
55
56 public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
57 Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
58 BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
59 BackpressureStrategy backpressureStrategy, int requestBatchSize) {
60 return TransformerStateMachine.create(initialState, transition, completion, backpressureStrategy,
61 requestBatchSize);
62 }
63
64 public static StateMachine.Builder stateMachine() {
65 return StateMachine.builder();
66 }
67
68 public static StateMachine2.Builder stateMachine2() {
69 return StateMachine2.builder();
70 }
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public static <T> FlowableTransformer<T, T> doOnEmpty(final Action action) {
89 return new FlowableTransformer<T, T>() {
90
91 @Override
92 public Publisher<T> apply(Flowable<T> upstream) {
93 return new FlowableDoOnEmpty<T>(upstream, action);
94 }
95 };
96 }
97
98 @SuppressWarnings("unchecked")
99 public static <T> FlowableTransformer<T, T> reverse() {
100 return (FlowableTransformer<T, T>) ReverseHolder.INSTANCE;
101 }
102
103 private static final class ReverseHolder {
104 static final FlowableTransformer<Object, Object> INSTANCE = new FlowableTransformer<Object, Object>() {
105
106 @Override
107 public Publisher<Object> apply(Flowable<Object> upstream) {
108 return FlowableReverse.reverse(upstream);
109 }
110
111 };
112
113 }
114
115 public static <T> FlowableTransformer<T, T> mapLast(final Function<? super T, ? extends T> function) {
116 return new FlowableTransformer<T, T>() {
117
118 @Override
119 public Publisher<T> apply(Flowable<T> upstream) {
120 return new FlowableMapLast<T>(upstream, function);
121 }
122
123 };
124
125 }
126
127 public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b, Function<? super A, K> aKey,
128 Function<? super B, K> bKey, BiFunction<? super A, ? super B, C> combiner, int requestSize) {
129 return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
130 }
131
132 public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
133 final Function<? super A, K> aKey, final Function<? super B, K> bKey,
134 final BiFunction<? super A, ? super B, C> combiner, int requestSize) {
135 return new FlowableTransformer<A, C>() {
136
137 @Override
138 public Publisher<C> apply(Flowable<A> upstream) {
139 return Flowables.match(upstream, b, aKey, bKey, combiner);
140 }
141 };
142 }
143
144 public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
145 final Function<? super A, K> aKey, final Function<? super B, K> bKey,
146 final BiFunction<? super A, ? super B, C> combiner) {
147 return matchWith(b, aKey, bKey, combiner, 128);
148 }
149
150 public static Options.BuilderFlowable onBackpressureBufferToFile() {
151 return Options.builderFlowable();
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 @SuppressWarnings("unchecked")
170 public static <T extends Number> FlowableTransformer<T, Statistics> collectStats() {
171 return (FlowableTransformer<T, Statistics>) CollectStatsHolder.INSTANCE;
172 }
173
174 private static final class CollectStatsHolder {
175 static final FlowableTransformer<Number, Statistics> INSTANCE = new FlowableTransformer<Number, Statistics>() {
176
177 @Override
178 public Flowable<Statistics> apply(Flowable<Number> source) {
179 return source.scan(Statistics.create(), BiFunctions.collectStats());
180 }
181 };
182 }
183
184 public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
185 final Function<? super T, ? extends R> function) {
186 return new FlowableTransformer<T, Pair<T, Statistics>>() {
187
188 @Override
189 public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
190 return source.scan(Pair.create((T) null, Statistics.create()),
191 new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
192 @Override
193 public Pair<T, Statistics> apply(Pair<T, Statistics> pair, T t) throws Exception {
194 return Pair.create(t, pair.b().add(function.apply(t)));
195 }
196 }).skip(1);
197 }
198 };
199 }
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
227 final BiFunction<? super R, ? super T, ? extends R> add, final BiPredicate<? super R, ? super T> condition,
228 final boolean emitRemainder) {
229 return new FlowableTransformer<T, R>() {
230
231 @Override
232 public Publisher<R> apply(Flowable<T> source) {
233 return new FlowableCollectWhile<T, R>(source, collectionFactory, add, condition, emitRemainder);
234 }
235 };
236 }
237
238 public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
239 final BiFunction<? super R, ? super T, ? extends R> add,
240 final BiPredicate<? super R, ? super T> condition) {
241 return collectWhile(collectionFactory, add, condition, true);
242 }
243
244 public static <T> FlowableTransformer<T, List<T>> toListWhile(
245 final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
246 return collectWhile(ListFactoryHolder.<T>factory(), ListFactoryHolder.<T>add(), condition, emitRemainder);
247 }
248
249 public static <T> FlowableTransformer<T, List<T>> toListWhile(
250 final BiPredicate<? super List<T>, ? super T> condition) {
251 return toListWhile(condition, true);
252 }
253
254 public static <T> FlowableTransformer<T, List<T>> bufferWhile(
255 final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
256 return toListWhile(condition, emitRemainder);
257 }
258
259 public static <T> FlowableTransformer<T, List<T>> bufferWhile(
260 final BiPredicate<? super List<T>, ? super T> condition) {
261 return toListWhile(condition);
262 }
263
264 private static final class ListFactoryHolder {
265
266 private static final Callable<List<Object>> INSTANCE = new Callable<List<Object>>() {
267
268 @Override
269 public List<Object> call() throws Exception {
270 return new ArrayList<Object>();
271 }
272 };
273
274 private static final BiFunction<List<Object>, Object, List<Object>> ADD = new BiFunction<List<Object>, Object, List<Object>>() {
275
276 @Override
277 public List<Object> apply(List<Object> list, Object t) throws Exception {
278 list.add(t);
279 return list;
280 }
281 };
282
283 @SuppressWarnings("unchecked")
284 static <T> Callable<List<T>> factory() {
285 return (Callable<List<T>>) (Callable<?>) INSTANCE;
286 };
287
288 @SuppressWarnings("unchecked")
289 static <T> BiFunction<List<T>, T, List<T>> add() {
290 return (BiFunction<List<T>, T, List<T>>) (BiFunction<?, ?, ?>) ADD;
291 }
292
293 }
294
295 public static <T extends Comparable<T>> FlowableTransformer<T, T> windowMax(final int windowSize) {
296 return windowMax(windowSize, Transformers.<T>naturalComparator());
297 }
298
299 public static <T> FlowableTransformer<T, T> windowMax(final int windowSize,
300 final Comparator<? super T> comparator) {
301 return new FlowableTransformer<T, T>() {
302 @Override
303 public Flowable<T> apply(Flowable<T> source) {
304 return new FlowableWindowMinMax<T>(source, windowSize, comparator, Metric.MAX);
305 }
306 };
307 }
308
309 public static <T extends Comparable<T>> FlowableTransformer<T, T> windowMin(final int windowSize) {
310 return windowMin(windowSize, Transformers.<T>naturalComparator());
311 }
312
313 public static <T> FlowableTransformer<T, T> windowMin(final int windowSize,
314 final Comparator<? super T> comparator) {
315 return new FlowableTransformer<T, T>() {
316 @Override
317 public Flowable<T> apply(Flowable<T> source) {
318 return new FlowableWindowMinMax<T>(source, windowSize, comparator, Metric.MIN);
319 }
320 };
321 }
322
323 private static class NaturalComparatorHolder {
324 static final Comparator<Comparable<Object>> INSTANCE = new Comparator<Comparable<Object>>() {
325
326 @Override
327 public int compare(Comparable<Object> o1, Comparable<Object> o2) {
328 return o1.compareTo(o2);
329 }
330 };
331 }
332
333 @SuppressWarnings("unchecked")
334 private static <T extends Comparable<T>> Comparator<T> naturalComparator() {
335 return (Comparator<T>) (Comparator<?>) NaturalComparatorHolder.INSTANCE;
336 }
337
338 public static <T> FlowableTransformer<T, T> maxRequest(final long... maxRequest) {
339 return new FlowableTransformer<T, T>() {
340
341 @Override
342 public Publisher<T> apply(Flowable<T> source) {
343 return new FlowableMaxRequest<T>(source, maxRequest);
344 }
345 };
346 }
347
348 public static <T> FlowableTransformer<T, T> minRequest(final int... minRequests) {
349 return new FlowableTransformer<T, T>() {
350
351 @Override
352 public Publisher<T> apply(Flowable<T> source) {
353 return new FlowableMinRequest<T>(source, minRequests);
354 }
355 };
356 }
357
358 public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
359 final boolean constrainFirstRequestMin) {
360 Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
361 return new FlowableTransformer<T, T>() {
362
363 @Override
364 public Publisher<T> apply(Flowable<T> source) {
365 if (minRequest == maxRequest && constrainFirstRequestMin) {
366 return source.rebatchRequests(minRequest);
367 } else {
368 return source
369 .compose(Transformers.<T>minRequest(constrainFirstRequestMin ? minRequest : 1, minRequest))
370 .compose(Transformers.<T>maxRequest(maxRequest));
371 }
372 }
373 };
374 }
375
376 public static <T> FlowableTransformer<T, T> rebatchRequests(int minRequest, long maxRequest) {
377 return rebatchRequests(minRequest, maxRequest, true);
378 }
379
380 public static <T> Function<Flowable<T>, Flowable<T>> repeat(
381 final Function<? super Flowable<T>, ? extends Flowable<T>> transform, final int maxChained,
382 final long maxIterations, final Function<Observable<T>, Observable<?>> tester) {
383 Preconditions.checkArgument(maxChained > 0, "maxChained must be > 0");
384 Preconditions.checkArgument(maxIterations > 0, "maxIterations must be > 0");
385 Preconditions.checkNotNull(transform, "transform must not be null");
386 Preconditions.checkNotNull(tester, "tester must not be null");
387 return new Function<Flowable<T>, Flowable<T>>() {
388 @Override
389 public Flowable<T> apply(Flowable<T> source) {
390 return new FlowableRepeatingTransform<T>(source, transform, maxChained, maxIterations, tester);
391 }
392 };
393 }
394
395 public static <T> Function<Flowable<T>, Flowable<T>> reduce(
396 final Function<? super Flowable<T>, ? extends Flowable<T>> reducer, final int maxChained,
397 final long maxIterations) {
398 return repeat(reducer, maxChained, maxIterations, Transformers.<T>finishWhenSingle());
399 }
400
401 @SuppressWarnings("unchecked")
402 private static <T> Function<Observable<T>, Observable<?>> finishWhenSingle() {
403 return (Function<Observable<T>, Observable<?>>) (Function<?, Observable<?>>) FINISH_WHEN_SINGLE;
404 }
405
406 private static final Function<Observable<Object>, Observable<?>> FINISH_WHEN_SINGLE = new Function<Observable<Object>, Observable<?>>() {
407
408 @Override
409 public Observable<?> apply(final Observable<Object> o) throws Exception {
410 return Observable.defer(new Callable<Observable<Object>>() {
411
412 final long[] count = new long[1];
413
414 @Override
415 public Observable<Object> call() throws Exception {
416 return o.materialize()
417 .flatMap(new Function<Notification<Object>, Observable<Notification<Object>>>() {
418 @Override
419 public Observable<Notification<Object>> apply(Notification<Object> x) throws Exception {
420 if (x.isOnNext()) {
421 count[0]++;
422 if (count[0] > 1) {
423 return Observable.just(x);
424 } else {
425 return Observable.empty();
426 }
427 } else if (x.isOnComplete()) {
428 if (count[0] <= 1) {
429
430 return Observable.just(x);
431 } else {
432
433 return Observable.never();
434 }
435 } else {
436
437 return Observable.just(x);
438 }
439 }
440 })
441 .dematerialize(Functions.<Notification<Object>>identity());
442 }
443 });
444 }
445 };
446
447 public static <T> Function<Flowable<T>, Flowable<T>> reduce(
448 final Function<? super Flowable<T>, ? extends Flowable<T>> reducer, final int maxChained) {
449 return reduce(reducer, maxChained, Long.MAX_VALUE);
450 }
451
452 public static <T, R> FlowableTransformer<T, R> flatMapInterleaved(
453 final Function<? super T, ? extends Publisher<? extends R>> mapper, final int maxConcurrency) {
454 return flatMapInterleaved(mapper, maxConcurrency, 128, false);
455 }
456
457 public static <T, R> FlowableTransformer<T, R> flatMapInterleaved(
458 final Function<? super T, ? extends Publisher<? extends R>> mapper, final int maxConcurrency,
459 final int bufferSize, final boolean delayErrors) {
460 return new FlowableTransformer<T, R>() {
461 @Override
462 public Publisher<R> apply(Flowable<T> f) {
463 return Flowables.mergeInterleaved(f.map(mapper), maxConcurrency, bufferSize, delayErrors);
464 }
465 };
466 }
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483 public static <T> FlowableTransformer<T, T> insert(
484 final Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
485 return new FlowableTransformer<T, T>() {
486
487 @Override
488 public Publisher<T> apply(Flowable<T> source) {
489 return new FlowableInsertMaybe<T>(source, valueToInsert);
490 }
491
492 };
493 }
494
/**
 * Overload of {@code insert} taking a fixed timeout; the timeout is wrapped
 * with {@code Functions.constant} and passed to the function-based overload.
 *
 * @param timeout
 *            timeout value, in {@code unit}
 * @param unit
 *            unit of {@code timeout}
 * @param value
 *            value forwarded to the function-based overload
 * @param <T>
 *            item type
 * @return transformer applying the timed insert
 */
public static <T> FlowableTransformer<T, T> insert(long timeout, TimeUnit unit, T value) {
    return insert(Functions.constant(timeout), unit, value);
}
498
/**
 * Overload of {@code insert} taking a fixed value; the value is wrapped with
 * {@code Functions.constant} and passed to the overload that takes a value
 * function.
 *
 * @param timeout
 *            maps an upstream item to a timeout, in {@code unit}
 * @param unit
 *            unit of the timeout
 * @param value
 *            the fixed value
 * @param <T>
 *            item type
 * @return transformer applying the timed insert
 */
public static <T> FlowableTransformer<T, T> insert(Function<? super T, ? extends Long> timeout, TimeUnit unit,
        T value) {
    return insert(timeout, unit, Functions.constant(value));
}
503
504 public static <T> FlowableTransformer<T, T> insert(Function<? super T, ? extends Long> timeout, TimeUnit unit,
505 Function<? super T, ? extends T> value) {
506 return insert(timeout, unit, value, Schedulers.computation());
507 }
508
509 public static <T> FlowableTransformer<T, T> insert(final Function<? super T, ? extends Long> timeout,
510 final TimeUnit unit, final Function<? super T, ? extends T> value, final Scheduler scheduler) {
511 return new FlowableTransformer<T, T>() {
512
513 @Override
514 public Publisher<T> apply(Flowable<T> source) {
515 return new FlowableInsertTimeout<T>(source, timeout, unit, value, scheduler);
516 }
517
518 };
519 }
520
521 public static <T> FlowableTransformer<T, T> insert(
522 final Maybe<? extends T> valueToInsert) {
523 return new FlowableTransformer<T, T>() {
524
525 @Override
526 public Publisher<T> apply(Flowable<T> source) {
527 return new FlowableInsertMaybe<T>(source, Functions.constant(valueToInsert));
528 }
529
530 };
531 }
532
533 private static final class MyOptional<T> {
534 private static final MyOptional<Object> EMPTY = new MyOptional<Object>(null);
535
536 final T t;
537
538 private MyOptional(T t) {
539 this.t = t;
540 }
541
542 static <T> MyOptional<T> of(T t) {
543 Preconditions.checkNotNull(t);
544 return new MyOptional<T>(t);
545 }
546
547 @SuppressWarnings("unchecked")
548 static <T> MyOptional<T> empty() {
549 return (MyOptional<T>) EMPTY;
550 }
551
552 boolean isPresent() {
553 return t != null;
554 }
555
556 T get() {
557 Preconditions.checkNotNull(t);
558 return t;
559 }
560
561 private static final Function<Object, MyOptional<Object>> OF = new Function<Object, MyOptional<Object>>() {
562 @Override
563 public MyOptional<Object> apply(Object x) throws Exception {
564 return MyOptional.of(x);
565 }
566 };
567
568 @SuppressWarnings("unchecked")
569 static <T> Function<Object, MyOptional<T>> of() {
570 return (Function<Object, MyOptional<T>>) (Function<Object, ?>) OF;
571 }
572
573 private static final BiFunction<List<Object>, MyOptional<Object>, List<Object>> ADD = new BiFunction<List<Object>, MyOptional<Object>, List<Object>>() {
574 @Override
575 public List<Object> apply(List<Object> list, MyOptional<Object> x) throws Exception {
576 if (x.isPresent()) {
577 list.add(x.get());
578 }
579 return list;
580 }
581 };
582
583 @SuppressWarnings("unchecked")
584 static <T> BiFunction<List<T>, MyOptional<T>, List<T>> addIfPresent() {
585 return (BiFunction<List<T>, MyOptional<T>, List<T>>) (BiFunction<?, ?, ?>) ADD;
586 }
587
588
589 private static final Predicate<List<Object>> LIST_HAS_ELEMENTS = new Predicate<List<Object>>() {
590 @Override
591 public boolean test(List<Object> list) throws Exception {
592 return !list.isEmpty();
593 }
594 };
595
596
597 @SuppressWarnings("unchecked")
598 static <T> Predicate<List<T>> listHasElements() {
599 return (Predicate<List<T>>) (Predicate<?>) LIST_HAS_ELEMENTS;
600 }
601 }
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
/**
 * Overload of {@code buffer} taking a fixed duration; the duration is wrapped
 * with {@code Functions.constant} and passed to the function-based overload.
 *
 * @param maxSize
 *            size limit forwarded to the function-based overload
 * @param duration
 *            duration value, in {@code unit}
 * @param unit
 *            unit of {@code duration}
 * @param <T>
 *            item type
 * @return transformer emitting lists of items
 */
public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize, final long duration,
        final TimeUnit unit) {
    return buffer(maxSize, Functions.constant(duration), unit);
}
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
/**
 * Overload of {@code buffer} taking a fixed duration and an explicit
 * scheduler; the duration is wrapped with {@code Functions.constant} and
 * passed to the function-based overload.
 *
 * @param maxSize
 *            size limit forwarded to the function-based overload
 * @param duration
 *            duration value, in {@code unit}
 * @param unit
 *            unit of {@code duration}
 * @param scheduler
 *            scheduler forwarded to the function-based overload
 * @param <T>
 *            item type
 * @return transformer emitting lists of items
 */
public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize, final long duration,
        final TimeUnit unit, final Scheduler scheduler) {
    return buffer(maxSize, Functions.constant(duration), unit, scheduler);
}
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666 public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
667 final Function<? super T, ? extends Long> duration, final TimeUnit unit) {
668 return buffer(maxSize, duration, unit, Schedulers.computation());
669 }
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690 public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
691 final Function<? super T, ? extends Long> duration, final TimeUnit unit, final Scheduler scheduler) {
692
693 final BiPredicate<List<T>, MyOptional<T>> condition = new BiPredicate<List<T>, MyOptional<T>>() {
694 @Override
695 public boolean test(List<T> list, MyOptional<T> x) throws Exception {
696 return list.size() < maxSize && x.isPresent();
697 }
698 };
699 Function<MyOptional<T>, Long> timeout = new Function<MyOptional<T>, Long>() {
700 @Override
701 public Long apply(MyOptional<T> t) throws Exception {
702 return duration.apply(t.get());
703 }
704 };
705 final FlowableTransformer<MyOptional<T>, MyOptional<T>> insert = insert(timeout, unit,
706 Functions.constant(MyOptional.<T>empty()), scheduler);
707
708 final FlowableTransformer<MyOptional<T>, List<T>> collectWhile = collectWhile(
709
710 ListFactoryHolder.<T>factory(),
711
712 MyOptional.<T>addIfPresent(),
713
714 condition);
715
716 return new FlowableTransformer<T, List<T>>() {
717 @Override
718 public Publisher<List<T>> apply(Flowable<T> source) {
719
720 return source
721 .map(MyOptional.<T>of())
722 .compose(insert)
723 .compose(collectWhile)
724
725 .filter(MyOptional.<T>listHasElements());
726 }
727 };
728 }
729 }