View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.Callable;
4   
5   import com.github.davidmoten.guavamini.Preconditions;
6   
7   import io.reactivex.BackpressureStrategy;
8   import io.reactivex.Flowable;
9   import io.reactivex.FlowableEmitter;
10  import io.reactivex.FlowableOnSubscribe;
11  import io.reactivex.FlowableTransformer;
12  import io.reactivex.Notification;
13  import io.reactivex.disposables.Disposable;
14  import io.reactivex.functions.BiPredicate;
15  import io.reactivex.functions.Cancellable;
16  import io.reactivex.functions.Function;
17  import io.reactivex.functions.Function3;
18  import io.reactivex.functions.Predicate;
19  import io.reactivex.plugins.RxJavaPlugins;
20  
21  public final class TransformerStateMachine<State, In, Out> implements FlowableTransformer<In, Out> {
22  
23      private final Callable<? extends State> initialState;
24      private final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition;
25      private final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion;
26      private final BackpressureStrategy backpressureStrategy;
27      private final int requestBatchSize;
28  
29      private TransformerStateMachine(Callable<? extends State> initialState,
30              Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
31              BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
32              BackpressureStrategy backpressureStrategy, int requestBatchSize) {
33          Preconditions.checkNotNull(initialState);
34          Preconditions.checkNotNull(transition);
35          Preconditions.checkNotNull(completion);
36          Preconditions.checkNotNull(backpressureStrategy);
37          Preconditions.checkArgument(requestBatchSize > 0, "initialRequest must be greater than zero");
38          this.initialState = initialState;
39          this.transition = transition;
40          this.completion = completion;
41          this.backpressureStrategy = backpressureStrategy;
42          this.requestBatchSize = requestBatchSize;
43      }
44  
45      public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
46              Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
47              BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
48              BackpressureStrategy backpressureStrategy, int requestBatchSize) {
49          return new TransformerStateMachine<State, In, Out>(initialState, transition, completion, backpressureStrategy,
50                  requestBatchSize);
51      }
52  
53      @Override
54      public Flowable<Out> apply(final Flowable<In> source) {
55          // use defer so we can have a single state reference for each
56          // subscription
57          return Flowable.defer(new Callable<Flowable<Out>>() {
58              @Override
59              public Flowable<Out> call() throws Exception {
60                  Mutable<State> state = new Mutable<State>(initialState.call());
61                  return source.materialize()
62                          // do state transitions and emit notifications
63                          // use flatMap to emit notification values
64                          .flatMap(execute(transition, completion, state, backpressureStrategy), requestBatchSize)
65                          // complete if we encounter an unsubscribed sentinel
66                          .takeWhile(NOT_UNSUBSCRIBED)
67                          // flatten notifications to a stream which will enable
68                          // early termination from the state machine if desired
69                          .dematerialize();
70              }
71          });
72      }
73  
74      private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute(
75              final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
76              final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state,
77              final BackpressureStrategy backpressureStrategy) {
78  
79          return new Function<Notification<In>, Flowable<Notification<Out>>>() {
80  
81              @Override
82              public Flowable<Notification<Out>> apply(final Notification<In> in) {
83  
84                  return Flowable.create(new FlowableOnSubscribe<Notification<Out>>() {
85  
86                      @Override
87                      public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception {
88                          FlowableEmitter<Out> w = wrap(emitter);
89                          if (in.isOnNext()) {
90                              state.value = transition.apply(state.value, in.getValue(), w);
91                              if (!emitter.isCancelled())
92                                  emitter.onComplete();
93                              else {
94                                  // this is a special emission to indicate that
95                                  // the transition called unsubscribe. It will be
96                                  // filtered later.
97                                  emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
98                              }
99                          } else if (in.isOnComplete()) {
100                             if (completion.test(state.value, w) && !emitter.isCancelled()) {
101                                 w.onComplete();
102                             }
103                         } else if (!emitter.isCancelled()) {
104                             w.onError(in.getError());
105                         }
106                     }
107 
108                 }, backpressureStrategy);
109             }
110         };
111     }
112 
113     private static final class UnsubscribedNotificationHolder {
114         private static final Notification<Object> INSTANCE = Notification.createOnNext(new Object());
115 
116         @SuppressWarnings("unchecked")
117         static <T> Notification<T> unsubscribedNotification() {
118             return (Notification<T>) INSTANCE;
119         }
120     }
121 
122     private static final Predicate<Notification<?>> NOT_UNSUBSCRIBED = new Predicate<Notification<?>>() {
123 
124         @Override
125         public boolean test(Notification<?> t) {
126             return t != UnsubscribedNotificationHolder.unsubscribedNotification();
127         }
128 
129     };
130 
131     private static final class Mutable<T> {
132         T value;
133 
134         Mutable(T value) {
135             this.value = value;
136         }
137     }
138 
139     private static <Out> NotificationEmitter<Out> wrap(FlowableEmitter<? super Notification<Out>> emitter) {
140         return new NotificationEmitter<Out>(emitter);
141     }
142 
143     private static final class NotificationEmitter<Out> implements FlowableEmitter<Out> {
144 
145         private final FlowableEmitter<? super Notification<Out>> emitter;
146 
147         NotificationEmitter(FlowableEmitter<? super Notification<Out>> emitter) {
148             this.emitter = emitter;
149         }
150 
151         @Override
152         public void onComplete() {
153             emitter.onNext(Notification.<Out>createOnComplete());
154         }
155 
156         @Override
157         public void onError(Throwable e) {
158             if (!tryOnError(e)) {
159                 RxJavaPlugins.onError(e);
160             }
161         }
162 
163         @Override
164         public void onNext(Out t) {
165             emitter.onNext(Notification.createOnNext(t));
166         }
167 
168         @Override
169         public void setDisposable(Disposable s) {
170             throw new UnsupportedOperationException();
171         }
172 
173         @Override
174         public void setCancellable(Cancellable c) {
175             throw new UnsupportedOperationException();
176         }
177 
178         @Override
179         public long requested() {
180             return emitter.requested();
181         }
182 
183         @Override
184         public boolean isCancelled() {
185             return emitter.isCancelled();
186 
187         }
188 
189         @Override
190         public FlowableEmitter<Out> serialize() {
191             throw new UnsupportedOperationException();
192         }
193 
194         @Override
195         public boolean tryOnError(Throwable e) {
196             if (emitter.isCancelled()) {
197                 return false;
198             } else {
199                 emitter.onNext(Notification.<Out>createOnError(e));
200                 return true;
201             }
202         }
203 
204     }
205 
206 }