1 package com.github.davidmoten.rx.internal.operators;
2
3 import rx.Observable;
4 import rx.Observable.OnSubscribe;
5 import rx.Observable.Operator;
6 import rx.Producer;
7 import rx.Subscriber;
8 import rx.functions.Func1;
9
10
11
12
13
14
15
16
17
18
19 public final class OperatorFromTransformer<R, T> implements Operator<R, T> {
20
21 public static <R, T> Operator<R, T> toOperator(
22 Func1<? super Observable<T>, ? extends Observable<R>> operation) {
23 return new OperatorFromTransformer<R, T>(operation);
24 }
25
26
27
28
29 private final Func1<? super Observable<T>, ? extends Observable<R>> operation;
30
31
32
33
34
35
36
37 public OperatorFromTransformer(
38 Func1<? super Observable<T>, ? extends Observable<R>> operation) {
39 this.operation = operation;
40 }
41
42 @Override
43 public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
44 final ParentSubscriber<T> parent = new ParentSubscriber<T>();
45 Observable<T> middle = Observable.create(new ForwarderOnSubscribe<T>(parent));
46 subscriber.add(parent);
47 operation.call(middle).unsafeSubscribe(subscriber);
48 return parent;
49 }
50
51 static final class ForwarderOnSubscribe<T> implements OnSubscribe<T> {
52
53 private final ParentSubscriber<T> parent;
54
55 ForwarderOnSubscribe(ParentSubscriber<T> parent) {
56 this.parent = parent;
57 }
58
59 @Override
60 public void call(Subscriber<? super T> sub) {
61 parent.subscriber = sub;
62 sub.setProducer(new Producer() {
63
64 @Override
65 public void request(long n) {
66 parent.requestMore(n);
67 }
68 });
69 }
70 }
71
72 static final class ParentSubscriber<T> extends Subscriber<T> {
73
74
75 volatile Subscriber<? super T> subscriber;
76
77 void requestMore(long n) {
78 request(n);
79 }
80
81 @Override
82 public void onCompleted() {
83 subscriber.onCompleted();
84 }
85
86 @Override
87 public void onError(Throwable e) {
88 subscriber.onError(e);
89 }
90
91 @Override
92 public void onNext(T t) {
93 subscriber.onNext(t);
94 }
95
96 }
97
98 }