1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.Callable;
4
5 import com.github.davidmoten.guavamini.Preconditions;
6
7 import io.reactivex.BackpressureStrategy;
8 import io.reactivex.Flowable;
9 import io.reactivex.FlowableEmitter;
10 import io.reactivex.FlowableOnSubscribe;
11 import io.reactivex.FlowableTransformer;
12 import io.reactivex.Notification;
13 import io.reactivex.disposables.Disposable;
14 import io.reactivex.functions.BiPredicate;
15 import io.reactivex.functions.Cancellable;
16 import io.reactivex.functions.Function;
17 import io.reactivex.functions.Function3;
18 import io.reactivex.functions.Predicate;
19 import io.reactivex.plugins.RxJavaPlugins;
20
21 public final class TransformerStateMachine<State, In, Out> implements FlowableTransformer<In, Out> {
22
23 private final Callable<? extends State> initialState;
24 private final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition;
25 private final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion;
26 private final BackpressureStrategy backpressureStrategy;
27 private final int requestBatchSize;
28
29 private TransformerStateMachine(Callable<? extends State> initialState,
30 Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
31 BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
32 BackpressureStrategy backpressureStrategy, int requestBatchSize) {
33 Preconditions.checkNotNull(initialState);
34 Preconditions.checkNotNull(transition);
35 Preconditions.checkNotNull(completion);
36 Preconditions.checkNotNull(backpressureStrategy);
37 Preconditions.checkArgument(requestBatchSize > 0, "initialRequest must be greater than zero");
38 this.initialState = initialState;
39 this.transition = transition;
40 this.completion = completion;
41 this.backpressureStrategy = backpressureStrategy;
42 this.requestBatchSize = requestBatchSize;
43 }
44
45 public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
46 Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
47 BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
48 BackpressureStrategy backpressureStrategy, int requestBatchSize) {
49 return new TransformerStateMachine<State, In, Out>(initialState, transition, completion, backpressureStrategy,
50 requestBatchSize);
51 }
52
53 @Override
54 public Flowable<Out> apply(final Flowable<In> source) {
55
56
57 return Flowable.defer(new Callable<Flowable<Out>>() {
58 @Override
59 public Flowable<Out> call() throws Exception {
60 Mutable<State> state = new Mutable<State>(initialState.call());
61 return source.materialize()
62
63
64 .flatMap(execute(transition, completion, state, backpressureStrategy), requestBatchSize)
65
66 .takeWhile(NOT_UNSUBSCRIBED)
67
68
69 .dematerialize();
70 }
71 });
72 }
73
74 private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute(
75 final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
76 final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state,
77 final BackpressureStrategy backpressureStrategy) {
78
79 return new Function<Notification<In>, Flowable<Notification<Out>>>() {
80
81 @Override
82 public Flowable<Notification<Out>> apply(final Notification<In> in) {
83
84 return Flowable.create(new FlowableOnSubscribe<Notification<Out>>() {
85
86 @Override
87 public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception {
88 FlowableEmitter<Out> w = wrap(emitter);
89 if (in.isOnNext()) {
90 state.value = transition.apply(state.value, in.getValue(), w);
91 if (!emitter.isCancelled())
92 emitter.onComplete();
93 else {
94
95
96
97 emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
98 }
99 } else if (in.isOnComplete()) {
100 if (completion.test(state.value, w) && !emitter.isCancelled()) {
101 w.onComplete();
102 }
103 } else if (!emitter.isCancelled()) {
104 w.onError(in.getError());
105 }
106 }
107
108 }, backpressureStrategy);
109 }
110 };
111 }
112
113 private static final class UnsubscribedNotificationHolder {
114 private static final Notification<Object> INSTANCE = Notification.createOnNext(new Object());
115
116 @SuppressWarnings("unchecked")
117 static <T> Notification<T> unsubscribedNotification() {
118 return (Notification<T>) INSTANCE;
119 }
120 }
121
122 private static final Predicate<Notification<?>> NOT_UNSUBSCRIBED = new Predicate<Notification<?>>() {
123
124 @Override
125 public boolean test(Notification<?> t) {
126 return t != UnsubscribedNotificationHolder.unsubscribedNotification();
127 }
128
129 };
130
131 private static final class Mutable<T> {
132 T value;
133
134 Mutable(T value) {
135 this.value = value;
136 }
137 }
138
139 private static <Out> NotificationEmitter<Out> wrap(FlowableEmitter<? super Notification<Out>> emitter) {
140 return new NotificationEmitter<Out>(emitter);
141 }
142
143 private static final class NotificationEmitter<Out> implements FlowableEmitter<Out> {
144
145 private final FlowableEmitter<? super Notification<Out>> emitter;
146
147 NotificationEmitter(FlowableEmitter<? super Notification<Out>> emitter) {
148 this.emitter = emitter;
149 }
150
151 @Override
152 public void onComplete() {
153 emitter.onNext(Notification.<Out>createOnComplete());
154 }
155
156 @Override
157 public void onError(Throwable e) {
158 if (!tryOnError(e)) {
159 RxJavaPlugins.onError(e);
160 }
161 }
162
163 @Override
164 public void onNext(Out t) {
165 emitter.onNext(Notification.createOnNext(t));
166 }
167
168 @Override
169 public void setDisposable(Disposable s) {
170 throw new UnsupportedOperationException();
171 }
172
173 @Override
174 public void setCancellable(Cancellable c) {
175 throw new UnsupportedOperationException();
176 }
177
178 @Override
179 public long requested() {
180 return emitter.requested();
181 }
182
183 @Override
184 public boolean isCancelled() {
185 return emitter.isCancelled();
186
187 }
188
189 @Override
190 public FlowableEmitter<Out> serialize() {
191 throw new UnsupportedOperationException();
192 }
193
194 @Override
195 public boolean tryOnError(Throwable e) {
196 if (emitter.isCancelled()) {
197 return false;
198 } else {
199 emitter.onNext(Notification.<Out>createOnError(e));
200 return true;
201 }
202 }
203
204 }
205
206 }