View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import rx.Observable;
4   import rx.Observable.OnSubscribe;
5   import rx.Observable.Operator;
6   import rx.Producer;
7   import rx.Subscriber;
8   import rx.functions.Func1;
9   
10  /**
11   * Converts an Transformer (a function converting one Observable into another)
12   * into an {@link Operator}.
13   * 
14   * @param <R>
15   *            to type
16   * @param <T>
17   *            from type
18   */
19  public final class OperatorFromTransformer<R, T> implements Operator<R, T> {
20  
21      public static <R, T> Operator<R, T> toOperator(
22              Func1<? super Observable<T>, ? extends Observable<R>> operation) {
23          return new OperatorFromTransformer<R, T>(operation);
24      }
25  
26      /**
27       * The operation to convert.
28       */
29      private final Func1<? super Observable<T>, ? extends Observable<R>> operation;
30  
31      /**
32       * Constructor.
33       * 
34       * @param operation
35       *            to be converted into {@link Operator}
36       */
37      public OperatorFromTransformer(
38              Func1<? super Observable<T>, ? extends Observable<R>> operation) {
39          this.operation = operation;
40      }
41  
42      @Override
43      public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
44          final ParentSubscriber<T> parent = new ParentSubscriber<T>();
45          Observable<T> middle = Observable.create(new ForwarderOnSubscribe<T>(parent));
46          subscriber.add(parent);
47          operation.call(middle).unsafeSubscribe(subscriber);
48          return parent;
49      }
50  
51      static final class ForwarderOnSubscribe<T> implements OnSubscribe<T> {
52  
53          private final ParentSubscriber<T> parent;
54  
55          ForwarderOnSubscribe(ParentSubscriber<T> parent) {
56              this.parent = parent;
57          }
58  
59          @Override
60          public void call(Subscriber<? super T> sub) {
61              parent.subscriber = sub;
62              sub.setProducer(new Producer() {
63  
64                  @Override
65                  public void request(long n) {
66                      parent.requestMore(n);
67                  }
68              });
69          }
70      }
71  
72      static final class ParentSubscriber<T> extends Subscriber<T> {
73  
74          // TODO may not need to be volatile
75          volatile Subscriber<? super T> subscriber;
76  
77          void requestMore(long n) {
78              request(n);
79          }
80  
81          @Override
82          public void onCompleted() {
83              subscriber.onCompleted();
84          }
85  
86          @Override
87          public void onError(Throwable e) {
88              subscriber.onError(e);
89          }
90  
91          @Override
92          public void onNext(T t) {
93              subscriber.onNext(t);
94          }
95  
96      }
97  
98  }