1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4
5 import rx.Observable;
6 import rx.Observable.OnSubscribe;
7 import rx.exceptions.Exceptions;
8 import rx.exceptions.OnErrorThrowable;
9 import rx.Producer;
10 import rx.Subscriber;
11 import rx.functions.Func1;
12
13 public final class OnSubscribeMapLast<T> implements OnSubscribe<T> {
14
15 private final Observable<T> source;
16 private final Func1<? super T, ? extends T> function;
17
18 public OnSubscribeMapLast(Observable<T> source, Func1<? super T, ? extends T> function) {
19 this.source = source;
20 this.function = function;
21 }
22
23 @Override
24 public void call(Subscriber<? super T> child) {
25 final MapLastSubscriber<T> parent = new MapLastSubscriber<T>(child, function);
26 child.add(parent);
27 child.setProducer(new Producer() {
28 @Override
29 public void request(long n) {
30 parent.requestMore(n);
31 }
32 });
33 source.unsafeSubscribe(parent);
34 }
35
36 private final static class MapLastSubscriber<T> extends Subscriber<T> {
37
38 private static final Object EMPTY = new Object();
39
40 private final Subscriber<? super T> child;
41 private final Func1<? super T, ? extends T> function;
42 private final AtomicBoolean firstRequest = new AtomicBoolean(true);
43 @SuppressWarnings("unchecked")
44 private T value = (T) EMPTY;
45
46 public MapLastSubscriber(Subscriber<? super T> child,
47 Func1<? super T, ? extends T> function) {
48 this.child = child;
49 this.function = function;
50 }
51
52 public void requestMore(long n) {
53 if (n < 0) {
54 throw new IllegalArgumentException("cannot request negative amount");
55 } else if (n == 0) {
56 return;
57 } else if (firstRequest.compareAndSet(true, false)) {
58 long m = n + 1;
59 if (m < 0) {
60 m = Long.MAX_VALUE;
61 }
62 request(m);
63 } else {
64 request(n);
65 }
66 }
67
68 @Override
69 public void onCompleted() {
70 if (value != EMPTY) {
71 T value2;
72 try {
73 value2 = function.call(value);
74 } catch (Throwable e) {
75 Exceptions.throwIfFatal(e);
76 onError(OnErrorThrowable.addValueAsLastCause(e, value));
77 return;
78 }
79 child.onNext(value2);
80 }
81 child.onCompleted();
82 }
83
84 @Override
85 public void onError(Throwable e) {
86 if (value != EMPTY) {
87 child.onNext(value);
88 }
89 child.onError(e);
90 }
91
92 @Override
93 public void onNext(T t) {
94 if (value == EMPTY) {
95 value = t;
96 } else {
97 child.onNext(value);
98 value = t;
99 }
100 }
101
102 }
103
104 }