View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   
5   import rx.Observable;
6   import rx.Observable.OnSubscribe;
7   import rx.exceptions.Exceptions;
8   import rx.exceptions.OnErrorThrowable;
9   import rx.Producer;
10  import rx.Subscriber;
11  import rx.functions.Func1;
12  
13  public final class OnSubscribeMapLast<T> implements OnSubscribe<T> {
14  
15      private final Observable<T> source;
16      private final Func1<? super T, ? extends T> function;
17  
18      public OnSubscribeMapLast(Observable<T> source, Func1<? super T, ? extends T> function) {
19          this.source = source;
20          this.function = function;
21      }
22  
23      @Override
24      public void call(Subscriber<? super T> child) {
25          final MapLastSubscriber<T> parent = new MapLastSubscriber<T>(child, function);
26          child.add(parent);
27          child.setProducer(new Producer() {
28              @Override
29              public void request(long n) {
30                  parent.requestMore(n);
31              }
32          });
33          source.unsafeSubscribe(parent);
34      }
35  
36      private final static class MapLastSubscriber<T> extends Subscriber<T> {
37  
38          private static final Object EMPTY = new Object();
39  
40          private final Subscriber<? super T> child;
41          private final Func1<? super T, ? extends T> function;
42          private final AtomicBoolean firstRequest = new AtomicBoolean(true);
43          @SuppressWarnings("unchecked")
44          private T value = (T) EMPTY;
45  
46          public MapLastSubscriber(Subscriber<? super T> child,
47                  Func1<? super T, ? extends T> function) {
48              this.child = child;
49              this.function = function;
50          }
51  
52          public void requestMore(long n) {
53              if (n < 0) {
54                  throw new IllegalArgumentException("cannot request negative amount");
55              } else if (n == 0) {
56                  return;
57              } else if (firstRequest.compareAndSet(true, false)) {
58                  long m = n + 1;
59                  if (m < 0) {
60                      m = Long.MAX_VALUE;
61                  }
62                  request(m);
63              } else {
64                  request(n);
65              }
66          }
67  
68          @Override
69          public void onCompleted() {
70              if (value != EMPTY) {
71                  T value2;
72                  try {
73                      value2 = function.call(value);
74                  } catch (Throwable e) {
75                      Exceptions.throwIfFatal(e);
76                      onError(OnErrorThrowable.addValueAsLastCause(e, value));
77                      return;
78                  }
79                  child.onNext(value2);
80              }
81              child.onCompleted();
82          }
83  
84          @Override
85          public void onError(Throwable e) {
86              if (value != EMPTY) {
87                  child.onNext(value);
88              }
89              child.onError(e);
90          }
91  
92          @Override
93          public void onNext(T t) {
94              if (value == EMPTY) {
95                  value = t;
96              } else {
97                  child.onNext(value);
98                  value = t;
99              }
100         }
101 
102     }
103 
104 }