1 package com.github.davidmoten.rx2.internal.flowable;
2
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
13
14 public final class FlowableMapLast<T> extends Flowable<T> {
15
16 private final Flowable<T> source;
17 private final Function<? super T, ? extends T> function;
18
19 public FlowableMapLast(Flowable<T> source, Function<? super T, ? extends T> function) {
20 this.source = source;
21 this.function = function;
22 }
23
24 @Override
25 protected void subscribeActual(Subscriber<? super T> s) {
26 source.subscribe(new MapLastSubscriber<T>(s, function));
27 }
28
29 private final static class MapLastSubscriber<T> implements FlowableSubscriber<T>, Subscription {
30
31 private static final Object EMPTY = new Object();
32
33 private final Subscriber<? super T> actual;
34 private final Function<? super T, ? extends T> function;
35 private final AtomicBoolean firstRequest = new AtomicBoolean(true);
36
37
38 @SuppressWarnings("unchecked")
39 private T value = (T) EMPTY;
40 private Subscription parent;
41 private boolean done;
42
43 public MapLastSubscriber(Subscriber<? super T> actual, Function<? super T, ? extends T> function) {
44 this.actual = actual;
45 this.function = function;
46 }
47
48 @Override
49 public void onSubscribe(Subscription subscription) {
50 if (SubscriptionHelper.validate(this.parent, subscription)) {
51 this.parent = subscription;
52 actual.onSubscribe(this);
53 }
54 }
55
56 @Override
57 public void onNext(T t) {
58 if (done) {
59 return;
60 }
61 if (value == EMPTY) {
62 value = t;
63 } else {
64 actual.onNext(value);
65 value = t;
66 }
67 }
68
69 @Override
70 public void onComplete() {
71 if (done) {
72 return;
73 }
74 if (value != EMPTY) {
75 T value2;
76 try {
77 value2 = function.apply(value);
78 } catch (Throwable e) {
79 Exceptions.throwIfFatal(e);
80 parent.cancel();
81 onError(e);
82 return;
83 }
84 actual.onNext(value2);
85 }
86 done = true;
87 actual.onComplete();
88 }
89
90 @Override
91 public void onError(Throwable e) {
92 if (done) {
93 return;
94 }
95 if (value != EMPTY) {
96 actual.onNext(value);
97 }
98 done = true;
99 actual.onError(e);
100 }
101
102 @Override
103 public void cancel() {
104 parent.cancel();
105 }
106
107 @Override
108 public void request(long n) {
109 if (SubscriptionHelper.validate(n)) {
110 if (firstRequest.compareAndSet(true, false)) {
111 long m = n + 1;
112 if (m < 0) {
113 m = Long.MAX_VALUE;
114 }
115 parent.request(m);
116 } else {
117 parent.request(n);
118 }
119 }
120 }
121
122 }
123
124 }