View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.nio.ByteBuffer;
4   import java.nio.CharBuffer;
5   import java.nio.charset.CharacterCodingException;
6   import java.nio.charset.CharsetDecoder;
7   import java.nio.charset.CoderResult;
8   import java.util.concurrent.Callable;
9   
10  import io.reactivex.BackpressureStrategy;
11  import io.reactivex.FlowableEmitter;
12  import io.reactivex.FlowableTransformer;
13  import io.reactivex.functions.BiPredicate;
14  import io.reactivex.functions.Function3;
15  
16  public final class TransformerDecode {
17  
18      private TransformerDecode() {
19          // prevent instantiation
20      }
21  
22      public static FlowableTransformer<byte[], String> decode(final CharsetDecoder decoder,
23              BackpressureStrategy backpressureStrategy, int batchSize) {
24          Callable<ByteBuffer> initialState = new Callable<ByteBuffer>() {
25  
26              @Override
27              public ByteBuffer call() {
28                  return null;
29              }
30          };
31          Function3<ByteBuffer, byte[], FlowableEmitter<String>, ByteBuffer> transition = new Function3<ByteBuffer, byte[], FlowableEmitter<String>, ByteBuffer>() {
32  
33              @Override
34              public ByteBuffer apply(ByteBuffer last, byte[] next, FlowableEmitter<String> o) {
35                  Result result = process(next, last, false, decoder, o);
36                  return result.leftOver;
37              }
38          };
39          BiPredicate<ByteBuffer, FlowableEmitter<String>> completion = new BiPredicate<ByteBuffer, FlowableEmitter<String>>() {
40  
41              @Override
42              public boolean test(ByteBuffer last, FlowableEmitter<String> subscriber) {
43                  return process(null, last, true, decoder, subscriber).canEmitFurther;
44              }
45          };
46  
47          return com.github.davidmoten.rx2.flowable.Transformers.stateMachine(initialState, transition, completion,
48                  backpressureStrategy, batchSize);
49      }
50  
51      private static final class Result {
52          final ByteBuffer leftOver;
53          final boolean canEmitFurther;
54  
55          Result(ByteBuffer leftOver, boolean canEmitFurther) {
56              this.leftOver = leftOver;
57              this.canEmitFurther = canEmitFurther;
58          }
59  
60      }
61  
62      public static Result process(byte[] next, ByteBuffer last, boolean endOfInput, CharsetDecoder decoder,
63              FlowableEmitter<String> emitter) {
64          if (emitter.isCancelled())
65              return new Result(null, false);
66  
67          ByteBuffer bb;
68          if (last != null) {
69              if (next != null) {
70                  // merge leftover in front of the next bytes
71                  bb = ByteBuffer.allocate(last.remaining() + next.length);
72                  bb.put(last);
73                  bb.put(next);
74                  bb.flip();
75              } else { // next == null
76                  bb = last;
77              }
78          } else { // last == null
79              if (next != null) {
80                  bb = ByteBuffer.wrap(next);
81              } else { // next == null
82                  return new Result(null, true);
83              }
84          }
85  
86          CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * decoder.averageCharsPerByte()));
87          CoderResult cr = decoder.decode(bb, cb, endOfInput);
88          cb.flip();
89  
90          if (cr.isError()) {
91              try {
92                  cr.throwException();
93              } catch (CharacterCodingException e) {
94                  emitter.onError(e);
95                  return new Result(null, false);
96              }
97          }
98  
99          ByteBuffer leftOver;
100         if (bb.remaining() > 0) {
101             leftOver = bb;
102         } else {
103             leftOver = null;
104         }
105 
106         String string = cb.toString();
107         if (!string.isEmpty())
108             emitter.onNext(string);
109 
110         return new Result(leftOver, true);
111     }
112 
113 }