View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.nio.charset.CharsetDecoder;
4   import java.util.ArrayList;
5   import java.util.Arrays;
6   import java.util.Collection;
7   import java.util.Comparator;
8   import java.util.HashSet;
9   import java.util.List;
10  import java.util.Map;
11  import java.util.Set;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.atomic.AtomicInteger;
14  import java.util.regex.Pattern;
15  
16  import com.github.davidmoten.rx.StateMachine.Completion;
17  import com.github.davidmoten.rx.StateMachine.Transition;
18  import com.github.davidmoten.rx.buffertofile.DataSerializer;
19  import com.github.davidmoten.rx.buffertofile.DataSerializers;
20  import com.github.davidmoten.rx.buffertofile.Options;
21  import com.github.davidmoten.rx.internal.operators.OnSubscribeDoOnEmpty;
22  import com.github.davidmoten.rx.internal.operators.OnSubscribeMapLast;
23  import com.github.davidmoten.rx.internal.operators.OperatorBufferPredicateBoundary;
24  import com.github.davidmoten.rx.internal.operators.OperatorBufferToFile;
25  import com.github.davidmoten.rx.internal.operators.OperatorDoOnNth;
26  import com.github.davidmoten.rx.internal.operators.OperatorFromTransformer;
27  import com.github.davidmoten.rx.internal.operators.TransformerOnTerminateResume;
28  import com.github.davidmoten.rx.internal.operators.OperatorSampleFirst;
29  import com.github.davidmoten.rx.internal.operators.OperatorWindowMinMax;
30  import com.github.davidmoten.rx.internal.operators.OperatorWindowMinMax.Metric;
31  import com.github.davidmoten.rx.internal.operators.OrderedMerge;
32  import com.github.davidmoten.rx.internal.operators.TransformerDecode;
33  import com.github.davidmoten.rx.internal.operators.TransformerDelayFinalUnsubscribe;
34  import com.github.davidmoten.rx.internal.operators.TransformerLimitSubscribers;
35  import com.github.davidmoten.rx.internal.operators.TransformerOnBackpressureBufferRequestLimiting;
36  import com.github.davidmoten.rx.internal.operators.TransformerStateMachine;
37  import com.github.davidmoten.rx.internal.operators.TransformerStringSplit;
38  import com.github.davidmoten.rx.util.BackpressureStrategy;
39  import com.github.davidmoten.rx.util.MapWithIndex;
40  import com.github.davidmoten.rx.util.MapWithIndex.Indexed;
41  import com.github.davidmoten.rx.util.Pair;
42  import com.github.davidmoten.util.Optional;
43  
44  import rx.Notification;
45  import rx.Observable;
46  import rx.Observable.Operator;
47  import rx.Observable.Transformer;
48  import rx.Observer;
49  import rx.Scheduler;
50  import rx.Scheduler.Worker;
51  import rx.Subscriber;
52  import rx.functions.Action0;
53  import rx.functions.Action1;
54  import rx.functions.Action2;
55  import rx.functions.Func0;
56  import rx.functions.Func1;
57  import rx.functions.Func2;
58  import rx.functions.Func3;
59  import rx.internal.util.RxRingBuffer;
60  import rx.observables.GroupedObservable;
61  import rx.schedulers.Schedulers;
62  
63  public final class Transformers {
64  
65      static final int DEFAULT_INITIAL_BATCH = 1;
66  
67      public static <T, R> Operator<R, T> toOperator(
68              Func1<? super Observable<T>, ? extends Observable<R>> function) {
69          return OperatorFromTransformer.toOperator(function);
70      }
71  
72      public static <T extends Number> Transformer<T, Statistics> collectStats() {
73          return new Transformer<T, Statistics>() {
74  
75              @Override
76              public Observable<Statistics> call(Observable<T> o) {
77                  return o.scan(Statistics.create(), Functions.collectStats());
78              }
79          };
80      }
81  
82      public static <T, R extends Number> Transformer<T, Pair<T, Statistics>> collectStats(
83              final Func1<? super T, ? extends R> function) {
84          return new Transformer<T, Pair<T, Statistics>>() {
85  
86              @Override
87              public Observable<Pair<T, Statistics>> call(Observable<T> source) {
88                  return source.scan(Pair.create((T) null, Statistics.create()),
89                          new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
90                              @Override
91                              public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
92                                  return Pair.create(t, pair.b().add(function.call(t)));
93                              }
94                          }).skip(1);
95              }
96          };
97      }
98  
99      public static <T extends Comparable<? super T>> Transformer<T, T> sort() {
100         return new Transformer<T, T>() {
101 
102             @Override
103             public Observable<T> call(Observable<T> o) {
104                 return o.toSortedList().flatMapIterable(Functions.<List<T>>identity());
105             }
106         };
107     }
108 
109     public static <T> Transformer<T, T> sort(final Comparator<? super T> comparator) {
110         return new Transformer<T, T>() {
111 
112             @Override
113             public Observable<T> call(Observable<T> o) {
114                 return o.toSortedList(Functions.toFunc2(comparator))
115                         .flatMapIterable(Functions.<List<T>>identity());
116             }
117         };
118     }
119 
120     public static <T> Transformer<T, Set<T>> toSet() {
121         return new Transformer<T, Set<T>>() {
122 
123             @Override
124             public Observable<Set<T>> call(Observable<T> o) {
125                 return o.collect(new Func0<Set<T>>() {
126 
127                     @Override
128                     public Set<T> call() {
129                         return new HashSet<T>();
130                     }
131                 }, new Action2<Set<T>, T>() {
132 
133                     @Override
134                     public void call(Set<T> set, T t) {
135                         set.add(t);
136                     }
137                 });
138             }
139         };
140     }
141 
142     /**
143      * <p>
144      * Returns a {@link Transformer} that wraps stream emissions with their
145      * corresponding zero based index numbers (0,1,2,3,..) in instances of
146      * {@link Indexed}.
147      * <p>
148      * Example usage:
149      * 
150      * <pre>
151      * 
152      *  {@code
153      *    Observable
154      *      .just("a","b","c)
155      *      .mapWithIndex(Transformers.mapWithIndex())
156      *      .map(x -> x.index() + "->" + x.value())
157      *      .forEach(System.out::println);
158      *  }
159      * </pre>
160      * 
161      * <p>
162      * <img src=
163      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/mapWithIndex.png?raw=true"
164      * alt="marble diagram">
165      * 
166      * @param <T>
167      *            generic type of the stream being supplemented with an index
168      * @return transformer that supplements each stream emission with its index
169      *         (zero-based position) in the stream.
170      */
171     public static <T> Transformer<T, Indexed<T>> mapWithIndex() {
172         return MapWithIndex.instance();
173     }
174 
175     /**
176      * <p>
177      * Returns a {@link Transformer} that allows processing of the source stream
178      * to be defined in a state machine where transitions of the state machine
179      * may also emit items to downstream that are buffered if necessary when
180      * backpressure is requested. <code>flatMap</code> is part of the processing
181      * chain so the source may experience requests for more items than are
182      * strictly required by the endpoint subscriber.
183      * 
184      * <p>
185      * <img src=
186      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/stateMachine.png?raw=true"
187      * alt="marble diagram">
188      * 
189      * @param initialStateFactory
190      *            the factory to create the initial state of the state machine.
191      * @param transition
192      *            defines state transitions and consequent emissions to
193      *            downstream when an item arrives from upstream. The
194      *            {@link Subscriber} is called with the emissions to downstream.
195      *            You can optionally call {@link Subscriber#isUnsubscribed()} to
196      *            check if you can stop emitting from the transition. If you do
197      *            wish to terminate the Observable then call
198      *            {@link Subscriber#unsubscribe()} and return anything (say
199      *            {@code null} from the transition (as the next state which will
200      *            not be used). You can also complete the Observable by calling
201      *            {@link Subscriber#onCompleted} or {@link Subscriber#onError}
202      *            from within the transition and return anything from the
203      *            transition (will not be used). The transition should run
204      *            synchronously so that completion of a call to the transition
205      *            should also signify all emissions from that transition have
206      *            been made.
207      * @param completion
208      *            defines activity that should happen based on the final state
209      *            just before downstream <code>onCompleted()</code> is called.
210      *            For example any buffered emissions in state could be emitted
211      *            at this point. Don't call <code>observer.onCompleted()</code>
212      *            as it is called for you after the action completes if and only
213      *            if you return true from this function.
214      * @param backpressureStrategy
215      *            is applied to the emissions from one call of transition and
216      *            should enforce backpressure.
217      * @param <State>
218      *            the class representing the state of the state machine
219      * @param <In>
220      *            the input observable type
221      * @param <Out>
222      *            the output observable type
223      * @throws NullPointerException
224      *             if {@code initialStateFactory} or {@code transition},or
225      *             {@code completionAction} is null
226      * @return a backpressure supporting transformer that implements the state
227      *         machine specified by the parameters
228      */
229     public static <State, In, Out> Transformer<In, Out> stateMachine(
230             Func0<State> initialStateFactory,
231             Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
232             Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
233             BackpressureStrategy backpressureStrategy) {
234         return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
235                 completion, backpressureStrategy, DEFAULT_INITIAL_BATCH);
236     }
237 
238     public static <State, In, Out> Transformer<In, Out> stateMachine(
239             Func0<State> initialStateFactory,
240             Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
241             Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
242             BackpressureStrategy backpressureStrategy, int initialRequest) {
243         return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
244                 completion, backpressureStrategy, initialRequest);
245     }
246 
247     /**
248      * <p>
249      * Returns a {@link Transformer} that allows processing of the source stream
250      * to be defined in a state machine where transitions of the state machine
251      * may also emit items to downstream that are buffered if necessary when
252      * backpressure is requested. <code>flatMap</code> is part of the processing
253      * chain so the source may experience requests for more items than are
254      * strictly required by the endpoint subscriber. The backpressure strategy
255      * used for emissions from the transition into the flatMap is
256      * {@link BackpressureStrategy#BUFFER} which corresponds to
257      * {@link Observable#onBackpressureBuffer}.
258      * 
259      * <p>
260      * <img src=
261      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/stateMachine.png?raw=true"
262      * alt="marble diagram">
263      * 
264      * @param initialStateFactory
265      *            the factory to create the initial state of the state machine.
266      * @param transition
267      *            defines state transitions and consequent emissions to
268      *            downstream when an item arrives from upstream. The
269      *            {@link Subscriber} is called with the emissions to downstream.
270      *            You can optionally call {@link Subscriber#isUnsubscribed()} to
271      *            check if you can stop emitting from the transition. If you do
272      *            wish to terminate the Observable then call
273      *            {@link Subscriber#unsubscribe()} and return anything (say
274      *            {@code null} from the transition (as the next state which will
275      *            not be used). You can also complete the Observable by calling
276      *            {@link Subscriber#onCompleted} or {@link Subscriber#onError}
277      *            from within the transition and return anything from the
278      *            transition (will not be used). The transition should run
279      *            synchronously so that completion of a call to the transition
280      *            should also signify all emissions from that transition have
281      *            been made.
282      * @param completion
283      *            defines activity that should happen based on the final state
284      *            just before downstream <code>onCompleted()</code> is called.
285      *            For example any buffered emissions in state could be emitted
286      *            at this point. Don't call <code>observer.onCompleted()</code>
287      *            as it is called for you after the action completes if and only
288      *            if you return true from this function.
289      * @param <State>
290      *            the class representing the state of the state machine
291      * @param <In>
292      *            the input observable type
293      * @param <Out>
294      *            the output observable type
295      * @throws NullPointerException
296      *             if {@code initialStateFactory} or {@code transition},or
297      *             {@code completion} is null
298      * @return a backpressure supporting transformer that implements the state
299      *         machine specified by the parameters
300      */
301     public static <State, In, Out> Transformer<In, Out> stateMachine(
302             Func0<? extends State> initialStateFactory,
303             Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
304             Func2<? super State, ? super Subscriber<Out>, Boolean> completion) {
305         return TransformerStateMachine.<State, In, Out>create(initialStateFactory, transition,
306                 completion, BackpressureStrategy.BUFFER, DEFAULT_INITIAL_BATCH);
307     }
308 
309     public static StateMachine.Builder stateMachine() {
310         return StateMachine.builder();
311     }
312 
313     /**
314      * <p>
315      * Returns the source {@link Observable} merged with the <code>other</code>
316      * observable using the given {@link Comparator} for order. A precondition
317      * is that the source and other are already ordered. This transformer
318      * supports backpressure and its inputs must also support backpressure.
319      * 
320      * <p>
321      * <img src=
322      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/orderedMerge.png?raw=true"
323      * alt="marble diagram">
324      * 
325      * @param other
326      *            the other already ordered observable
327      * @param comparator
328      *            the ordering to use
329      * @param <T>
330      *            the generic type of the objects being compared
331      * @return merged and ordered observable
332      */
333     public static final <T> Transformer<T, T> orderedMergeWith(final Observable<T> other,
334             final Comparator<? super T> comparator) {
335         @SuppressWarnings("unchecked")
336         Collection<Observable<T>> collection = Arrays.asList(other);
337         return orderedMergeWith(collection, comparator);
338     }
339 
340     /**
341      * <p>
342      * Returns the source {@link Observable} merged with all of the other
343      * observables using the given {@link Comparator} for order. A precondition
344      * is that the source and other are already ordered. This transformer
345      * supports backpressure and its inputs must also support backpressure.
346      * 
347      * <p>
348      * <img src=
349      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/orderedMerge.png?raw=true"
350      * alt="marble diagram">
351      * 
352      * @param others
353      *            a collection of already ordered observables to merge with
354      * @param comparator
355      *            the ordering to use
356      * @param <T>
357      *            the generic type of the objects being compared
358      * @return merged and ordered observable
359      */
360     public static final <T> Transformer<T, T> orderedMergeWith(
361             final Collection<Observable<T>> others, final Comparator<? super T> comparator) {
362         return new Transformer<T, T>() {
363 
364             @Override
365             public Observable<T> call(Observable<T> source) {
366                 List<Observable<T>> collection = new ArrayList<Observable<T>>();
367                 collection.add(source);
368                 collection.addAll(others);
369                 return OrderedMerge.<T>create(collection, comparator, false);
370             }
371         };
372     }
373 
374     /**
375      * Returns a {@link Transformer} that returns an {@link Observable} that is
376      * a buffering of the source Observable into lists of sequential items that
377      * are equal.
378      * 
379      * <p>
380      * For example, the stream
381      * {@code Observable.just(1, 1, 2, 2, 1).compose(toListUntilChanged())}
382      * would emit {@code [1,1], [2], [1]}.
383      * 
384      * @param <T>
385      *            the generic type of the source Observable
386      * @return transformer as above
387      */
388     public static <T> Transformer<T, List<T>> toListUntilChanged() {
389         Func2<Collection<T>, T, Boolean> equal = HolderEquals.instance();
390         return toListWhile(equal);
391     }
392 
393     private static class HolderEquals {
394         private static final Func2<Collection<Object>, Object, Boolean> INSTANCE = new Func2<Collection<Object>, Object, Boolean>() {
395             @Override
396             public Boolean call(Collection<Object> list, Object t) {
397                 return list.isEmpty() || list.iterator().next().equals(t);
398             }
399         };
400 
401         @SuppressWarnings("unchecked")
402         static <T> Func2<Collection<T>, T, Boolean> instance() {
403             return (Func2<Collection<T>, T, Boolean>) (Func2<?, ?, Boolean>) INSTANCE;
404         }
405     }
406 
407     /**
408      * <p>
409      * Returns a {@link Transformer} that returns an {@link Observable} that is
410      * a buffering of the source Observable into lists of sequential items that
411      * satisfy the condition {@code condition}.
412      * 
413      * <p>
414      * <img src=
415      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/toListWhile.png?raw=true"
416      * alt="marble diagram">
417      * 
418      * @param condition
419      *            condition function that must return true if an item is to be
420      *            part of the list being prepared for emission
421      * @param <T>
422      *            the generic type of the source Observable
423      * @return transformer as above
424      */
425     public static <T> Transformer<T, List<T>> toListWhile(
426             final Func2<? super List<T>, ? super T, Boolean> condition) {
427 
428         Func0<List<T>> initialState = new Func0<List<T>>() {
429             @Override
430             public List<T> call() {
431                 return new ArrayList<T>();
432             }
433         };
434 
435         Action2<List<T>, T> collect = new Action2<List<T>, T>() {
436 
437             @Override
438             public void call(List<T> list, T n) {
439                 list.add(n);
440             }
441         };
442         return collectWhile(initialState, collect, condition);
443     }
444 
445     /**
446      * <p>
447      * Returns a {@link Transformer} that returns an {@link Observable} that is
448      * collected into {@code Collection} instances created by {@code factory}
449      * that are emitted when the collection and latest emission do not satisfy
450      * {@code condition} or on completion.
451      * 
452      * <p>
453      * <img src=
454      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/collectWhile.png?raw=true"
455      * alt="marble diagram">
456      * 
457      * @param factory
458      *            collection instance creator
459      * @param collect
460      *            collection action
461      * @param condition
462      *            returns true if and only if emission should be collected in
463      *            current collection being prepared for emission
464      * @param isEmpty
465      *            indicates that the collection is empty
466      * @param <T>
467      *            generic type of source observable
468      * @param <R>
469      *            collection type emitted by transformed Observable
470      * @return transformer as above
471      */
472     public static <T, R> Transformer<T, R> collectWhile(final Func0<R> factory,
473             final Action2<? super R, ? super T> collect,
474             final Func2<? super R, ? super T, Boolean> condition,
475             final Func1<? super R, Boolean> isEmpty) {
476         Func3<R, T, Observer<R>, R> transition = new Func3<R, T, Observer<R>, R>() {
477 
478             @Override
479             public R call(R collection, T t, Observer<R> observer) {
480 
481                 if (condition.call(collection, t)) {
482                     collect.call(collection, t);
483                     return collection;
484                 } else {
485                     observer.onNext(collection);
486                     R r = factory.call();
487                     collect.call(r, t);
488                     return r;
489                 }
490             }
491 
492         };
493         Func2<R, Observer<R>, Boolean> completionAction = new Func2<R, Observer<R>, Boolean>() {
494             @Override
495             public Boolean call(R collection, Observer<R> observer) {
496                 if (!isEmpty.call(collection)) {
497                     observer.onNext(collection);
498                 }
499                 return true;
500             }
501         };
502         return Transformers.stateMachine(factory, transition, completionAction);
503     }
504 
505     /**
506      * <p>
507      * Returns a {@link Transformer} that returns an {@link Observable} that is
508      * collected into {@code Collection} instances created by {@code factory}
509      * that are emitted when items are not equal or on completion.
510      * 
511      * <p>
512      * <img src=
513      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/collectWhile.png?raw=true"
514      * alt="marble diagram">
515      * 
516      * @param factory
517      *            collection instance creator
518      * @param collect
519      *            collection action
520      * @param <T>
521      *            generic type of source observable
522      * @param <R>
523      *            collection type emitted by transformed Observable
524      * @return transformer as above
525      */
526     public static <T, R extends Collection<T>> Transformer<T, R> collectWhile(
527             final Func0<R> factory, final Action2<? super R, ? super T> collect) {
528         return collectWhile(factory, collect, HolderEquals.<T>instance());
529     }
530 
531     public static <T, R extends Iterable<?>> Transformer<T, R> collectWhile(final Func0<R> factory,
532             final Action2<? super R, ? super T> collect,
533             final Func2<? super R, ? super T, Boolean> condition) {
534         Func1<R, Boolean> isEmpty = new Func1<R, Boolean>() {
535             @Override
536             public Boolean call(R collection) {
537                 return !collection.iterator().hasNext();
538             }
539         };
540         return collectWhile(factory, collect, condition, isEmpty);
541     }
542 
543     /**
544      * Returns a {@link Transformer} that applied to a source {@link Observable}
545      * calls the given action on the {@code n}th onNext emission.
546      * 
547      * @param n
548      *            the 1-based count of onNext to do the action on
549      * @param action
550      *            is performed on {@code n}th onNext.
551      * @param <T>
552      *            the generic type of the Observable being transformed
553      * @return Transformer that applied to a source Observable calls the given
554      *         action on the nth onNext emission.
555      */
556     public static <T> Transformer<T, T> doOnNext(final int n, final Action1<? super T> action) {
557         return new Transformer<T, T>() {
558             @Override
559             public Observable<T> call(Observable<T> o) {
560                 return o.lift(OperatorDoOnNth.create(action, n));
561             }
562         };
563     }
564 
565     /**
566      * Returns a {@link Transformer} that applied to a source {@link Observable}
567      * calls the given action on the first onNext emission.
568      * 
569      * @param action
570      *            is performed on first onNext
571      * @param <T>
572      *            the generic type of the Observable being transformed
573      * @return Transformer that applied to a source Observable calls the given
574      *         action on the first onNext emission.
575      */
576     public static <T> Transformer<T, T> doOnFirst(final Action1<? super T> action) {
577         return doOnNext(1, action);
578     }
579 
580     /**
581      * <p>
582      * Returns an observable that subscribes to {@code this} and wait for
583      * completion but doesn't emit any items and once completes emits the
584      * {@code next} observable.
585      * 
586      * <p>
587      * <img src=
588      * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/ignoreElementsThen.png?raw=true"
589      * alt="marble diagram">
590      * 
591      * @param <R>
592      *            input observable type
593      * @param <T>
594      *            output observable type
595      * @param next
596      *            observable to be emitted after ignoring elements of
597      *            {@code this}
598      * @return Transformer that applied to a source Observable ignores the
599      *         elements of the source and emits the elements of a second
600      *         observable
601      */
602     public static <R, T> Transformer<T, R> ignoreElementsThen(final Observable<R> next) {
603         return new Transformer<T, R>() {
604 
605             @SuppressWarnings("unchecked")
606             @Override
607             public Observable<R> call(Observable<T> source) {
608                 return ((Observable<R>) (Observable<?>) source.ignoreElements()).concatWith(next);
609             }
610         };
611     }
612 
613     public static <T> Transformer<String, String> split(String pattern) {
614         return TransformerStringSplit.split(pattern, null);
615     }
616 
617     public static <T> Transformer<String, String> split(Pattern pattern) {
618         return TransformerStringSplit.split(null, pattern);
619     }
620 
621     /**
622      * <p>
623      * Decodes a stream of multibyte chunks into a stream of strings that works
624      * on infinite streams and handles when a multibyte character spans two
625      * chunks. This method allows for more control over how malformed and
626      * unmappable characters are handled.
627      * <p>
628      * <img width="640" src=
629      * "https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png"
630      * alt="">
631      * 
632      * @param charsetDecoder
633      *            decodes the bytes into strings
634      * @return the Observable returning a stream of decoded strings
635      */
636     public static Transformer<byte[], String> decode(final CharsetDecoder charsetDecoder) {
637         return TransformerDecode.decode(charsetDecoder);
638     }
639 
640     public static <T> Transformer<T, T> limitSubscribers(AtomicInteger subscriberCount,
641             int maxSubscribers) {
642         return new TransformerLimitSubscribers<T>(subscriberCount, maxSubscribers);
643     }
644 
645     public static <T> Transformer<T, T> limitSubscribers(int maxSubscribers) {
646         return new TransformerLimitSubscribers<T>(new AtomicInteger(), maxSubscribers);
647     }
648 
649     public static <T> Transformer<T, T> cache(final long duration, final TimeUnit unit,
650             final Worker worker) {
651         return new Transformer<T, T>() {
652             @Override
653             public Observable<T> call(Observable<T> o) {
654                 return Obs.cache(o, duration, unit, worker);
655             }
656         };
657     }
658 
659     public static <T> Transformer<T, T> sampleFirst(final long duration, final TimeUnit unit) {
660         return sampleFirst(duration, unit, Schedulers.computation());
661     }
662 
663     public static <T> Transformer<T, T> sampleFirst(final long duration, final TimeUnit unit,
664             final Scheduler scheduler) {
665         if (duration <= 0) {
666             throw new IllegalArgumentException("duration must be > 0");
667         }
668         return new Transformer<T, T>() {
669 
670             @Override
671             public Observable<T> call(Observable<T> source) {
672                 return source.lift(new OperatorSampleFirst<T>(duration, unit, scheduler));
673             }
674         };
675     }
676 
677     public static <T> Transformer<T, T> onBackpressureBufferToFile() {
678         return onBackpressureBufferToFile(DataSerializers.<T>javaIO(), Schedulers.computation(),
679                 Options.defaultInstance());
680     }
681 
682     public static <T> Transformer<T, T> onBackpressureBufferToFile(
683             final DataSerializer<T> serializer) {
684         return onBackpressureBufferToFile(serializer, Schedulers.computation(),
685                 Options.defaultInstance());
686     }
687 
688     public static <T> Transformer<T, T> onBackpressureBufferToFile(
689             final DataSerializer<T> serializer, final Scheduler scheduler) {
690         return onBackpressureBufferToFile(serializer, scheduler, Options.defaultInstance());
691     }
692 
693     public static <T> Transformer<T, T> onBackpressureBufferToFile(
694             final DataSerializer<T> serializer, final Scheduler scheduler, final Options options) {
695         return new Transformer<T, T>() {
696             @Override
697             public Observable<T> call(Observable<T> o) {
698                 return o.lift(new OperatorBufferToFile<T>(serializer, scheduler, options));
699             }
700         };
701     }
702 
703     public static <T> Transformer<T, T> windowMin(final int windowSize,
704             final Comparator<? super T> comparator) {
705         return new Transformer<T, T>() {
706             @Override
707             public Observable<T> call(Observable<T> o) {
708                 return o.lift(new OperatorWindowMinMax<T>(windowSize, comparator, Metric.MIN));
709             }
710         };
711     }
712 
713     public static <T extends Comparable<T>> Transformer<T, T> windowMax(final int windowSize) {
714         return windowMax(windowSize, Transformers.<T>naturalComparator());
715     }
716 
717     public static <T> Transformer<T, T> windowMax(final int windowSize,
718             final Comparator<? super T> comparator) {
719         return new Transformer<T, T>() {
720             @Override
721             public Observable<T> call(Observable<T> o) {
722                 return o.lift(new OperatorWindowMinMax<T>(windowSize, comparator, Metric.MAX));
723             }
724         };
725     }
726 
727     public static <T extends Comparable<T>> Transformer<T, T> windowMin(final int windowSize) {
728         return windowMin(windowSize, Transformers.<T>naturalComparator());
729     }
730 
731     private static class NaturalComparatorHolder {
732         static final Comparator<Comparable<Object>> INSTANCE = new Comparator<Comparable<Object>>() {
733 
734             @Override
735             public int compare(Comparable<Object> o1, Comparable<Object> o2) {
736                 return o1.compareTo(o2);
737             }
738         };
739     }
740 
741     @SuppressWarnings("unchecked")
742     private static <T extends Comparable<T>> Comparator<T> naturalComparator() {
743         return (Comparator<T>) (Comparator<?>) NaturalComparatorHolder.INSTANCE;
744     }
745 
746     /**
747      * <p>
748      * Groups the items emitted by an {@code Observable} according to a
749      * specified criterion, and emits these grouped items as
750      * {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows
751      * only a single {@link Subscriber} during its lifetime and if this
752      * {@code Subscriber} unsubscribes before the source terminates, the next
753      * emission by the source having the same key will trigger a new
754      * {@code GroupedObservable} emission.
755      * <p>
756      * <img width="640" height="360" src=
757      * "https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png"
758      * alt="">
759      * <p>
760      * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to
761      * emit until such time as it is subscribed to. For this reason, in order to
762      * avoid memory leaks, you should not simply ignore those
763      * {@code GroupedObservable}s that do not concern you. Instead, you can
764      * signal to them that they may discard their buffers by applying an
765      * operator like {@code .ignoreElements()} to them.
766      * <dl>
767      * <dt><b>Scheduler:</b></dt>
768      * <dd>{@code groupBy} does not operate by default on a particular
769      * {@link Scheduler}.</dd>
770      * </dl>
771      * 
772      * @param keySelector
773      *            a function that extracts the key for each item
774      * @param elementSelector
775      *            a function that extracts the return element for each item
776      * @param evictingMapFactory
777      *            a function that given an eviction action returns a {@link Map}
778      *            instance that will be used to assign items to the appropriate
779      *            {@code GroupedObservable}s. The {@code Map} instance must be
780      *            thread-safe and any eviction must trigger a call to the
781      *            supplied action (synchronously or asynchronously). This can be
782      *            used to limit the size of the map by evicting keys by maximum
783      *            size or access time for instance. If
784      *            {@code evictingMapFactory} is null then no eviction strategy
785      *            will be applied (and a suitable default thread-safe
786      *            implementation of {@code Map} will be supplied). Here's an
787      *            example using Guava's {@code CacheBuilder} from v19.0:
788      * 
789      *            <pre>
790      *            {@code
791      *            Func1<Action1<K>, Map<K, Object>> mapFactory 
792      *              = action -> CacheBuilder.newBuilder()
793      *                  .maximumSize(1000)
794      *                  .expireAfterAccess(12, TimeUnit.HOUR)
795      *                  .removalListener(key -> action.call(key))
796      *                  .<K, Object> build().asMap();
797      *            }
798      *            </pre>
799      * 
800      * @param <T>
801      *            the type of the input observable
802      * @param <K>
803      *            the key type
804      * @param <R>
805      *            the element type
806      * @return an {@code Observable} that emits {@link GroupedObservable}s, each
807      *         of which corresponds to a unique key value and each of which
808      *         emits those items from the source Observable that share that key
809      *         value
810      * @see <a href="http://reactivex.io/documentation/operators/groupby.html">
811      *      ReactiveX operators documentation: GroupBy</a>
812      */
813     public static <T, K, R> Transformer<T, GroupedObservable<K, R>> groupByEvicting(
814             final Func1<? super T, ? extends K> keySelector,
815             final Func1<? super T, ? extends R> elementSelector,
816             final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
817         return new Transformer<T, GroupedObservable<K, R>>() {
818 
819             @Override
820             public Observable<GroupedObservable<K, R>> call(Observable<T> o) {
821                 return o.groupBy(keySelector, elementSelector, evictingMapFactory);
822             }
823         };
824     }
825 
826     /**
827      * If multiple concurrently open subscriptions happen to a source
828      * transformed by this method then an additional do-nothing subscription
829      * will be maintained to the source and will only be closed after the
830      * specified duration has passed from the final unsubscription of the open
831      * subscriptions. If another subscription happens during this wait period
832      * then the scheduled unsubscription will be cancelled.
833      * 
834      * @param duration
835      *            duration of period to leave at least one source subscription
836      *            open
837      * @param unit
838      *            units for duration
839      * @param <T>
840      *            generic type of stream
841      * @return transformer
842      */
843     public static <T> Transformer<T, T> delayFinalUnsubscribe(long duration, TimeUnit unit) {
844         return delayFinalUnsubscribe(duration, unit, Schedulers.computation());
845     }
846 
847     /**
848      * If multiple concurrently open subscriptions happen to a source
849      * transformed by this method then an additional do-nothing subscription
850      * will be maintained to the source and will only be closed after the
851      * specified duration has passed from the final unsubscription of the open
852      * subscriptions. If another subscription happens during this wait period
853      * then the scheduled unsubscription will be cancelled.
854      * 
855      * @param duration
856      *            duration of period to leave at least one source subscription
857      *            open
858      * @param unit
859      *            units for duration
860      * @param scheduler
861      *            scheduler to use to schedule wait for unsubscribe
862      * @param <T>
863      *            generic type of stream
864      * @return transformer
865      */
866     public static <T> Transformer<T, T> delayFinalUnsubscribe(long duration, TimeUnit unit,
867             Scheduler scheduler) {
868         return new TransformerDelayFinalUnsubscribe<T>(unit.toMillis(duration), scheduler);
869     }
870 
871     /**
872      * Removes pairs non-recursively from a stream. Uses
873      * {@code Transformers.stateMachine()} under the covers to ensure items are
874      * emitted as soon as possible (if an item can't be in a pair then it is
875      * emitted straight away).
876      * 
877      * @param isCandidateForFirst
878      *            returns true if item is potentially the first of a pair that
879      *            we want to remove
880      * @param remove
881      *            returns true if a pair should be removed
882      * @param <T>
883      *            generic type of stream being transformed
884      * @return transformed stream
885      */
886     public static <T> Transformer<T, T> removePairs(
887             final Func1<? super T, Boolean> isCandidateForFirst,
888             final Func2<? super T, ? super T, Boolean> remove) {
889         return new Transformer<T, T>() {
890 
891             @Override
892             public Observable<T> call(Observable<T> o) {
893                 return o.compose(Transformers. //
894                 stateMachine() //
895                         .initialState(Optional.<T>absent()) //
896                         .transition(new Transition<Optional<T>, T, T>() {
897 
898                             @Override
899                             public Optional<T> call(Optional<T> state, T value,
900                                     Subscriber<T> subscriber) {
901                                 if (!state.isPresent()) {
902                                     if (isCandidateForFirst.call(value)) {
903                                         return Optional.of(value);
904                                     } else {
905                                         subscriber.onNext(value);
906                                         return Optional.absent();
907                                     }
908                                 } else {
909                                     if (remove.call(state.get(), value)) {
910                                         // emit nothing and reset state
911                                         return Optional.absent();
912                                     } else {
913                                         subscriber.onNext(state.get());
914                                         if (isCandidateForFirst.call(value)) {
915                                             return Optional.of(value);
916                                         } else {
917                                             subscriber.onNext(value);
918                                             return Optional.absent();
919                                         }
920                                     }
921                                 }
922                             }
923                         }).completion(new Completion<Optional<T>, T>() {
924 
925                             @Override
926                             public Boolean call(Optional<T> state, Subscriber<T> subscriber) {
927                                 if (state.isPresent())
928                                     subscriber.onNext(state.get());
929                                 // yes, complete
930                                 return true;
931                             }
932                         }).build());
933             }
934         };
935     }
936 
937     /**
938      * Rather than requesting {@code Long.MAX_VALUE} of upstream as does
939      * `Observable.onBackpressureBuffer`, this variant only requests of upstream
940      * what is requested of it. Thus an operator can be written that
941      * overproduces.
942      * 
943      * @param <T>
944      *            the value type
945      * @return transformer that buffers on backpressure but only requests of
946      *         upstream what is requested of it
947      */
948     public static <T> Transformer<T, T> onBackpressureBufferRequestLimiting() {
949         return TransformerOnBackpressureBufferRequestLimiting.instance();
950     }
951 
952     /**
953      * Buffers the elements into continuous, non-overlapping Lists where the
954      * boundary is determined by a predicate receiving each item, after being
955      * buffered, and returns true to indicate a new buffer should start.
956      * 
957      * <p>
958      * The operator won't return an empty first or last buffer.
959      * 
960      * <dl>
961      * <dt><b>Backpressure Support:</b></dt>
962      * <dd>This operator supports backpressure.</dd>
963      * <dt><b>Scheduler:</b></dt>
964      * <dd>This operator does not operate by default on a particular
965      * {@link Scheduler}.</dd>
966      * </dl>
967      * 
968      * @param <T>
969      *            the input value type
970      * @param predicate
971      *            the Func1 that receives each item, after being buffered, and
972      *            should return true to indicate a new buffer has to start.
973      * @return the new Observable instance
974      * @see #bufferWhile(Func1)
975      * @since (if this graduates from Experimental/Beta to supported, replace
976      *        this parenthetical with the release number)
977      */
978     public static final <T> Transformer<T, List<T>> bufferUntil(
979             Func1<? super T, Boolean> predicate) {
980         return bufferUntil(predicate, 10);
981     }
982 
983     /**
984      * Buffers the elements into continuous, non-overlapping Lists where the
985      * boundary is determined by a predicate receiving each item, after being
986      * buffered, and returns true to indicate a new buffer should start.
987      * 
988      * <p>
989      * The operator won't return an empty first or last buffer.
990      * 
991      * <dl>
992      * <dt><b>Backpressure Support:</b></dt>
993      * <dd>This operator supports backpressure.</dd>
994      * <dt><b>Scheduler:</b></dt>
995      * <dd>This operator does not operate by default on a particular
996      * {@link Scheduler}.</dd>
997      * </dl>
998      * 
999      * @param <T>
1000      *            the input value type
1001      * @param predicate
1002      *            the Func1 that receives each item, after being buffered, and
1003      *            should return true to indicate a new buffer has to start.
1004      * @return the new Observable instance
1005      * @see #bufferWhile(Func1)
1006      * @since (if this graduates from Experimental/Beta to supported, replace
1007      *        this parenthetical with the release number)
1008      */
1009     public static final <T> Transformer<T, List<T>> toListUntil(
1010             Func1<? super T, Boolean> predicate) {
1011         return bufferUntil(predicate);
1012     }
1013 
1014     /**
1015      * Buffers the elements into continuous, non-overlapping Lists where the
1016      * boundary is determined by a predicate receiving each item, after being
1017      * buffered, and returns true to indicate a new buffer should start.
1018      * 
1019      * <p>
1020      * The operator won't return an empty first or last buffer.
1021      * 
1022      * <dl>
1023      * <dt><b>Backpressure Support:</b></dt>
1024      * <dd>This operator supports backpressure.</dd>
1025      * <dt><b>Scheduler:</b></dt>
1026      * <dd>This operator does not operate by default on a particular
1027      * {@link Scheduler}.</dd>
1028      * </dl>
1029      * 
1030      * @param <T>
1031      *            the input value type
1032      * @param predicate
1033      *            the Func1 that receives each item, after being buffered, and
1034      *            should return true to indicate a new buffer has to start.
1035      * @param capacityHint
1036      *            the expected number of items in each buffer
1037      * @return the new Observable instance
1038      * @see #bufferWhile(Func1)
1039      * @since (if this graduates from Experimental/Beta to supported, replace
1040      *        this parenthetical with the release number)
1041      */
1042     public static final <T> Transformer<T, List<T>> bufferUntil(Func1<? super T, Boolean> predicate,
1043             int capacityHint) {
1044         return new OperatorBufferPredicateBoundary<T>(predicate, RxRingBuffer.SIZE, capacityHint,
1045                 true);
1046     }
1047 
1048     /**
1049      * Buffers the elements into continuous, non-overlapping Lists where the
1050      * boundary is determined by a predicate receiving each item, after being
1051      * buffered, and returns true to indicate a new buffer should start.
1052      * 
1053      * <p>
1054      * The operator won't return an empty first or last buffer.
1055      * 
1056      * <dl>
1057      * <dt><b>Backpressure Support:</b></dt>
1058      * <dd>This operator supports backpressure.</dd>
1059      * <dt><b>Scheduler:</b></dt>
1060      * <dd>This operator does not operate by default on a particular
1061      * {@link Scheduler}.</dd>
1062      * </dl>
1063      * 
1064      * @param <T>
1065      *            the input value type
1066      * @param predicate
1067      *            the Func1 that receives each item, after being buffered, and
1068      *            should return true to indicate a new buffer has to start.
1069      * @param capacityHint
1070      *            the expected number of items in each buffer
1071      * @return the new Observable instance
1072      * @see #bufferWhile(Func1)
1073      * @since (if this graduates from Experimental/Beta to supported, replace
1074      *        this parenthetical with the release number)
1075      */
1076     public static final <T> Transformer<T, List<T>> toListUntil(Func1<? super T, Boolean> predicate,
1077             int capacityHint) {
1078         return bufferUntil(predicate, capacityHint);
1079     }
1080 
1081     /**
1082      * Buffers the elements into continuous, non-overlapping Lists where the
1083      * boundary is determined by a predicate receiving each item, before or
1084      * after being buffered, and returns true to indicate a new buffer should
1085      * start.
1086      * 
1087      * <p>
1088      * The operator won't return an empty first or last buffer.
1089      * 
1090      * <dl>
1091      * <dt><b>Backpressure Support:</b></dt>
1092      * <dd>This operator supports backpressure.</dd>
1093      * <dt><b>Scheduler:</b></dt>
1094      * <dd>This operator does not operate by default on a particular
1095      * {@link Scheduler}.</dd>
1096      * </dl>
1097      * 
1098      * @param <T>
1099      *            the input value type
1100      * @param predicate
1101      *            the Func1 that receives each item, before being buffered, and
1102      *            should return true to indicate a new buffer has to start.
1103      * @return the new Observable instance
1104      * @see #bufferWhile(Func1)
1105      * @since (if this graduates from Experimental/Beta to supported, replace
1106      *        this parenthetical with the release number)
1107      */
1108     public static final <T> Transformer<T, List<T>> bufferWhile(
1109             Func1<? super T, Boolean> predicate) {
1110         return bufferWhile(predicate, 10);
1111     }
1112 
1113     /**
1114      * Buffers the elements into continuous, non-overlapping Lists where the
1115      * boundary is determined by a predicate receiving each item, before or
1116      * after being buffered, and returns true to indicate a new buffer should
1117      * start.
1118      * 
1119      * <p>
1120      * The operator won't return an empty first or last buffer.
1121      * 
1122      * <dl>
1123      * <dt><b>Backpressure Support:</b></dt>
1124      * <dd>This operator supports backpressure.</dd>
1125      * <dt><b>Scheduler:</b></dt>
1126      * <dd>This operator does not operate by default on a particular
1127      * {@link Scheduler}.</dd>
1128      * </dl>
1129      * 
1130      * @param <T>
1131      *            the input value type
1132      * @param predicate
1133      *            the Func1 that receives each item, before being buffered, and
1134      *            should return true to indicate a new buffer has to start.
1135      * @return the new Observable instance
1136      * @see #bufferWhile(Func1)
1137      * @since (if this graduates from Experimental/Beta to supported, replace
1138      *        this parenthetical with the release number)
1139      */
1140     public static final <T> Transformer<T, List<T>> toListWhile(
1141             Func1<? super T, Boolean> predicate) {
1142         return bufferWhile(predicate);
1143     }
1144 
1145     /**
1146      * Buffers the elements into continuous, non-overlapping Lists where the
1147      * boundary is determined by a predicate receiving each item, before being
1148      * buffered, and returns true to indicate a new buffer should start.
1149      * 
1150      * <p>
1151      * The operator won't return an empty first or last buffer.
1152      * 
1153      * <dl>
1154      * <dt><b>Backpressure Support:</b></dt>
1155      * <dd>This operator supports backpressure.</dd>
1156      * <dt><b>Scheduler:</b></dt>
1157      * <dd>This operator does not operate by default on a particular
1158      * {@link Scheduler}.</dd>
1159      * </dl>
1160      * 
1161      * @param <T>
1162      *            the input value type
1163      * @param predicate
1164      *            the Func1 that receives each item, before being buffered, and
1165      *            should return true to indicate a new buffer has to start.
1166      * @param capacityHint
1167      *            the expected number of items in each buffer
1168      * @return the new Observable instance
1169      * @see #bufferWhile(Func1)
1170      * @since (if this graduates from Experimental/Beta to supported, replace
1171      *        this parenthetical with the release number)
1172      */
1173     public static final <T> Transformer<T, List<T>> bufferWhile(Func1<? super T, Boolean> predicate,
1174             int capacityHint) {
1175         return new OperatorBufferPredicateBoundary<T>(predicate, RxRingBuffer.SIZE, capacityHint,
1176                 false);
1177     }
1178 
1179     /**
1180      * Buffers the elements into continuous, non-overlapping Lists where the
1181      * boundary is determined by a predicate receiving each item, before being
1182      * buffered, and returns true to indicate a new buffer should start.
1183      * 
1184      * <p>
1185      * The operator won't return an empty first or last buffer.
1186      * 
1187      * <dl>
1188      * <dt><b>Backpressure Support:</b></dt>
1189      * <dd>This operator supports backpressure.</dd>
1190      * <dt><b>Scheduler:</b></dt>
1191      * <dd>This operator does not operate by default on a particular
1192      * {@link Scheduler}.</dd>
1193      * </dl>
1194      * 
1195      * @param <T>
1196      *            the input value type
1197      * @param predicate
1198      *            the Func1 that receives each item, before being buffered, and
1199      *            should return true to indicate a new buffer has to start.
1200      * @param capacityHint
1201      *            the expected number of items in each buffer
1202      * @return the new Observable instance
1203      * @see #bufferWhile(Func1)
1204      * @since (if this graduates from Experimental/Beta to supported, replace
1205      *        this parenthetical with the release number)
1206      */
1207     public static final <T> Transformer<T, List<T>> toListWhile(Func1<? super T, Boolean> predicate,
1208             int capacityHint) {
1209         return bufferWhile(predicate, capacityHint);
1210     }
1211 
1212     public static final <T> Transformer<T, T> delay(final Func1<? super T, Long> time,
1213             final Func0<Double> playRate, final long startTime, final Scheduler scheduler) {
1214         return new Transformer<T, T>() {
1215 
1216             @Override
1217             public Observable<T> call(final Observable<T> o) {
1218                 return Observable.defer(new Func0<Observable<T>>() {
1219                     long startActual = scheduler.now();
1220 
1221                     @Override
1222                     public Observable<T> call() {
1223                         return o.concatMap(new Func1<T, Observable<T>>() {
1224 
1225                             @Override
1226                             public Observable<T> call(T t) {
1227                                 return Observable.just(t) //
1228                                         .delay(delay(startActual, startTime, time.call(t), playRate,
1229                                                 scheduler.now()), TimeUnit.MILLISECONDS, scheduler);
1230                             }
1231 
1232                         });
1233                     }
1234                 });
1235             }
1236         };
1237 
1238     }
1239 
1240     private static long delay(long startActual, long startTime, long emissionTimestamp,
1241             Func0<Double> playRate, long now) {
1242         long elapsedActual = now - startActual;
1243         return Math.max(0,
1244                 Math.round((emissionTimestamp - startTime) / playRate.call() - elapsedActual));
1245     }
1246 
1247     /**
1248      * <p>
1249      * Modifies the source Observable so that it invokes an action when it calls
1250      * {@code onCompleted} and no items were emitted.
1251      * <dl>
1252      * <dt><b>Scheduler:</b></dt>
1253      * <dd>{@code doOnEmpty} does not operate by default on a particular
1254      * {@link Scheduler}.</dd>
1255      * </dl>
1256      *
1257      * @param onEmpty
1258      *            the action to invoke when the source Observable calls
1259      *            {@code onCompleted}, contingent on no items were emitted
1260      * @param <T>
1261      *            generic type of observable being transformed
1262      * @return the source Observable with the side-effecting behavior applied
1263      */
1264     public static final <T> Transformer<T, T> doOnEmpty(final Action0 onEmpty) {
1265         return new Transformer<T, T>() {
1266 
1267             @Override
1268             public Observable<T> call(Observable<T> o) {
1269                 return Observable.create(new OnSubscribeDoOnEmpty<T>(o, onEmpty));
1270             }
1271         };
1272     }
1273 
1274     public static final <T> Transformer<T, T> onTerminateResume(
1275             final Func1<Throwable, Observable<T>> onError, final Observable<T> onCompleted) {
1276         return new TransformerOnTerminateResume<T>(onError, onCompleted);
1277     }
1278 
1279     public static final <T> Transformer<T, T> repeatLast() {
1280         return new Transformer<T, T>() {
1281 
1282             @Override
1283             public Observable<T> call(Observable<T> o) {
1284                 return o.materialize().buffer(2, 1)
1285                         .flatMap(new Func1<List<Notification<T>>, Observable<T>>() {
1286                             @Override
1287                             public Observable<T> call(List<Notification<T>> list) {
1288                                 Notification<T> a = list.get(0);
1289                                 if (list.size() == 2 && list.get(1).isOnCompleted()) {
1290                                     return Observable.just(a.getValue()).repeat();
1291                                 } else if (a.isOnError()) {
1292                                     return Observable.error(list.get(0).getThrowable());
1293                                 } else if (a.isOnCompleted()) {
1294                                     return Observable.empty();
1295                                 } else {
1296                                     return Observable.just(a.getValue());
1297                                 }
1298                             }
1299                         });
1300             }
1301         };
1302     }
1303 
1304     public static <T> Transformer<T, T> mapLast(final Func1<? super T, ? extends T> function) {
1305 
1306         return new Transformer<T, T>() {
1307 
1308             @Override
1309             public Observable<T> call(Observable<T> source) {
1310                 return Observable.create(new OnSubscribeMapLast<T>(source, function));
1311             }
1312         };
1313     }
1314 }