1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.Callable;
4 import java.util.regex.Pattern;
5
6 import com.github.davidmoten.rx2.Callables;
7 import com.github.davidmoten.rx2.flowable.Transformers;
8
9 import io.reactivex.BackpressureStrategy;
10 import io.reactivex.FlowableEmitter;
11 import io.reactivex.FlowableTransformer;
12 import io.reactivex.functions.BiPredicate;
13 import io.reactivex.functions.Function3;
14
15 public final class TransformerStringSplit {
16
17 private TransformerStringSplit() {
18
19 }
20
21 public static <T> FlowableTransformer<String, String> split(final String pattern, final Pattern compiledPattern,
22 final BackpressureStrategy backpressureStrategy, int batchSize) {
23 Callable<String> initialState = Callables.constant(null);
24 Function3<String, String, FlowableEmitter<String>, String> transition = new Function3<String, String, FlowableEmitter<String>, String>() {
25
26 @Override
27 public String apply(String leftOver, String s, FlowableEmitter<String> emitter) {
28
29 if (leftOver != null) {
30 s = leftOver + s;
31 }
32
33 String[] parts;
34 if (compiledPattern != null) {
35 parts = compiledPattern.split(s, -1);
36 } else {
37 parts = s.split(pattern, -1);
38 }
39
40
41
42 for (int i = 0; i < parts.length - 1; i++) {
43 if (emitter.isCancelled()) {
44
45 return null;
46 }
47 emitter.onNext(parts[i]);
48 }
49
50
51
52 return parts[parts.length - 1];
53 }
54 };
55
56 BiPredicate<String, FlowableEmitter<String>> completion = new BiPredicate<String, FlowableEmitter<String>>() {
57
58 @Override
59 public boolean test(String leftOver, FlowableEmitter<String> emitter) {
60 if (leftOver != null && !emitter.isCancelled())
61 emitter.onNext(leftOver);
62
63 if (!emitter.isCancelled())
64 emitter.onComplete();
65 return true;
66 }
67 };
68 return Transformers.stateMachine(initialState, transition, completion, backpressureStrategy, batchSize);
69 }
70
71 }