1 package com.github.davidmoten.rx;
2
3 import java.nio.charset.CharsetDecoder;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.Collection;
7 import java.util.Comparator;
8 import java.util.HashSet;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Set;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.regex.Pattern;
15
16 import com.github.davidmoten.rx.StateMachine.Completion;
17 import com.github.davidmoten.rx.StateMachine.Transition;
18 import com.github.davidmoten.rx.buffertofile.DataSerializer;
19 import com.github.davidmoten.rx.buffertofile.DataSerializers;
20 import com.github.davidmoten.rx.buffertofile.Options;
21 import com.github.davidmoten.rx.internal.operators.OnSubscribeDoOnEmpty;
22 import com.github.davidmoten.rx.internal.operators.OnSubscribeMapLast;
23 import com.github.davidmoten.rx.internal.operators.OperatorBufferPredicateBoundary;
24 import com.github.davidmoten.rx.internal.operators.OperatorBufferToFile;
25 import com.github.davidmoten.rx.internal.operators.OperatorDoOnNth;
26 import com.github.davidmoten.rx.internal.operators.OperatorFromTransformer;
27 import com.github.davidmoten.rx.internal.operators.TransformerOnTerminateResume;
28 import com.github.davidmoten.rx.internal.operators.OperatorSampleFirst;
29 import com.github.davidmoten.rx.internal.operators.OperatorWindowMinMax;
30 import com.github.davidmoten.rx.internal.operators.OperatorWindowMinMax.Metric;
31 import com.github.davidmoten.rx.internal.operators.OrderedMerge;
32 import com.github.davidmoten.rx.internal.operators.TransformerDecode;
33 import com.github.davidmoten.rx.internal.operators.TransformerDelayFinalUnsubscribe;
34 import com.github.davidmoten.rx.internal.operators.TransformerLimitSubscribers;
35 import com.github.davidmoten.rx.internal.operators.TransformerOnBackpressureBufferRequestLimiting;
36 import com.github.davidmoten.rx.internal.operators.TransformerStateMachine;
37 import com.github.davidmoten.rx.internal.operators.TransformerStringSplit;
38 import com.github.davidmoten.rx.util.BackpressureStrategy;
39 import com.github.davidmoten.rx.util.MapWithIndex;
40 import com.github.davidmoten.rx.util.MapWithIndex.Indexed;
41 import com.github.davidmoten.rx.util.Pair;
42 import com.github.davidmoten.util.Optional;
43
44 import rx.Notification;
45 import rx.Observable;
46 import rx.Observable.Operator;
47 import rx.Observable.Transformer;
48 import rx.Observer;
49 import rx.Scheduler;
50 import rx.Scheduler.Worker;
51 import rx.Subscriber;
52 import rx.functions.Action0;
53 import rx.functions.Action1;
54 import rx.functions.Action2;
55 import rx.functions.Func0;
56 import rx.functions.Func1;
57 import rx.functions.Func2;
58 import rx.functions.Func3;
59 import rx.internal.util.RxRingBuffer;
60 import rx.observables.GroupedObservable;
61 import rx.schedulers.Schedulers;
62
63 public final class Transformers {
64
65 static final int DEFAULT_INITIAL_BATCH = 1;
66
67 public static <T, R> Operator<R, T> toOperator(
68 Func1<? super Observable<T>, ? extends Observable<R>> function) {
69 return OperatorFromTransformer.toOperator(function);
70 }
71
72 public static <T extends Number> Transformer<T, Statistics> collectStats() {
73 return new Transformer<T, Statistics>() {
74
75 @Override
76 public Observable<Statistics> call(Observable<T> o) {
77 return o.scan(Statistics.create(), Functions.collectStats());
78 }
79 };
80 }
81
82 public static <T, R extends Number> Transformer<T, Pair<T, Statistics>> collectStats(
83 final Func1<? super T, ? extends R> function) {
84 return new Transformer<T, Pair<T, Statistics>>() {
85
86 @Override
87 public Observable<Pair<T, Statistics>> call(Observable<T> source) {
88 return source.scan(Pair.create((T) null, Statistics.create()),
89 new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
90 @Override
91 public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
92 return Pair.create(t, pair.b().add(function.call(t)));
93 }
94 }).skip(1);
95 }
96 };
97 }
98
99 public static <T extends Comparable<? super T>> Transformer<T, T> sort() {
100 return new Transformer<T, T>() {
101
102 @Override
103 public Observable<T> call(Observable<T> o) {
104 return o.toSortedList().flatMapIterable(Functions.<List<T>>identity());
105 }
106 };
107 }
108
109 public static <T> Transformer<T, T> sort(final Comparator<? super T> comparator) {
110 return new Transformer<T, T>() {
111
112 @Override
113 public Observable<T> call(Observable<T> o) {
114 return o.toSortedList(Functions.toFunc2(comparator))
115 .flatMapIterable(Functions.<List<T>>identity());
116 }
117 };
118 }
119
120 public static <T> Transformer<T, Set<T>> toSet() {
121 return new Transformer<T, Set<T>>() {
122
123 @Override
124 public Observable<Set<T>> call(Observable<T> o) {
125 return o.collect(new Func0<Set<T>>() {
126
127 @Override
128 public Set<T> call() {
129 return new HashSet<T>();
130 }
131 }, new Action2<Set<T>, T>() {
132
133 @Override
134 public void call(Set<T> set, T t) {
135 set.add(t);
136 }
137 });
138 }
139 };
140 }
141
142 /**
143 * <p>
144 * Returns a {@link Transformer} that wraps stream emissions with their
145 * corresponding zero based index numbers (0,1,2,3,..) in instances of
146 * {@link Indexed}.
147 * <p>
148 * Example usage:
149 *
150 * <pre>
151 *
152 * {@code
153 * Observable
154 * .just("a","b","c)
155 * .mapWithIndex(Transformers.mapWithIndex())
156 * .map(x -> x.index() + "->" + x.value())
157 * .forEach(System.out::println);
158 * }
159 * </pre>
160 *
161 * <p>
162 * <img src=
163 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/mapWithIndex.png?raw=true"
164 * alt="marble diagram">
165 *
166 * @param <T>
167 * generic type of the stream being supplemented with an index
168 * @return transformer that supplements each stream emission with its index
169 * (zero-based position) in the stream.
170 */
171 public static <T> Transformer<T, Indexed<T>> mapWithIndex() {
172 return MapWithIndex.instance();
173 }
174
175 /**
176 * <p>
177 * Returns a {@link Transformer} that allows processing of the source stream
178 * to be defined in a state machine where transitions of the state machine
179 * may also emit items to downstream that are buffered if necessary when
180 * backpressure is requested. <code>flatMap</code> is part of the processing
181 * chain so the source may experience requests for more items than are
182 * strictly required by the endpoint subscriber.
183 *
184 * <p>
185 * <img src=
186 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/stateMachine.png?raw=true"
187 * alt="marble diagram">
188 *
189 * @param initialStateFactory
190 * the factory to create the initial state of the state machine.
191 * @param transition
192 * defines state transitions and consequent emissions to
193 * downstream when an item arrives from upstream. The
194 * {@link Subscriber} is called with the emissions to downstream.
195 * You can optionally call {@link Subscriber#isUnsubscribed()} to
196 * check if you can stop emitting from the transition. If you do
197 * wish to terminate the Observable then call
198 * {@link Subscriber#unsubscribe()} and return anything (say
199 * {@code null} from the transition (as the next state which will
200 * not be used). You can also complete the Observable by calling
201 * {@link Subscriber#onCompleted} or {@link Subscriber#onError}
202 * from within the transition and return anything from the
203 * transition (will not be used). The transition should run
204 * synchronously so that completion of a call to the transition
205 * should also signify all emissions from that transition have
206 * been made.
207 * @param completion
208 * defines activity that should happen based on the final state
209 * just before downstream <code>onCompleted()</code> is called.
210 * For example any buffered emissions in state could be emitted
211 * at this point. Don't call <code>observer.onCompleted()</code>
212 * as it is called for you after the action completes if and only
213 * if you return true from this function.
214 * @param backpressureStrategy
215 * is applied to the emissions from one call of transition and
216 * should enforce backpressure.
217 * @param <State>
218 * the class representing the state of the state machine
219 * @param <In>
220 * the input observable type
221 * @param <Out>
222 * the output observable type
223 * @throws NullPointerException
224 * if {@code initialStateFactory} or {@code transition},or
225 * {@code completionAction} is null
226 * @return a backpressure supporting transformer that implements the state
227 * machine specified by the parameters
228 */
229 public static <State, In, Out> Transformer<In, Out> stateMachine(
230 Func0<State> initialStateFactory,
231 Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
232 Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
233 BackpressureStrategy backpressureStrategy) {
234 return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
235 completion, backpressureStrategy, DEFAULT_INITIAL_BATCH);
236 }
237
238 public static <State, In, Out> Transformer<In, Out> stateMachine(
239 Func0<State> initialStateFactory,
240 Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
241 Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
242 BackpressureStrategy backpressureStrategy, int initialRequest) {
243 return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
244 completion, backpressureStrategy, initialRequest);
245 }
246
247 /**
248 * <p>
249 * Returns a {@link Transformer} that allows processing of the source stream
250 * to be defined in a state machine where transitions of the state machine
251 * may also emit items to downstream that are buffered if necessary when
252 * backpressure is requested. <code>flatMap</code> is part of the processing
253 * chain so the source may experience requests for more items than are
254 * strictly required by the endpoint subscriber. The backpressure strategy
255 * used for emissions from the transition into the flatMap is
256 * {@link BackpressureStrategy#BUFFER} which corresponds to
257 * {@link Observable#onBackpressureBuffer}.
258 *
259 * <p>
260 * <img src=
261 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/stateMachine.png?raw=true"
262 * alt="marble diagram">
263 *
264 * @param initialStateFactory
265 * the factory to create the initial state of the state machine.
266 * @param transition
267 * defines state transitions and consequent emissions to
268 * downstream when an item arrives from upstream. The
269 * {@link Subscriber} is called with the emissions to downstream.
270 * You can optionally call {@link Subscriber#isUnsubscribed()} to
271 * check if you can stop emitting from the transition. If you do
272 * wish to terminate the Observable then call
273 * {@link Subscriber#unsubscribe()} and return anything (say
274 * {@code null} from the transition (as the next state which will
275 * not be used). You can also complete the Observable by calling
276 * {@link Subscriber#onCompleted} or {@link Subscriber#onError}
277 * from within the transition and return anything from the
278 * transition (will not be used). The transition should run
279 * synchronously so that completion of a call to the transition
280 * should also signify all emissions from that transition have
281 * been made.
282 * @param completion
283 * defines activity that should happen based on the final state
284 * just before downstream <code>onCompleted()</code> is called.
285 * For example any buffered emissions in state could be emitted
286 * at this point. Don't call <code>observer.onCompleted()</code>
287 * as it is called for you after the action completes if and only
288 * if you return true from this function.
289 * @param <State>
290 * the class representing the state of the state machine
291 * @param <In>
292 * the input observable type
293 * @param <Out>
294 * the output observable type
295 * @throws NullPointerException
296 * if {@code initialStateFactory} or {@code transition},or
297 * {@code completion} is null
298 * @return a backpressure supporting transformer that implements the state
299 * machine specified by the parameters
300 */
301 public static <State, In, Out> Transformer<In, Out> stateMachine(
302 Func0<? extends State> initialStateFactory,
303 Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
304 Func2<? super State, ? super Subscriber<Out>, Boolean> completion) {
305 return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
306 completion, BackpressureStrategy.BUFFER, DEFAULT_INITIAL_BATCH);
307 }
308
309 public static StateMachine.Builder stateMachine() {
310 return StateMachine.builder();
311 }
312
313 /**
314 * <p>
315 * Returns the source {@link Observable} merged with the <code>other</code>
316 * observable using the given {@link Comparator} for order. A precondition
317 * is that the source and other are already ordered. This transformer
318 * supports backpressure and its inputs must also support backpressure.
319 *
320 * <p>
321 * <img src=
322 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/orderedMerge.png?raw=true"
323 * alt="marble diagram">
324 *
325 * @param other
326 * the other already ordered observable
327 * @param comparator
328 * the ordering to use
329 * @param <T>
330 * the generic type of the objects being compared
331 * @return merged and ordered observable
332 */
333 public static final <T> Transformer<T, T> orderedMergeWith(final Observable<T> other,
334 final Comparator<? super T> comparator) {
335 @SuppressWarnings("unchecked")
336 Collection<Observable<T>> collection = Arrays.asList(other);
337 return orderedMergeWith(collection, comparator);
338 }
339
340 /**
341 * <p>
342 * Returns the source {@link Observable} merged with all of the other
343 * observables using the given {@link Comparator} for order. A precondition
344 * is that the source and other are already ordered. This transformer
345 * supports backpressure and its inputs must also support backpressure.
346 *
347 * <p>
348 * <img src=
349 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/orderedMerge.png?raw=true"
350 * alt="marble diagram">
351 *
352 * @param others
353 * a collection of already ordered observables to merge with
354 * @param comparator
355 * the ordering to use
356 * @param <T>
357 * the generic type of the objects being compared
358 * @return merged and ordered observable
359 */
360 public static final <T> Transformer<T, T> orderedMergeWith(
361 final Collection<Observable<T>> others, final Comparator<? super T> comparator) {
362 return new Transformer<T, T>() {
363
364 @Override
365 public Observable<T> call(Observable<T> source) {
366 List<Observable<T>> collection = new ArrayList<Observable<T>>();
367 collection.add(source);
368 collection.addAll(others);
369 return OrderedMerge.<T>create(collection, comparator, false);
370 }
371 };
372 }
373
374 /**
375 * Returns a {@link Transformer} that returns an {@link Observable} that is
376 * a buffering of the source Observable into lists of sequential items that
377 * are equal.
378 *
379 * <p>
380 * For example, the stream
381 * {@code Observable.just(1, 1, 2, 2, 1).compose(toListUntilChanged())}
382 * would emit {@code [1,1], [2], [1]}.
383 *
384 * @param <T>
385 * the generic type of the source Observable
386 * @return transformer as above
387 */
388 public static <T> Transformer<T, List<T>> toListUntilChanged() {
389 Func2<Collection<T>, T, Boolean> equal = HolderEquals.instance();
390 return toListWhile(equal);
391 }
392
393 private static class HolderEquals {
394 private static final Func2<Collection<Object>, Object, Boolean> INSTANCE = new Func2<Collection<Object>, Object, Boolean>() {
395 @Override
396 public Boolean call(Collection<Object> list, Object t) {
397 return list.isEmpty() || list.iterator().next().equals(t);
398 }
399 };
400
401 @SuppressWarnings("unchecked")
402 static <T> Func2<Collection<T>, T, Boolean> instance() {
403 return (Func2<Collection<T>, T, Boolean>) (Func2<?, ?, Boolean>) INSTANCE;
404 }
405 }
406
407 /**
408 * <p>
409 * Returns a {@link Transformer} that returns an {@link Observable} that is
410 * a buffering of the source Observable into lists of sequential items that
411 * satisfy the condition {@code condition}.
412 *
413 * <p>
414 * <img src=
415 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/toListWhile.png?raw=true"
416 * alt="marble diagram">
417 *
418 * @param condition
419 * condition function that must return true if an item is to be
420 * part of the list being prepared for emission
421 * @param <T>
422 * the generic type of the source Observable
423 * @return transformer as above
424 */
425 public static <T> Transformer<T, List<T>> toListWhile(
426 final Func2<? super List<T>, ? super T, Boolean> condition) {
427
428 Func0<List<T>> initialState = new Func0<List<T>>() {
429 @Override
430 public List<T> call() {
431 return new ArrayList<T>();
432 }
433 };
434
435 Action2<List<T>, T> collect = new Action2<List<T>, T>() {
436
437 @Override
438 public void call(List<T> list, T n) {
439 list.add(n);
440 }
441 };
442 return collectWhile(initialState, collect, condition);
443 }
444
445 /**
446 * <p>
447 * Returns a {@link Transformer} that returns an {@link Observable} that is
448 * collected into {@code Collection} instances created by {@code factory}
449 * that are emitted when the collection and latest emission do not satisfy
450 * {@code condition} or on completion.
451 *
452 * <p>
453 * <img src=
454 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/collectWhile.png?raw=true"
455 * alt="marble diagram">
456 *
457 * @param factory
458 * collection instance creator
459 * @param collect
460 * collection action
461 * @param condition
462 * returns true if and only if emission should be collected in
463 * current collection being prepared for emission
464 * @param isEmpty
465 * indicates that the collection is empty
466 * @param <T>
467 * generic type of source observable
468 * @param <R>
469 * collection type emitted by transformed Observable
470 * @return transformer as above
471 */
472 public static <T, R> Transformer<T, R> collectWhile(final Func0<R> factory,
473 final Action2<? super R, ? super T> collect,
474 final Func2<? super R, ? super T, Boolean> condition,
475 final Func1<? super R, Boolean> isEmpty) {
476 Func3<R, T, Observer<R>, R> transition = new Func3<R, T, Observer<R>, R>() {
477
478 @Override
479 public R call(R collection, T t, Observer<R> observer) {
480
481 if (condition.call(collection, t)) {
482 collect.call(collection, t);
483 return collection;
484 } else {
485 observer.onNext(collection);
486 R r = factory.call();
487 collect.call(r, t);
488 return r;
489 }
490 }
491
492 };
493 Func2<R, Observer<R>, Boolean> completionAction = new Func2<R, Observer<R>, Boolean>() {
494 @Override
495 public Boolean call(R collection, Observer<R> observer) {
496 if (!isEmpty.call(collection)) {
497 observer.onNext(collection);
498 }
499 return true;
500 }
501 };
502 return Transformers.stateMachine(factory, transition, completionAction);
503 }
504
505 /**
506 * <p>
507 * Returns a {@link Transformer} that returns an {@link Observable} that is
508 * collected into {@code Collection} instances created by {@code factory}
509 * that are emitted when items are not equal or on completion.
510 *
511 * <p>
512 * <img src=
513 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/collectWhile.png?raw=true"
514 * alt="marble diagram">
515 *
516 * @param factory
517 * collection instance creator
518 * @param collect
519 * collection action
520 * @param <T>
521 * generic type of source observable
522 * @param <R>
523 * collection type emitted by transformed Observable
524 * @return transformer as above
525 */
526 public static <T, R extends Collection<T>> Transformer<T, R> collectWhile(
527 final Func0<R> factory, final Action2<? super R, ? super T> collect) {
528 return collectWhile(factory, collect, HolderEquals.<T>instance());
529 }
530
531 public static <T, R extends Iterable<?>> Transformer<T, R> collectWhile(final Func0<R> factory,
532 final Action2<? super R, ? super T> collect,
533 final Func2<? super R, ? super T, Boolean> condition) {
534 Func1<R, Boolean> isEmpty = new Func1<R, Boolean>() {
535 @Override
536 public Boolean call(R collection) {
537 return !collection.iterator().hasNext();
538 }
539 };
540 return collectWhile(factory, collect, condition, isEmpty);
541 }
542
543 /**
544 * Returns a {@link Transformer} that applied to a source {@link Observable}
545 * calls the given action on the {@code n}th onNext emission.
546 *
547 * @param n
548 * the 1-based count of onNext to do the action on
549 * @param action
550 * is performed on {@code n}th onNext.
551 * @param <T>
552 * the generic type of the Observable being transformed
553 * @return Transformer that applied to a source Observable calls the given
554 * action on the nth onNext emission.
555 */
556 public static <T> Transformer<T, T> doOnNext(final int n, final Action1<? super T> action) {
557 return new Transformer<T, T>() {
558 @Override
559 public Observable<T> call(Observable<T> o) {
560 return o.lift(OperatorDoOnNth.create(action, n));
561 }
562 };
563 }
564
565 /**
566 * Returns a {@link Transformer} that applied to a source {@link Observable}
567 * calls the given action on the first onNext emission.
568 *
569 * @param action
570 * is performed on first onNext
571 * @param <T>
572 * the generic type of the Observable being transformed
573 * @return Transformer that applied to a source Observable calls the given
574 * action on the first onNext emission.
575 */
576 public static <T> Transformer<T, T> doOnFirst(final Action1<? super T> action) {
577 return doOnNext(1, action);
578 }
579
580 /**
581 * <p>
582 * Returns an observable that subscribes to {@code this} and wait for
583 * completion but doesn't emit any items and once completes emits the
584 * {@code next} observable.
585 *
586 * <p>
587 * <img src=
588 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/ignoreElementsThen.png?raw=true"
589 * alt="marble diagram">
590 *
591 * @param <R>
592 * input observable type
593 * @param <T>
594 * output observable type
595 * @param next
596 * observable to be emitted after ignoring elements of
597 * {@code this}
598 * @return Transformer that applied to a source Observable ignores the
599 * elements of the source and emits the elements of a second
600 * observable
601 */
602 public static <R, T> Transformer<T, R> ignoreElementsThen(final Observable<R> next) {
603 return new Transformer<T, R>() {
604
605 @SuppressWarnings("unchecked")
606 @Override
607 public Observable<R> call(Observable<T> source) {
608 return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
609 }
610 };
611 }
612
613 public static <T> Transformer<String, String> split(String pattern) {
614 return TransformerStringSplit.split(pattern, null);
615 }
616
617 public static <T> Transformer<String, String> split(Pattern pattern) {
618 return TransformerStringSplit.split(null, pattern);
619 }
620
621 /**
622 * <p>
623 * Decodes a stream of multibyte chunks into a stream of strings that works
624 * on infinite streams and handles when a multibyte character spans two
625 * chunks. This method allows for more control over how malformed and
626 * unmappable characters are handled.
627 * <p>
628 * <img width="640" src=
629 * "https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png"
630 * alt="">
631 *
632 * @param charsetDecoder
633 * decodes the bytes into strings
634 * @return the Observable returning a stream of decoded strings
635 */
636 public static Transformer<byte[], String> decode(final CharsetDecoder charsetDecoder) {
637 return TransformerDecode.decode(charsetDecoder);
638 }
639
640 public static <T> Transformer<T, T> limitSubscribers(AtomicInteger subscriberCount,
641 int maxSubscribers) {
642 return new TransformerLimitSubscribers<T>(subscriberCount, maxSubscribers);
643 }
644
645 public static <T> Transformer<T, T> limitSubscribers(int maxSubscribers) {
646 return new TransformerLimitSubscribers<T>(new AtomicInteger(), maxSubscribers);
647 }
648
649 public static <T> Transformer<T, T> cache(final long duration, final TimeUnit unit,
650 final Worker worker) {
651 return new Transformer<T, T>() {
652 @Override
653 public Observable<T> call(Observable<T> o) {
654 return Obs.cache(o, duration, unit, worker);
655 }
656 };
657 }
658
659 public static <T> Transformer<T, T> sampleFirst(final long duration, final TimeUnit unit) {
660 return sampleFirst(duration, unit, Schedulers.computation());
661 }
662
663 public static <T> Transformer<T, T> sampleFirst(final long duration, final TimeUnit unit,
664 final Scheduler scheduler) {
665 if (duration <= 0) {
666 throw new IllegalArgumentException("duration must be > 0");
667 }
668 return new Transformer<T, T>() {
669
670 @Override
671 public Observable<T> call(Observable<T> source) {
672 return source.lift(new OperatorSampleFirst<T>(duration, unit, scheduler));
673 }
674 };
675 }
676
677 public static <T> Transformer<T, T> onBackpressureBufferToFile() {
678 return onBackpressureBufferToFile(DataSerializers.<T>javaIO(), Schedulers.computation(),
679 Options.defaultInstance());
680 }
681
682 public static <T> Transformer<T, T> onBackpressureBufferToFile(
683 final DataSerializer<T> serializer) {
684 return onBackpressureBufferToFile(serializer, Schedulers.computation(),
685 Options.defaultInstance());
686 }
687
688 public static <T> Transformer<T, T> onBackpressureBufferToFile(
689 final DataSerializer<T> serializer, final Scheduler scheduler) {
690 return onBackpressureBufferToFile(serializer, scheduler, Options.defaultInstance());
691 }
692
693 public static <T> Transformer<T, T> onBackpressureBufferToFile(
694 final DataSerializer<T> serializer, final Scheduler scheduler, final Options options) {
695 return new Transformer<T, T>() {
696 @Override
697 public Observable<T> call(Observable<T> o) {
698 return o.lift(new OperatorBufferToFile<T>(serializer, scheduler, options));
699 }
700 };
701 }
702
703 public static <T> Transformer<T, T> windowMin(final int windowSize,
704 final Comparator<? super T> comparator) {
705 return new Transformer<T, T>() {
706 @Override
707 public Observable<T> call(Observable<T> o) {
708 return o.lift(new OperatorWindowMinMax<T>(windowSize, comparator, Metric.MIN));
709 }
710 };
711 }
712
713 public static <T extends Comparable<T>> Transformer<T, T> windowMax(final int windowSize) {
714 return windowMax(windowSize, Transformers.<T>naturalComparator());
715 }
716
717 public static <T> Transformer<T, T> windowMax(final int windowSize,
718 final Comparator<? super T> comparator) {
719 return new Transformer<T, T>() {
720 @Override
721 public Observable<T> call(Observable<T> o) {
722 return o.lift(new OperatorWindowMinMax<T>(windowSize, comparator, Metric.MAX));
723 }
724 };
725 }
726
727 public static <T extends Comparable<T>> Transformer<T, T> windowMin(final int windowSize) {
728 return windowMin(windowSize, Transformers.<T>naturalComparator());
729 }
730
731 private static class NaturalComparatorHolder {
732 static final Comparator<Comparable<Object>> INSTANCE = new Comparator<Comparable<Object>>() {
733
734 @Override
735 public int compare(Comparable<Object> o1, Comparable<Object> o2) {
736 return o1.compareTo(o2);
737 }
738 };
739 }
740
741 @SuppressWarnings("unchecked")
742 private static <T extends Comparable<T>> Comparator<T> naturalComparator() {
743 return (Comparator<T>) (Comparator<?>) NaturalComparatorHolder.INSTANCE;
744 }
745
746 /**
747 * <p>
748 * Groups the items emitted by an {@code Observable} according to a
749 * specified criterion, and emits these grouped items as
750 * {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows
751 * only a single {@link Subscriber} during its lifetime and if this
752 * {@code Subscriber} unsubscribes before the source terminates, the next
753 * emission by the source having the same key will trigger a new
754 * {@code GroupedObservable} emission.
755 * <p>
756 * <img width="640" height="360" src=
757 * "https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png"
758 * alt="">
759 * <p>
760 * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to
761 * emit until such time as it is subscribed to. For this reason, in order to
762 * avoid memory leaks, you should not simply ignore those
763 * {@code GroupedObservable}s that do not concern you. Instead, you can
764 * signal to them that they may discard their buffers by applying an
765 * operator like {@code .ignoreElements()} to them.
766 * <dl>
767 * <dt><b>Scheduler:</b></dt>
768 * <dd>{@code groupBy} does not operate by default on a particular
769 * {@link Scheduler}.</dd>
770 * </dl>
771 *
772 * @param keySelector
773 * a function that extracts the key for each item
774 * @param elementSelector
775 * a function that extracts the return element for each item
776 * @param evictingMapFactory
777 * a function that given an eviction action returns a {@link Map}
778 * instance that will be used to assign items to the appropriate
779 * {@code GroupedObservable}s. The {@code Map} instance must be
780 * thread-safe and any eviction must trigger a call to the
781 * supplied action (synchronously or asynchronously). This can be
782 * used to limit the size of the map by evicting keys by maximum
783 * size or access time for instance. If
784 * {@code evictingMapFactory} is null then no eviction strategy
785 * will be applied (and a suitable default thread-safe
786 * implementation of {@code Map} will be supplied). Here's an
787 * example using Guava's {@code CacheBuilder} from v19.0:
788 *
789 * <pre>
790 * {@code
791 * Func1<Action1<K>, Map<K, Object>> mapFactory
792 * = action -> CacheBuilder.newBuilder()
793 * .maximumSize(1000)
794 * .expireAfterAccess(12, TimeUnit.HOUR)
795 * .removalListener(key -> action.call(key))
796 * .<K, Object> build().asMap();
797 * }
798 * </pre>
799 *
800 * @param <T>
801 * the type of the input observable
802 * @param <K>
803 * the key type
804 * @param <R>
805 * the element type
806 * @return an {@code Observable} that emits {@link GroupedObservable}s, each
807 * of which corresponds to a unique key value and each of which
808 * emits those items from the source Observable that share that key
809 * value
810 * @see <a href="http://reactivex.io/documentation/operators/groupby.html">
811 * ReactiveX operators documentation: GroupBy</a>
812 */
813 public static <T, K, R> Transformer<T, GroupedObservable<K, R>> groupByEvicting(
814 final Func1<? super T, ? extends K> keySelector,
815 final Func1<? super T, ? extends R> elementSelector,
816 final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
817 return new Transformer<T, GroupedObservable<K, R>>() {
818
819 @Override
820 public Observable<GroupedObservable<K, R>> call(Observable<T> o) {
821 return o.groupBy(keySelector, elementSelector, evictingMapFactory);
822 }
823 };
824 }
825
826 /**
827 * If multiple concurrently open subscriptions happen to a source
828 * transformed by this method then an additional do-nothing subscription
829 * will be maintained to the source and will only be closed after the
830 * specified duration has passed from the final unsubscription of the open
831 * subscriptions. If another subscription happens during this wait period
832 * then the scheduled unsubscription will be cancelled.
833 *
834 * @param duration
835 * duration of period to leave at least one source subscription
836 * open
837 * @param unit
838 * units for duration
839 * @param <T>
840 * generic type of stream
841 * @return transformer
842 */
843 public static <T> Transformer<T, T> delayFinalUnsubscribe(long duration, TimeUnit unit) {
844 return delayFinalUnsubscribe(duration, unit, Schedulers.computation());
845 }
846
847 /**
848 * If multiple concurrently open subscriptions happen to a source
849 * transformed by this method then an additional do-nothing subscription
850 * will be maintained to the source and will only be closed after the
851 * specified duration has passed from the final unsubscription of the open
852 * subscriptions. If another subscription happens during this wait period
853 * then the scheduled unsubscription will be cancelled.
854 *
855 * @param duration
856 * duration of period to leave at least one source subscription
857 * open
858 * @param unit
859 * units for duration
860 * @param scheduler
861 * scheduler to use to schedule wait for unsubscribe
862 * @param <T>
863 * generic type of stream
864 * @return transformer
865 */
866 public static <T> Transformer<T, T> delayFinalUnsubscribe(long duration, TimeUnit unit,
867 Scheduler scheduler) {
868 return new TransformerDelayFinalUnsubscribe<T>(unit.toMillis(duration), scheduler);
869 }
870
871 /**
872 * Removes pairs non-recursively from a stream. Uses
873 * {@code Transformers.stateMachine()} under the covers to ensure items are
874 * emitted as soon as possible (if an item can't be in a pair then it is
875 * emitted straight away).
876 *
877 * @param isCandidateForFirst
878 * returns true if item is potentially the first of a pair that
879 * we want to remove
880 * @param remove
881 * returns true if a pair should be removed
882 * @param <T>
883 * generic type of stream being transformed
884 * @return transformed stream
885 */
886 public static <T> Transformer<T, T> removePairs(
887 final Func1<? super T, Boolean> isCandidateForFirst,
888 final Func2<? super T, ? super T, Boolean> remove) {
889 return new Transformer<T, T>() {
890
891 @Override
892 public Observable<T> call(Observable<T> o) {
893 return o.compose(Transformers. //
894 stateMachine() //
895 .initialState(Optional.<T>absent()) //
896 .transition(new Transition<Optional<T>, T, T>() {
897
898 @Override
899 public Optional<T> call(Optional<T> state, T value,
900 Subscriber<T> subscriber) {
901 if (!state.isPresent()) {
902 if (isCandidateForFirst.call(value)) {
903 return Optional.of(value);
904 } else {
905 subscriber.onNext(value);
906 return Optional.absent();
907 }
908 } else {
909 if (remove.call(state.get(), value)) {
910 // emit nothing and reset state
911 return Optional.absent();
912 } else {
913 subscriber.onNext(state.get());
914 if (isCandidateForFirst.call(value)) {
915 return Optional.of(value);
916 } else {
917 subscriber.onNext(value);
918 return Optional.absent();
919 }
920 }
921 }
922 }
923 }).completion(new Completion<Optional<T>, T>() {
924
925 @Override
926 public Boolean call(Optional<T> state, Subscriber<T> subscriber) {
927 if (state.isPresent())
928 subscriber.onNext(state.get());
929 // yes, complete
930 return true;
931 }
932 }).build());
933 }
934 };
935 }
936
937 /**
938 * Rather than requesting {@code Long.MAX_VALUE} of upstream as does
939 * `Observable.onBackpressureBuffer`, this variant only requests of upstream
940 * what is requested of it. Thus an operator can be written that
941 * overproduces.
942 *
943 * @param <T>
944 * the value type
945 * @return transformer that buffers on backpressure but only requests of
946 * upstream what is requested of it
947 */
948 public static <T> Transformer<T, T> onBackpressureBufferRequestLimiting() {
949 return TransformerOnBackpressureBufferRequestLimiting.instance();
950 }
951
952 /**
953 * Buffers the elements into continuous, non-overlapping Lists where the
954 * boundary is determined by a predicate receiving each item, after being
955 * buffered, and returns true to indicate a new buffer should start.
956 *
957 * <p>
958 * The operator won't return an empty first or last buffer.
959 *
960 * <dl>
961 * <dt><b>Backpressure Support:</b></dt>
962 * <dd>This operator supports backpressure.</dd>
963 * <dt><b>Scheduler:</b></dt>
964 * <dd>This operator does not operate by default on a particular
965 * {@link Scheduler}.</dd>
966 * </dl>
967 *
968 * @param <T>
969 * the input value type
970 * @param predicate
971 * the Func1 that receives each item, after being buffered, and
972 * should return true to indicate a new buffer has to start.
973 * @return the new Observable instance
974 * @see #bufferWhile(Func1)
975 * @since (if this graduates from Experimental/Beta to supported, replace
976 * this parenthetical with the release number)
977 */
978 public static final <T> Transformer<T, List<T>> bufferUntil(
979 Func1<? super T, Boolean> predicate) {
980 return bufferUntil(predicate, 10);
981 }
982
983 /**
984 * Buffers the elements into continuous, non-overlapping Lists where the
985 * boundary is determined by a predicate receiving each item, after being
986 * buffered, and returns true to indicate a new buffer should start.
987 *
988 * <p>
989 * The operator won't return an empty first or last buffer.
990 *
991 * <dl>
992 * <dt><b>Backpressure Support:</b></dt>
993 * <dd>This operator supports backpressure.</dd>
994 * <dt><b>Scheduler:</b></dt>
995 * <dd>This operator does not operate by default on a particular
996 * {@link Scheduler}.</dd>
997 * </dl>
998 *
999 * @param <T>
1000 * the input value type
1001 * @param predicate
1002 * the Func1 that receives each item, after being buffered, and
1003 * should return true to indicate a new buffer has to start.
1004 * @return the new Observable instance
1005 * @see #bufferWhile(Func1)
1006 * @since (if this graduates from Experimental/Beta to supported, replace
1007 * this parenthetical with the release number)
1008 */
1009 public static final <T> Transformer<T, List<T>> toListUntil(
1010 Func1<? super T, Boolean> predicate) {
1011 return bufferUntil(predicate);
1012 }
1013
1014 /**
1015 * Buffers the elements into continuous, non-overlapping Lists where the
1016 * boundary is determined by a predicate receiving each item, after being
1017 * buffered, and returns true to indicate a new buffer should start.
1018 *
1019 * <p>
1020 * The operator won't return an empty first or last buffer.
1021 *
1022 * <dl>
1023 * <dt><b>Backpressure Support:</b></dt>
1024 * <dd>This operator supports backpressure.</dd>
1025 * <dt><b>Scheduler:</b></dt>
1026 * <dd>This operator does not operate by default on a particular
1027 * {@link Scheduler}.</dd>
1028 * </dl>
1029 *
1030 * @param <T>
1031 * the input value type
1032 * @param predicate
1033 * the Func1 that receives each item, after being buffered, and
1034 * should return true to indicate a new buffer has to start.
1035 * @param capacityHint
1036 * the expected number of items in each buffer
1037 * @return the new Observable instance
1038 * @see #bufferWhile(Func1)
1039 * @since (if this graduates from Experimental/Beta to supported, replace
1040 * this parenthetical with the release number)
1041 */
1042 public static final <T> Transformer<T, List<T>> bufferUntil(Func1<? super T, Boolean> predicate,
1043 int capacityHint) {
1044 return new OperatorBufferPredicateBoundary<T>(predicate, RxRingBuffer.SIZE, capacityHint,
1045 true);
1046 }
1047
1048 /**
1049 * Buffers the elements into continuous, non-overlapping Lists where the
1050 * boundary is determined by a predicate receiving each item, after being
1051 * buffered, and returns true to indicate a new buffer should start.
1052 *
1053 * <p>
1054 * The operator won't return an empty first or last buffer.
1055 *
1056 * <dl>
1057 * <dt><b>Backpressure Support:</b></dt>
1058 * <dd>This operator supports backpressure.</dd>
1059 * <dt><b>Scheduler:</b></dt>
1060 * <dd>This operator does not operate by default on a particular
1061 * {@link Scheduler}.</dd>
1062 * </dl>
1063 *
1064 * @param <T>
1065 * the input value type
1066 * @param predicate
1067 * the Func1 that receives each item, after being buffered, and
1068 * should return true to indicate a new buffer has to start.
1069 * @param capacityHint
1070 * the expected number of items in each buffer
1071 * @return the new Observable instance
1072 * @see #bufferWhile(Func1)
1073 * @since (if this graduates from Experimental/Beta to supported, replace
1074 * this parenthetical with the release number)
1075 */
1076 public static final <T> Transformer<T, List<T>> toListUntil(Func1<? super T, Boolean> predicate,
1077 int capacityHint) {
1078 return bufferUntil(predicate, capacityHint);
1079 }
1080
1081 /**
1082 * Buffers the elements into continuous, non-overlapping Lists where the
1083 * boundary is determined by a predicate receiving each item, before or
1084 * after being buffered, and returns true to indicate a new buffer should
1085 * start.
1086 *
1087 * <p>
1088 * The operator won't return an empty first or last buffer.
1089 *
1090 * <dl>
1091 * <dt><b>Backpressure Support:</b></dt>
1092 * <dd>This operator supports backpressure.</dd>
1093 * <dt><b>Scheduler:</b></dt>
1094 * <dd>This operator does not operate by default on a particular
1095 * {@link Scheduler}.</dd>
1096 * </dl>
1097 *
1098 * @param <T>
1099 * the input value type
1100 * @param predicate
1101 * the Func1 that receives each item, before being buffered, and
1102 * should return true to indicate a new buffer has to start.
1103 * @return the new Observable instance
1104 * @see #bufferWhile(Func1)
1105 * @since (if this graduates from Experimental/Beta to supported, replace
1106 * this parenthetical with the release number)
1107 */
1108 public static final <T> Transformer<T, List<T>> bufferWhile(
1109 Func1<? super T, Boolean> predicate) {
1110 return bufferWhile(predicate, 10);
1111 }
1112
1113 /**
1114 * Buffers the elements into continuous, non-overlapping Lists where the
1115 * boundary is determined by a predicate receiving each item, before or
1116 * after being buffered, and returns true to indicate a new buffer should
1117 * start.
1118 *
1119 * <p>
1120 * The operator won't return an empty first or last buffer.
1121 *
1122 * <dl>
1123 * <dt><b>Backpressure Support:</b></dt>
1124 * <dd>This operator supports backpressure.</dd>
1125 * <dt><b>Scheduler:</b></dt>
1126 * <dd>This operator does not operate by default on a particular
1127 * {@link Scheduler}.</dd>
1128 * </dl>
1129 *
1130 * @param <T>
1131 * the input value type
1132 * @param predicate
1133 * the Func1 that receives each item, before being buffered, and
1134 * should return true to indicate a new buffer has to start.
1135 * @return the new Observable instance
1136 * @see #bufferWhile(Func1)
1137 * @since (if this graduates from Experimental/Beta to supported, replace
1138 * this parenthetical with the release number)
1139 */
1140 public static final <T> Transformer<T, List<T>> toListWhile(
1141 Func1<? super T, Boolean> predicate) {
1142 return bufferWhile(predicate);
1143 }
1144
1145 /**
1146 * Buffers the elements into continuous, non-overlapping Lists where the
1147 * boundary is determined by a predicate receiving each item, before being
1148 * buffered, and returns true to indicate a new buffer should start.
1149 *
1150 * <p>
1151 * The operator won't return an empty first or last buffer.
1152 *
1153 * <dl>
1154 * <dt><b>Backpressure Support:</b></dt>
1155 * <dd>This operator supports backpressure.</dd>
1156 * <dt><b>Scheduler:</b></dt>
1157 * <dd>This operator does not operate by default on a particular
1158 * {@link Scheduler}.</dd>
1159 * </dl>
1160 *
1161 * @param <T>
1162 * the input value type
1163 * @param predicate
1164 * the Func1 that receives each item, before being buffered, and
1165 * should return true to indicate a new buffer has to start.
1166 * @param capacityHint
1167 * the expected number of items in each buffer
1168 * @return the new Observable instance
1169 * @see #bufferWhile(Func1)
1170 * @since (if this graduates from Experimental/Beta to supported, replace
1171 * this parenthetical with the release number)
1172 */
1173 public static final <T> Transformer<T, List<T>> bufferWhile(Func1<? super T, Boolean> predicate,
1174 int capacityHint) {
1175 return new OperatorBufferPredicateBoundary<T>(predicate, RxRingBuffer.SIZE, capacityHint,
1176 false);
1177 }
1178
1179 /**
1180 * Buffers the elements into continuous, non-overlapping Lists where the
1181 * boundary is determined by a predicate receiving each item, before being
1182 * buffered, and returns true to indicate a new buffer should start.
1183 *
1184 * <p>
1185 * The operator won't return an empty first or last buffer.
1186 *
1187 * <dl>
1188 * <dt><b>Backpressure Support:</b></dt>
1189 * <dd>This operator supports backpressure.</dd>
1190 * <dt><b>Scheduler:</b></dt>
1191 * <dd>This operator does not operate by default on a particular
1192 * {@link Scheduler}.</dd>
1193 * </dl>
1194 *
1195 * @param <T>
1196 * the input value type
1197 * @param predicate
1198 * the Func1 that receives each item, before being buffered, and
1199 * should return true to indicate a new buffer has to start.
1200 * @param capacityHint
1201 * the expected number of items in each buffer
1202 * @return the new Observable instance
1203 * @see #bufferWhile(Func1)
1204 * @since (if this graduates from Experimental/Beta to supported, replace
1205 * this parenthetical with the release number)
1206 */
1207 public static final <T> Transformer<T, List<T>> toListWhile(Func1<? super T, Boolean> predicate,
1208 int capacityHint) {
1209 return bufferWhile(predicate, capacityHint);
1210 }
1211
1212 public static final <T> Transformer<T, T> delay(final Func1<? super T, Long> time,
1213 final Func0<Double> playRate, final long startTime, final Scheduler scheduler) {
1214 return new Transformer<T, T>() {
1215
1216 @Override
1217 public Observable<T> call(final Observable<T> o) {
1218 return Observable.defer(new Func0<Observable<T>>() {
1219 long startActual = scheduler.now();
1220
1221 @Override
1222 public Observable<T> call() {
1223 return o.concatMap(new Func1<T, Observable<T>>() {
1224
1225 @Override
1226 public Observable<T> call(T t) {
1227 return Observable.just(t) //
1228 .delay(delay(startActual, startTime, time.call(t), playRate,
1229 scheduler.now()), TimeUnit.MILLISECONDS, scheduler);
1230 }
1231
1232 });
1233 }
1234 });
1235 }
1236 };
1237
1238 }
1239
1240 private static long delay(long startActual, long startTime, long emissionTimestamp,
1241 Func0<Double> playRate, long now) {
1242 long elapsedActual = now - startActual;
1243 return Math.max(0,
1244 Math.round((emissionTimestamp - startTime) / playRate.call() - elapsedActual));
1245 }
1246
1247 /**
1248 * <p>
1249 * Modifies the source Observable so that it invokes an action when it calls
1250 * {@code onCompleted} and no items were emitted.
1251 * <dl>
1252 * <dt><b>Scheduler:</b></dt>
1253 * <dd>{@code doOnEmpty} does not operate by default on a particular
1254 * {@link Scheduler}.</dd>
1255 * </dl>
1256 *
1257 * @param onEmpty
1258 * the action to invoke when the source Observable calls
1259 * {@code onCompleted}, contingent on no items were emitted
1260 * @param <T>
1261 * generic type of observable being transformed
1262 * @return the source Observable with the side-effecting behavior applied
1263 */
1264 public static final <T> Transformer<T, T> doOnEmpty(final Action0 onEmpty) {
1265 return new Transformer<T, T>() {
1266
1267 @Override
1268 public Observable<T> call(Observable<T> o) {
1269 return Observable.create(new OnSubscribeDoOnEmpty<T>(o, onEmpty));
1270 }
1271 };
1272 }
1273
1274 public static final <T> Transformer<T, T> onTerminateResume(
1275 final Func1<Throwable, Observable<T>> onError, final Observable<T> onCompleted) {
1276 return new TransformerOnTerminateResume<T>(onError, onCompleted);
1277 }
1278
1279 public static final <T> Transformer<T, T> repeatLast() {
1280 return new Transformer<T, T>() {
1281
1282 @Override
1283 public Observable<T> call(Observable<T> o) {
1284 return o.materialize().buffer(2, 1)
1285 .flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
1286 @Override
1287 public Observable<T> call(List<Notification<T>> list) {
1288 Notification<T> a = list.get(0);
1289 if (list.size() == 2 && list.get(1).isOnCompleted()) {
1290 return Observable.just(a.getValue()).repeat();
1291 } else if (a.isOnError()) {
1292 return Observable.error(list.get(0).getThrowable());
1293 } else if (a.isOnCompleted()) {
1294 return Observable.empty();
1295 } else {
1296 return Observable.just(a.getValue());
1297 }
1298 }
1299 });
1300 }
1301 };
1302 }
1303
1304 public static <T> Transformer<T, T> mapLast(final Func1<? super T, ? extends T> function) {
1305
1306 return new Transformer<T, T>() {
1307
1308 @Override
1309 public Observable<T> call(Observable<T> source) {
1310 return Observable.create(new OnSubscribeMapLast<T>(source, function));
1311 }
1312 };
1313 }
1314 }