1 package com.github.davidmoten.rx2;
2
3 import java.util.concurrent.Callable;
4
5 import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
6 import com.github.davidmoten.rx2.flowable.Transformers;
7 import com.github.davidmoten.rx2.functions.Consumer3;
8
9 import io.reactivex.BackpressureStrategy;
10 import io.reactivex.FlowableEmitter;
11 import io.reactivex.FlowableTransformer;
12 import io.reactivex.functions.BiConsumer;
13 import io.reactivex.functions.BiPredicate;
14 import io.reactivex.functions.Function3;
15
16 public final class StateMachine {
17
/** Not instantiable: this class only hosts the {@link #builder()} entry point and nested types. */
private StateMachine() {
    // no instances
}
21
22 public interface Transition<State, In, Out> extends Function3<State, In, FlowableEmitter<Out>, State> {
23
24
25 @Override
26 State apply(State state, In value, FlowableEmitter<Out> FlowableEmitter);
27
28 }
29
30 public interface Transition2<State, In, Out> extends Function3<State, In, Emitter<Out>, State> {
31
32
33 @Override
34 State apply(State state, In value, Emitter<Out> emitter);
35
36 }
37
38 public interface Completion<State, Out> extends BiPredicate<State, FlowableEmitter<Out>> {
39
40
41 @Override
42 boolean test(State state, FlowableEmitter<Out> emitter);
43
44 }
45
46 public interface Completion2<State, Out> extends BiConsumer<State, Emitter<Out>> {
47
48
49 @Override
50 void accept(State state, Emitter<Out> emitter);
51
52 }
53
54
55 public interface Errored<State, Out> extends Consumer3<State, Throwable, Emitter<Out>> {
56
57
58 @Override
59 void accept(State state, Throwable error, Emitter<Out> emitter);
60
61 }
62
63 public static Builder builder() {
64 return new Builder();
65 }
66
67 public static final class Builder {
68
69 private Builder() {
70
71 }
72
73 public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
74 return new Builder2<State>(initialState);
75 }
76
77 public <State> Builder2<State> initialState(final State initialState) {
78 return initialStateFactory(Callables.constant(initialState));
79 }
80
81 }
82
83 public static final class Builder2<State> {
84
85 private final Callable<State> initialState;
86
87 private Builder2(Callable<State> initialState) {
88 this.initialState = initialState;
89 }
90
91 public <In, Out> Builder3<State, In, Out> transition(Transition<State, In, Out> transition) {
92 return new Builder3<State, In, Out>(initialState, transition);
93 }
94
95 }
96
97 public static final class Builder3<State, In, Out> {
98
99 private static final int DEFAULT_REQUEST_SIZE = 1;
100
101 private final Callable<State> initialState;
102 private final Transition<State, In, Out> transition;
103 private Completion<State, Out> completion = CompletionAlwaysTrueHolder.instance();
104 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
105 private int requestBatchSize = DEFAULT_REQUEST_SIZE;
106
107 private Builder3(Callable<State> initialState, Transition<State, In, Out> transition) {
108 this.initialState = initialState;
109 this.transition = transition;
110 }
111
112 public Builder3<State, In, Out> completion(Completion<State, Out> completion) {
113 this.completion = completion;
114 return this;
115 }
116
117 public Builder3<State, In, Out> backpressureStrategy(BackpressureStrategy backpressureStrategy) {
118 this.backpressureStrategy = backpressureStrategy;
119 return this;
120 }
121
122 public Builder3<State, In, Out> requestBatchSize(int value) {
123 this.requestBatchSize = value;
124 return this;
125 }
126
127 public FlowableTransformer<In, Out> build() {
128 return Transformers.stateMachine(initialState, transition, completion, backpressureStrategy,
129 requestBatchSize);
130 }
131
132 }
133
134 @VisibleForTesting
135 static final class CompletionAlwaysTrueHolder {
136
137 private CompletionAlwaysTrueHolder() {
138
139 }
140
141 private static final Completion<Object, Object> INSTANCE = new Completion<Object, Object>() {
142 @Override
143 public boolean test(Object t1, FlowableEmitter<Object> t2) {
144 return true;
145 }
146 };
147
148 @SuppressWarnings("unchecked")
149 static <State, Out> Completion<State, Out> instance() {
150 return (Completion<State, Out>) INSTANCE;
151 }
152 }
153
/**
 * Emitter abstraction used by {@code Transition2}, {@code Completion2} and
 * {@code Errored} in place of RxJava's {@code FlowableEmitter}.
 *
 * <p>A member interface is implicitly static, so no {@code static} modifier
 * is written.
 *
 * <p>NOTE(review): the method names suggest they mirror the Reactive Streams
 * signals (next / error / complete) plus cancellation, and the trailing
 * underscore presumably avoids clashes with same-named methods on
 * implementing classes. Confirm against the implementations.
 *
 * @param <T> type of the items passed to {@link #onNext_(Object)}
 */
public interface Emitter<T> {

    /**
     * Passes an item to the emitter.
     *
     * @param t the item
     */
    void onNext_(T t);

    /**
     * Passes an error to the emitter.
     *
     * @param e the error
     */
    void onError_(Throwable e);

    /** Signals completion to the emitter. */
    void onComplete_();

    /** Requests cancellation through the emitter. */
    void cancel_();
}
163
164 }