View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.Callable;
4   import java.util.regex.Pattern;
5   
6   import com.github.davidmoten.rx2.Callables;
7   import com.github.davidmoten.rx2.flowable.Transformers;
8   
9   import io.reactivex.BackpressureStrategy;
10  import io.reactivex.FlowableEmitter;
11  import io.reactivex.FlowableTransformer;
12  import io.reactivex.functions.BiPredicate;
13  import io.reactivex.functions.Function3;
14  
15  public final class TransformerStringSplit {
16  
17      private TransformerStringSplit() {
18          // prevent instantiation
19      }
20  
21      public static <T> FlowableTransformer<String, String> split(final String pattern, final Pattern compiledPattern,
22              final BackpressureStrategy backpressureStrategy, int batchSize) {
23          Callable<String> initialState = Callables.constant(null);
24          Function3<String, String, FlowableEmitter<String>, String> transition = new Function3<String, String, FlowableEmitter<String>, String>() {
25  
26              @Override
27              public String apply(String leftOver, String s, FlowableEmitter<String> emitter) {
28                  // prepend leftover to the string before splitting
29                  if (leftOver != null) {
30                      s = leftOver + s;
31                  }
32  
33                  String[] parts;
34                  if (compiledPattern != null) {
35                      parts = compiledPattern.split(s, -1);
36                  } else {
37                      parts = s.split(pattern, -1);
38                  }
39  
40                  // can emit all parts except the last part because it hasn't
41                  // been terminated by the pattern/end-of-stream yet
42                  for (int i = 0; i < parts.length - 1; i++) {
43                      if (emitter.isCancelled()) {
44                          // won't be used so can return null
45                          return null;
46                      }
47                      emitter.onNext(parts[i]);
48                  }
49  
50                  // we have to assign the last part as leftOver because we
51                  // don't know if it has been terminated yet
52                  return parts[parts.length - 1];
53              }
54          };
55  
56          BiPredicate<String, FlowableEmitter<String>> completion = new BiPredicate<String, FlowableEmitter<String>>() {
57  
58              @Override
59              public boolean test(String leftOver, FlowableEmitter<String> emitter) {
60                  if (leftOver != null && !emitter.isCancelled())
61                      emitter.onNext(leftOver);
62                  // TODO is this check needed?
63                  if (!emitter.isCancelled())
64                      emitter.onComplete();
65                  return true;
66              }
67          };
68          return Transformers.stateMachine(initialState, transition, completion, backpressureStrategy, batchSize);
69      }
70  
71  }