1 package com.github.davidmoten.rx.internal.operators; 2 3 import java.util.concurrent.atomic.AtomicBoolean; 4 5 import rx.Observable; 6 import rx.Observable.OnSubscribe; 7 import rx.exceptions.Exceptions; 8 import rx.exceptions.OnErrorThrowable; 9 import rx.Producer; 10 import rx.Subscriber; 11 import rx.functions.Func1; 12 13 public final class OnSubscribeMapLast<T> implements OnSubscribe<T> { 14 15 private final Observable<T> source; 16 private final Func1<? super T, ? extends T> function; 17 18 public OnSubscribeMapLast(Observable<T> source, Func1<? super T, ? extends T> function) { 19 this.source = source; 20 this.function = function; 21 } 22 23 @Override 24 public void call(Subscriber<? super T> child) { 25 final MapLastSubscriber<T> parent = new MapLastSubscriber<T>(child, function); 26 child.add(parent); 27 child.setProducer(new Producer() { 28 @Override 29 public void request(long n) { 30 parent.requestMore(n); 31 } 32 }); 33 source.unsafeSubscribe(parent); 34 } 35 36 private final static class MapLastSubscriber<T> extends Subscriber<T> { 37 38 private static final Object EMPTY = new Object(); 39 40 private final Subscriber<? super T> child; 41 private final Func1<? super T, ? extends T> function; 42 private final AtomicBoolean firstRequest = new AtomicBoolean(true); 43 @SuppressWarnings("unchecked") 44 private T value = (T) EMPTY; 45 46 public MapLastSubscriber(Subscriber<? super T> child, 47 Func1<? super T, ? extends T> function) { 48 this.child = child; 49 this.function = function; 50 } 51 52 public void requestMore(long n) { 53 if (n < 0) { 54 throw new IllegalArgumentException("cannot request negative amount"); 55 } else if (n == 0) { 56 return; 57 } else if (firstRequest.compareAndSet(true, false)) { 58 long m = n + 1; 59 if (m < 0) { 60 m = Long.MAX_VALUE; 61 } 62 request(m); 63 } else { 64 request(n); 65 } 66 } 67 68 @Override 69 public void onCompleted() { 70 if (value != EMPTY) { 71 T value2; 72 try { 73 value2 = function.call(value); 74 } catch (Throwable e) { 75 Exceptions.throwIfFatal(e); 76 onError(OnErrorThrowable.addValueAsLastCause(e, value)); 77 return; 78 } 79 child.onNext(value2); 80 } 81 child.onCompleted(); 82 } 83 84 @Override 85 public void onError(Throwable e) { 86 if (value != EMPTY) { 87 child.onNext(value); 88 } 89 child.onError(e); 90 } 91 92 @Override 93 public void onNext(T t) { 94 if (value == EMPTY) { 95 value = t; 96 } else { 97 child.onNext(value); 98 value = t; 99 } 100 } 101 102 } 103 104 }