View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.util.concurrent.Callable;
4   
5   import org.reactivestreams.Publisher;
6   
7   import com.github.davidmoten.rx2.StateMachine.Completion2;
8   import com.github.davidmoten.rx2.StateMachine.Errored;
9   import com.github.davidmoten.rx2.StateMachine.Transition2;
10  import com.github.davidmoten.rx2.internal.flowable.FlowableStateMachine;
11  
12  import io.reactivex.BackpressureStrategy;
13  import io.reactivex.Flowable;
14  import io.reactivex.FlowableTransformer;
15  
16  public class StateMachine2 {
17  
18      public static Builder builder() {
19          return new Builder();
20      }
21  
22      public static final class Builder {
23  
24          private Builder() {
25              // prevent instantiation from other packages
26          }
27  
28          public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
29              return new Builder2<State>(initialState);
30          }
31  
32          public <State> Builder2<State> initialState(final State initialState) {
33              return initialStateFactory(Callables.constant(initialState));
34          }
35  
36      }
37  
38      public static final class Builder2<State> {
39  
40          private final Callable<State> initialState;
41  
42          private Builder2(Callable<State> initialState) {
43              this.initialState = initialState;
44          }
45  
46          public <In, Out> Builder3<State, In, Out> transition(Transition2<State, In, Out> transition) {
47              return new Builder3<State, In, Out>(initialState, transition);
48          }
49  
50      }
51  
52      public static final class Builder3<State, In, Out> {
53  
54          private static final int DEFAULT_REQUEST_SIZE = 1;
55  
56          private final Callable<State> initialState;
57          private final Transition2<State, In, Out> transition;
58          private Completion2<State, Out> completion = null;
59          private Errored<State, Out> errored = null;
60          private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
61          private int requestBatchSize = DEFAULT_REQUEST_SIZE;
62  
63          private Builder3(Callable<State> initialState, Transition2<State, In, Out> transition) {
64              this.initialState = initialState;
65              this.transition = transition;
66          }
67  
68          public Builder3<State, In, Out> completion(Completion2<State, Out> completion) {
69              this.completion = completion;
70              return this;
71          }
72  
73          public Builder3<State, In, Out> errored(Errored<State, Out> errored) {
74              this.errored = errored;
75              return this;
76          }
77  
78          public Builder3<State, In, Out> backpressureStrategy(BackpressureStrategy backpressureStrategy) {
79              this.backpressureStrategy = backpressureStrategy;
80              return this;
81          }
82  
83          public Builder3<State, In, Out> requestBatchSize(int value) {
84              this.requestBatchSize = value;
85              return this;
86          }
87  
88          public FlowableTransformer<In, Out> build() {
89              return new FlowableTransformer<In, Out>() {
90  
91                  @Override
92                  public Publisher<Out> apply(Flowable<In> source) {
93                      return new FlowableStateMachine<State, In, Out>(source, initialState, transition, completion,
94                              errored, backpressureStrategy, requestBatchSize);
95                  }
96              };
97          }
98  
99      }
100 
101 }