1 package com.github.davidmoten.rx.internal.operators;
2
3 import com.github.davidmoten.rx.util.BackpressureStrategy;
4 import com.github.davidmoten.util.Preconditions;
5
6 import rx.Notification;
7 import rx.Observable;
8 import rx.Observable.OnSubscribe;
9 import rx.Observable.Transformer;
10 import rx.Subscriber;
11 import rx.functions.Func0;
12 import rx.functions.Func1;
13 import rx.functions.Func2;
14 import rx.functions.Func3;
15
16 public final class TransformerStateMachine<State, In, Out> implements Transformer<In, Out> {
17
18 private final Func0<? extends State> initialState;
19 private final Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition;
20 private final Func2<? super State, ? super Subscriber<Out>, Boolean> completion;
21 private final BackpressureStrategy backpressureStrategy;
22 private final int initialRequest;
23
24 private TransformerStateMachine(Func0<? extends State> initialState,
25 Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
26 Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
27 BackpressureStrategy backpressureStrategy, int initialRequest) {
28 Preconditions.checkNotNull(initialState);
29 Preconditions.checkNotNull(transition);
30 Preconditions.checkNotNull(completion);
31 Preconditions.checkNotNull(backpressureStrategy);
32 Preconditions.checkArgument(initialRequest > 0, "initialRequest must be greater than zero");
33 this.initialState = initialState;
34 this.transition = transition;
35 this.completion = completion;
36 this.backpressureStrategy = backpressureStrategy;
37 this.initialRequest = initialRequest;
38 }
39
40 public static <State, In, Out> Transformer<In, Out> create(Func0<? extends State> initialState,
41 Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
42 Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
43 BackpressureStrategy backpressureStrategy, int initialRequest) {
44 return new TransformerStateMachine<State, In, Out>(initialState, transition, completion,
45 backpressureStrategy, initialRequest);
46 }
47
48 @Override
49 public Observable<Out> call(final Observable<In> source) {
50
51
52 return Observable.defer(new Func0<Observable<Out>>() {
53 @Override
54 public Observable<Out> call() {
55 Mutable<State> state = new Mutable<State>(initialState.call());
56 return source.materialize()
57
58
59 .flatMap(execute(transition, completion, state, backpressureStrategy),
60 initialRequest)
61
62 .takeWhile(NOT_UNSUBSCRIBED)
63
64
65 .dematerialize();
66 }
67 });
68 }
69
70 private static <State, Out, In> Func1<Notification<In>, Observable<Notification<Out>>> execute(
71 final Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
72 final Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
73 final Mutable<State> state, final BackpressureStrategy backpressureStrategy) {
74
75 return new Func1<Notification<In>, Observable<Notification<Out>>>() {
76
77 @Override
78 public Observable<Notification<Out>> call(final Notification<In> in) {
79
80 Observable<Notification<Out>> o = Observable
81 .create(new OnSubscribe<Notification<Out>>() {
82
83 @Override
84 public void call(Subscriber<? super Notification<Out>> subscriber) {
85 Subscriber<Out> w = wrap(subscriber);
86 if (in.hasValue()) {
87 state.value = transition.call(state.value, in.getValue(), w);
88 if (!subscriber.isUnsubscribed())
89 subscriber.onCompleted();
90 else {
91
92
93
94 subscriber.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
95 }
96 } else if (in.isOnCompleted()) {
97 if (completion.call(state.value, w) && !subscriber.isUnsubscribed()) {
98 w.onCompleted();
99 }
100 } else if (!subscriber.isUnsubscribed()) {
101 w.onError(in.getThrowable());
102 }
103 }
104 });
105
106
107
108
109 return applyBackpressure(o, backpressureStrategy);
110 }
111
112 };
113 }
114
115 private static final class UnsubscribedNotificationHolder {
116 private static final Notification<Object> INSTANCE = Notification.createOnNext(null);
117
118 @SuppressWarnings("unchecked")
119 static <T> Notification<T> unsubscribedNotification() {
120 return (Notification<T>) INSTANCE;
121 }
122 }
123
124 private static <Out> Observable<Notification<Out>> applyBackpressure(
125 Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
126 if (backpressureStrategy == BackpressureStrategy.BUFFER)
127 return o.onBackpressureBuffer();
128 else if (backpressureStrategy == BackpressureStrategy.DROP)
129 return o.onBackpressureDrop();
130 else if (backpressureStrategy == BackpressureStrategy.LATEST)
131 return o.onBackpressureLatest();
132 else
133 throw new IllegalArgumentException(
134 "backpressure strategy not supported: " + backpressureStrategy);
135 }
136
137 private static final Func1<Notification<?>, Boolean> NOT_UNSUBSCRIBED = new Func1<Notification<?>, Boolean>() {
138
139 @Override
140 public Boolean call(Notification<?> t) {
141 return t != UnsubscribedNotificationHolder.unsubscribedNotification();
142 }
143
144 };
145
146 private static final class Mutable<T> {
147 T value;
148
149 Mutable(T value) {
150 this.value = value;
151 }
152 }
153
154 private static <Out> NotificationSubscriber<Out> wrap(
155 Subscriber<? super Notification<Out>> sub) {
156 return new NotificationSubscriber<Out>(sub);
157 }
158
159 private static final class NotificationSubscriber<Out> extends Subscriber<Out> {
160
161 private final Subscriber<? super Notification<Out>> sub;
162
163 NotificationSubscriber(Subscriber<? super Notification<Out>> sub) {
164 this.sub = sub;
165 add(sub);
166 }
167
168 @Override
169 public void onCompleted() {
170 sub.onNext(Notification.<Out> createOnCompleted());
171 }
172
173 @Override
174 public void onError(Throwable e) {
175 sub.onNext(Notification.<Out> createOnError(e));
176 }
177
178 @Override
179 public void onNext(Out t) {
180 sub.onNext(Notification.createOnNext(t));
181 }
182
183 }
184
185 }