View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import rx.*;
4   import rx.Observable.*;
5   import rx.exceptions.*;
6   import rx.functions.Func1;
7   import rx.internal.producers.ProducerArbiter;
8   
9   /**
10   * Switches to different Observables if the main source completes or signals an error.
11   *
12   * @param <T> the value type
13   */
14  public final class TransformerOnTerminateResume<T> implements Transformer<T, T> {
15      
16      final Func1<Throwable, Observable<T>> onError;
17      
18      final Observable<T> onCompleted;
19  
20      public TransformerOnTerminateResume(Func1<Throwable, Observable<T>> onError, Observable<T> onCompleted) {
21          this.onError = onError;
22          this.onCompleted = onCompleted;
23      }
24  
25      @Override
26      public Observable<T> call(final Observable<T> o) {
27          return Observable.create(new OnSubscribe<T>() {
28              @Override
29              public void call(Subscriber<? super T> t) {
30                  OnTerminateResumeSubscriber<T> parent = new OnTerminateResumeSubscriber<T>(t, onError, onCompleted);
31                  
32                  t.add(parent);
33                  t.setProducer(parent.arbiter);
34                  
35                  o.unsafeSubscribe(parent);
36              }
37          });
38      }
39      
40      static final class OnTerminateResumeSubscriber<T> extends Subscriber<T> {
41  
42          final Subscriber<? super T> actual;
43          
44          final Func1<Throwable, Observable<T>> onError;
45          
46          final Observable<T> onCompleted;
47  
48          final ProducerArbiter arbiter;
49          
50          long produced;
51          
52          public OnTerminateResumeSubscriber(Subscriber<? super T> actual, Func1<Throwable, 
53                  Observable<T>> onError,
54                  Observable<T> onCompleted) {
55              this.arbiter = new ProducerArbiter();
56              this.actual = actual;
57              this.onError = onError;
58              this.onCompleted = onCompleted;
59          }
60  
61          @Override
62          public void onNext(T t) {
63              produced++;
64              actual.onNext(t);
65          }
66  
67          @Override
68          public void onError(Throwable e) {
69              long p = produced;
70              if (p != 0L) {
71                  arbiter.produced(p);
72              }
73              
74              Observable<T> o;
75              
76              try {
77                  o = onError.call(e);
78              } catch (Throwable ex) {
79                  Exceptions.throwIfFatal(ex);
80                  
81                  actual.onError(new CompositeException(e, ex));
82                  
83                  return;
84              }
85              
86              if (o == null) {
87                  actual.onError(new NullPointerException("The onError function returned a null Observable."));
88              } else {
89                  o.unsafeSubscribe(new ResumeSubscriber<T>(actual, arbiter));
90              }
91          }
92  
93          @Override
94          public void onCompleted() {
95              long p = produced;
96              if (p != 0L) {
97                  arbiter.produced(p);
98              }
99              onCompleted.unsafeSubscribe(new ResumeSubscriber<T>(actual, arbiter));
100         }
101         
102         @Override
103         public void setProducer(Producer p) {
104             arbiter.setProducer(p);
105         }
106         
107         static final class ResumeSubscriber<T> extends Subscriber<T> {
108             final Subscriber<? super T> actual;
109             
110             final ProducerArbiter arbiter;
111 
112             public ResumeSubscriber(Subscriber<? super T> actual, ProducerArbiter arbiter) {
113                 this.actual = actual;
114                 this.arbiter = arbiter;
115             }
116 
117             @Override
118             public void onCompleted() {
119                 actual.onCompleted();
120             }
121 
122             @Override
123             public void onError(Throwable e) {
124                 actual.onError(e);
125             }
126 
127             @Override
128             public void onNext(T t) {
129                 actual.onNext(t);
130             }
131 
132             @Override
133             public void setProducer(Producer p) {
134                 arbiter.setProducer(p);
135             }
136         }
137     }
138 }