1 package com.github.davidmoten.rx2;
2
3 import java.util.concurrent.Callable;
4
5 import org.reactivestreams.Publisher;
6
7 import com.github.davidmoten.rx2.StateMachine.Completion2;
8 import com.github.davidmoten.rx2.StateMachine.Errored;
9 import com.github.davidmoten.rx2.StateMachine.Transition2;
10 import com.github.davidmoten.rx2.internal.flowable.FlowableStateMachine;
11
12 import io.reactivex.BackpressureStrategy;
13 import io.reactivex.Flowable;
14 import io.reactivex.FlowableTransformer;
15
16 public class StateMachine2 {
17
18 public static Builder builder() {
19 return new Builder();
20 }
21
22 public static final class Builder {
23
24 private Builder() {
25
26 }
27
28 public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
29 return new Builder2<State>(initialState);
30 }
31
32 public <State> Builder2<State> initialState(final State initialState) {
33 return initialStateFactory(Callables.constant(initialState));
34 }
35
36 }
37
38 public static final class Builder2<State> {
39
40 private final Callable<State> initialState;
41
42 private Builder2(Callable<State> initialState) {
43 this.initialState = initialState;
44 }
45
46 public <In, Out> Builder3<State, In, Out> transition(Transition2<State, In, Out> transition) {
47 return new Builder3<State, In, Out>(initialState, transition);
48 }
49
50 }
51
52 public static final class Builder3<State, In, Out> {
53
54 private static final int DEFAULT_REQUEST_SIZE = 1;
55
56 private final Callable<State> initialState;
57 private final Transition2<State, In, Out> transition;
58 private Completion2<State, Out> completion = null;
59 private Errored<State, Out> errored = null;
60 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
61 private int requestBatchSize = DEFAULT_REQUEST_SIZE;
62
63 private Builder3(Callable<State> initialState, Transition2<State, In, Out> transition) {
64 this.initialState = initialState;
65 this.transition = transition;
66 }
67
68 public Builder3<State, In, Out> completion(Completion2<State, Out> completion) {
69 this.completion = completion;
70 return this;
71 }
72
73 public Builder3<State, In, Out> errored(Errored<State, Out> errored) {
74 this.errored = errored;
75 return this;
76 }
77
78 public Builder3<State, In, Out> backpressureStrategy(BackpressureStrategy backpressureStrategy) {
79 this.backpressureStrategy = backpressureStrategy;
80 return this;
81 }
82
83 public Builder3<State, In, Out> requestBatchSize(int value) {
84 this.requestBatchSize = value;
85 return this;
86 }
87
88 public FlowableTransformer<In, Out> build() {
89 return new FlowableTransformer<In, Out>() {
90
91 @Override
92 public Publisher<Out> apply(Flowable<In> source) {
93 return new FlowableStateMachine<State, In, Out>(source, initialState, transition, completion,
94 errored, backpressureStrategy, requestBatchSize);
95 }
96 };
97 }
98
99 }
100
101 }