View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   
5   import org.reactivestreams.Subscriber;
6   import org.reactivestreams.Subscription;
7   
8   import io.reactivex.Flowable;
9   import io.reactivex.FlowableSubscriber;
10  import io.reactivex.exceptions.Exceptions;
11  import io.reactivex.functions.Function;
12  import io.reactivex.internal.subscriptions.SubscriptionHelper;
13  
14  public final class FlowableMapLast<T> extends Flowable<T> {
15  
16      private final Flowable<T> source;
17      private final Function<? super T, ? extends T> function;
18  
19      public FlowableMapLast(Flowable<T> source, Function<? super T, ? extends T> function) {
20          this.source = source;
21          this.function = function;
22      }
23  
24      @Override
25      protected void subscribeActual(Subscriber<? super T> s) {
26          source.subscribe(new MapLastSubscriber<T>(s, function));
27      }
28  
29      private final static class MapLastSubscriber<T> implements FlowableSubscriber<T>, Subscription {
30  
31          private static final Object EMPTY = new Object();
32  
33          private final Subscriber<? super T> actual;
34          private final Function<? super T, ? extends T> function;
35          private final AtomicBoolean firstRequest = new AtomicBoolean(true);
36  
37          // mutable state
38          @SuppressWarnings("unchecked")
39          private T value = (T) EMPTY;
40          private Subscription parent;
41          private boolean done;
42  
43          public MapLastSubscriber(Subscriber<? super T> actual, Function<? super T, ? extends T> function) {
44              this.actual = actual;
45              this.function = function;
46          }
47  
48          @Override
49          public void onSubscribe(Subscription subscription) {
50              if (SubscriptionHelper.validate(this.parent, subscription)) {
51                  this.parent = subscription;
52                  actual.onSubscribe(this);
53              }
54          }
55  
56          @Override
57          public void onNext(T t) {
58              if (done) {
59                  return;
60              }
61              if (value == EMPTY) {
62                  value = t;
63              } else {
64                  actual.onNext(value);
65                  value = t;
66              }
67          }
68  
69          @Override
70          public void onComplete() {
71              if (done) {
72                  return;
73              }
74              if (value != EMPTY) {
75                  T value2;
76                  try {
77                      value2 = function.apply(value);
78                  } catch (Throwable e) {
79                      Exceptions.throwIfFatal(e);
80                      parent.cancel();
81                      onError(e);
82                      return;
83                  }
84                  actual.onNext(value2);
85              }
86              done = true;
87              actual.onComplete();
88          }
89  
90          @Override
91          public void onError(Throwable e) {
92              if (done) {
93                  return;
94              }
95              if (value != EMPTY) {
96                  actual.onNext(value);
97              }
98              done = true;
99              actual.onError(e);
100         }
101 
102         @Override
103         public void cancel() {
104             parent.cancel();
105         }
106 
107         @Override
108         public void request(long n) {
109             if (SubscriptionHelper.validate(n)) {
110                 if (firstRequest.compareAndSet(true, false)) {
111                     long m = n + 1;
112                     if (m < 0) {
113                         m = Long.MAX_VALUE;
114                     }
115                     parent.request(m);
116                 } else {
117                     parent.request(n);
118                 }
119             }
120         }
121 
122     }
123 
124 }