View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.util.concurrent.Callable;
4   
5   import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
6   import com.github.davidmoten.rx2.flowable.Transformers;
7   import com.github.davidmoten.rx2.functions.Consumer3;
8   
9   import io.reactivex.BackpressureStrategy;
10  import io.reactivex.FlowableEmitter;
11  import io.reactivex.FlowableTransformer;
12  import io.reactivex.functions.BiConsumer;
13  import io.reactivex.functions.BiPredicate;
14  import io.reactivex.functions.Function3;
15  
16  public final class StateMachine {
17  
18      private StateMachine() {
19          // prevent instantiation
20      }
21  
22      public interface Transition<State, In, Out> extends Function3<State, In, FlowableEmitter<Out>, State> {
23  
24          // override so IDEs have better suggestions for parameters
25          @Override
26          State apply(State state, In value, FlowableEmitter<Out> FlowableEmitter);
27  
28      }
29  
30      public interface Transition2<State, In, Out> extends Function3<State, In, Emitter<Out>, State> {
31  
32          // override so IDEs have better suggestions for parameters
33          @Override
34          State apply(State state, In value, Emitter<Out> emitter);
35  
36      }
37      
38      public interface Completion<State, Out> extends BiPredicate<State, FlowableEmitter<Out>> {
39  
40          // override so IDEs have better suggestions for parameters
41          @Override
42          boolean test(State state, FlowableEmitter<Out> emitter);
43  
44      }
45  
46      public interface Completion2<State, Out> extends BiConsumer<State, Emitter<Out>> {
47  
48          // override so IDEs have better suggestions for parameters
49          @Override
50          void accept(State state, Emitter<Out> emitter);
51  
52      }
53  
54      
55      public interface Errored<State, Out> extends Consumer3<State, Throwable, Emitter<Out>> {
56  
57          // override so IDEs have better suggestions for parameters
58          @Override
59          void accept(State state, Throwable error, Emitter<Out> emitter);
60  
61      }
62  
63      public static Builder builder() {
64          return new Builder();
65      }
66  
67      public static final class Builder {
68  
69          private Builder() {
70              // prevent instantiation from other packages
71          }
72  
73          public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
74              return new Builder2<State>(initialState);
75          }
76  
77          public <State> Builder2<State> initialState(final State initialState) {
78              return initialStateFactory(Callables.constant(initialState));
79          }
80  
81      }
82  
83      public static final class Builder2<State> {
84  
85          private final Callable<State> initialState;
86  
87          private Builder2(Callable<State> initialState) {
88              this.initialState = initialState;
89          }
90  
91          public <In, Out> Builder3<State, In, Out> transition(Transition<State, In, Out> transition) {
92              return new Builder3<State, In, Out>(initialState, transition);
93          }
94  
95      }
96  
97      public static final class Builder3<State, In, Out> {
98  
99          private static final int DEFAULT_REQUEST_SIZE = 1;
100 
101         private final Callable<State> initialState;
102         private final Transition<State, In, Out> transition;
103         private Completion<State, Out> completion = CompletionAlwaysTrueHolder.instance();
104         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
105         private int requestBatchSize = DEFAULT_REQUEST_SIZE;
106 
107         private Builder3(Callable<State> initialState, Transition<State, In, Out> transition) {
108             this.initialState = initialState;
109             this.transition = transition;
110         }
111 
112         public Builder3<State, In, Out> completion(Completion<State, Out> completion) {
113             this.completion = completion;
114             return this;
115         }
116 
117         public Builder3<State, In, Out> backpressureStrategy(BackpressureStrategy backpressureStrategy) {
118             this.backpressureStrategy = backpressureStrategy;
119             return this;
120         }
121 
122         public Builder3<State, In, Out> requestBatchSize(int value) {
123             this.requestBatchSize = value;
124             return this;
125         }
126 
127         public FlowableTransformer<In, Out> build() {
128             return Transformers.stateMachine(initialState, transition, completion, backpressureStrategy,
129                     requestBatchSize);
130         }
131 
132     }
133 
134     @VisibleForTesting
135     static final class CompletionAlwaysTrueHolder {
136 
137         private CompletionAlwaysTrueHolder() {
138             // prevent instantiation
139         }
140 
141         private static final Completion<Object, Object> INSTANCE = new Completion<Object, Object>() {
142             @Override
143             public boolean test(Object t1, FlowableEmitter<Object> t2) {
144                 return true;
145             }
146         };
147 
148         @SuppressWarnings("unchecked")
149         static <State, Out> Completion<State, Out> instance() {
150             return (Completion<State, Out>) INSTANCE;
151         }
152     }
153 
154     public static interface Emitter<T> {
155         void onNext_(T t);
156 
157         void onError_(Throwable e);
158 
159         void onComplete_();
160 
161         void cancel_();
162     }
163 
164 }