1 package com.github.davidmoten.rx.internal.operators;
2
3 import rx.*;
4 import rx.Observable.*;
5 import rx.exceptions.*;
6 import rx.functions.Func1;
7 import rx.internal.producers.ProducerArbiter;
8
9
10
11
12
13
14 public final class TransformerOnTerminateResume<T> implements Transformer<T, T> {
15
16 final Func1<Throwable, Observable<T>> onError;
17
18 final Observable<T> onCompleted;
19
20 public TransformerOnTerminateResume(Func1<Throwable, Observable<T>> onError, Observable<T> onCompleted) {
21 this.onError = onError;
22 this.onCompleted = onCompleted;
23 }
24
25 @Override
26 public Observable<T> call(final Observable<T> o) {
27 return Observable.create(new OnSubscribe<T>() {
28 @Override
29 public void call(Subscriber<? super T> t) {
30 OnTerminateResumeSubscriber<T> parent = new OnTerminateResumeSubscriber<T>(t, onError, onCompleted);
31
32 t.add(parent);
33 t.setProducer(parent.arbiter);
34
35 o.unsafeSubscribe(parent);
36 }
37 });
38 }
39
40 static final class OnTerminateResumeSubscriber<T> extends Subscriber<T> {
41
42 final Subscriber<? super T> actual;
43
44 final Func1<Throwable, Observable<T>> onError;
45
46 final Observable<T> onCompleted;
47
48 final ProducerArbiter arbiter;
49
50 long produced;
51
52 public OnTerminateResumeSubscriber(Subscriber<? super T> actual, Func1<Throwable,
53 Observable<T>> onError,
54 Observable<T> onCompleted) {
55 this.arbiter = new ProducerArbiter();
56 this.actual = actual;
57 this.onError = onError;
58 this.onCompleted = onCompleted;
59 }
60
61 @Override
62 public void onNext(T t) {
63 produced++;
64 actual.onNext(t);
65 }
66
67 @Override
68 public void onError(Throwable e) {
69 long p = produced;
70 if (p != 0L) {
71 arbiter.produced(p);
72 }
73
74 Observable<T> o;
75
76 try {
77 o = onError.call(e);
78 } catch (Throwable ex) {
79 Exceptions.throwIfFatal(ex);
80
81 actual.onError(new CompositeException(e, ex));
82
83 return;
84 }
85
86 if (o == null) {
87 actual.onError(new NullPointerException("The onError function returned a null Observable."));
88 } else {
89 o.unsafeSubscribe(new ResumeSubscriber<T>(actual, arbiter));
90 }
91 }
92
93 @Override
94 public void onCompleted() {
95 long p = produced;
96 if (p != 0L) {
97 arbiter.produced(p);
98 }
99 onCompleted.unsafeSubscribe(new ResumeSubscriber<T>(actual, arbiter));
100 }
101
102 @Override
103 public void setProducer(Producer p) {
104 arbiter.setProducer(p);
105 }
106
107 static final class ResumeSubscriber<T> extends Subscriber<T> {
108 final Subscriber<? super T> actual;
109
110 final ProducerArbiter arbiter;
111
112 public ResumeSubscriber(Subscriber<? super T> actual, ProducerArbiter arbiter) {
113 this.actual = actual;
114 this.arbiter = arbiter;
115 }
116
117 @Override
118 public void onCompleted() {
119 actual.onCompleted();
120 }
121
122 @Override
123 public void onError(Throwable e) {
124 actual.onError(e);
125 }
126
127 @Override
128 public void onNext(T t) {
129 actual.onNext(t);
130 }
131
132 @Override
133 public void setProducer(Producer p) {
134 arbiter.setProducer(p);
135 }
136 }
137 }
138 }