View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import com.github.davidmoten.rx.util.BackpressureStrategy;
4   import com.github.davidmoten.util.Preconditions;
5   
6   import rx.Notification;
7   import rx.Observable;
8   import rx.Observable.OnSubscribe;
9   import rx.Observable.Transformer;
10  import rx.Subscriber;
11  import rx.functions.Func0;
12  import rx.functions.Func1;
13  import rx.functions.Func2;
14  import rx.functions.Func3;
15  
16  public final class TransformerStateMachine<State, In, Out> implements Transformer<In, Out> {
17  
18      private final Func0<? extends State> initialState;
19      private final Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition;
20      private final Func2<? super State, ? super Subscriber<Out>, Boolean> completion;
21      private final BackpressureStrategy backpressureStrategy;
22      private final int initialRequest;
23  
24      private TransformerStateMachine(Func0<? extends State> initialState,
25              Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
26              Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
27              BackpressureStrategy backpressureStrategy, int initialRequest) {
28          Preconditions.checkNotNull(initialState);
29          Preconditions.checkNotNull(transition);
30          Preconditions.checkNotNull(completion);
31          Preconditions.checkNotNull(backpressureStrategy);
32          Preconditions.checkArgument(initialRequest > 0, "initialRequest must be greater than zero");
33          this.initialState = initialState;
34          this.transition = transition;
35          this.completion = completion;
36          this.backpressureStrategy = backpressureStrategy;
37          this.initialRequest = initialRequest;
38      }
39  
40      public static <State, In, Out> Transformer<In, Out> create(Func0<? extends State> initialState,
41              Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
42              Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
43              BackpressureStrategy backpressureStrategy, int initialRequest) {
44          return new TransformerStateMachine<State, In, Out>(initialState, transition, completion,
45                  backpressureStrategy, initialRequest);
46      }
47  
48      @Override
49      public Observable<Out> call(final Observable<In> source) {
50          // use defer so we can have a single state reference for each
51          // subscription
52          return Observable.defer(new Func0<Observable<Out>>() {
53              @Override
54              public Observable<Out> call() {
55                  Mutable<State> state = new Mutable<State>(initialState.call());
56                  return source.materialize()
57                          // do state transitions and emit notifications
58                          // use flatMap to emit notification values
59                          .flatMap(execute(transition, completion, state, backpressureStrategy),
60                                  initialRequest)
61                          // complete if we encounter an unsubscribed sentinel
62                          .takeWhile(NOT_UNSUBSCRIBED)
63                          // flatten notifications to a stream which will enable
64                          // early termination from the state machine if desired
65                          .dematerialize();
66              }
67          });
68      }
69  
70      private static <State, Out, In> Func1<Notification<In>, Observable<Notification<Out>>> execute(
71              final Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
72              final Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
73              final Mutable<State> state, final BackpressureStrategy backpressureStrategy) {
74  
75          return new Func1<Notification<In>, Observable<Notification<Out>>>() {
76  
77              @Override
78              public Observable<Notification<Out>> call(final Notification<In> in) {
79  
80                  Observable<Notification<Out>> o = Observable
81                          .create(new OnSubscribe<Notification<Out>>() {
82  
83                      @Override
84                      public void call(Subscriber<? super Notification<Out>> subscriber) {
85                          Subscriber<Out> w = wrap(subscriber);
86                          if (in.hasValue()) {
87                              state.value = transition.call(state.value, in.getValue(), w);
88                              if (!subscriber.isUnsubscribed())
89                                  subscriber.onCompleted();
90                              else {
91                                  // this is a special emission to indicate that the
92                                  // transition called unsubscribe. It will be
93                                  // filtered later.
94                                  subscriber.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
95                              }
96                          } else if (in.isOnCompleted()) {
97                              if (completion.call(state.value, w) && !subscriber.isUnsubscribed()) {
98                                  w.onCompleted();
99                              }
100                         } else if (!subscriber.isUnsubscribed()) {
101                             w.onError(in.getThrowable());
102                         }
103                     }
104                 });
105                 // because the observable we just created does not
106                 // support backpressure we need to apply a backpressure
107                 // handling operator. This operator is supplied by the
108                 // user.
109                 return applyBackpressure(o, backpressureStrategy);
110             }
111 
112         };
113     }
114     
115     private static final class UnsubscribedNotificationHolder {
116         private static final Notification<Object> INSTANCE = Notification.createOnNext(null);
117         
118         @SuppressWarnings("unchecked")
119         static <T> Notification<T> unsubscribedNotification() {
120             return (Notification<T>) INSTANCE;
121         }
122     }
123 
124     private static <Out> Observable<Notification<Out>> applyBackpressure(
125             Observable<Notification<Out>> o, final BackpressureStrategy backpressureStrategy) {
126         if (backpressureStrategy == BackpressureStrategy.BUFFER)
127             return o.onBackpressureBuffer();
128         else if (backpressureStrategy == BackpressureStrategy.DROP)
129             return o.onBackpressureDrop();
130         else if (backpressureStrategy == BackpressureStrategy.LATEST)
131             return o.onBackpressureLatest();
132         else
133             throw new IllegalArgumentException(
134                     "backpressure strategy not supported: " + backpressureStrategy);
135     }
136 
137     private static final Func1<Notification<?>, Boolean> NOT_UNSUBSCRIBED = new Func1<Notification<?>, Boolean>() {
138 
139         @Override
140         public Boolean call(Notification<?> t) {
141             return t != UnsubscribedNotificationHolder.unsubscribedNotification();
142         }
143 
144     };
145 
146     private static final class Mutable<T> {
147         T value;
148 
149         Mutable(T value) {
150             this.value = value;
151         }
152     }
153 
154     private static <Out> NotificationSubscriber<Out> wrap(
155             Subscriber<? super Notification<Out>> sub) {
156         return new NotificationSubscriber<Out>(sub);
157     }
158 
159     private static final class NotificationSubscriber<Out> extends Subscriber<Out> {
160 
161         private final Subscriber<? super Notification<Out>> sub;
162 
163         NotificationSubscriber(Subscriber<? super Notification<Out>> sub) {
164             this.sub = sub;
165             add(sub);
166         }
167 
168         @Override
169         public void onCompleted() {
170             sub.onNext(Notification.<Out> createOnCompleted());
171         }
172 
173         @Override
174         public void onError(Throwable e) {
175             sub.onNext(Notification.<Out> createOnError(e));
176         }
177 
178         @Override
179         public void onNext(Out t) {
180             sub.onNext(Notification.createOnNext(t));
181         }
182 
183     }
184 
185 }