View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.nio.ByteBuffer;
4   import java.nio.CharBuffer;
5   import java.nio.charset.CharacterCodingException;
6   import java.nio.charset.CharsetDecoder;
7   import java.nio.charset.CoderResult;
8   
9   import com.github.davidmoten.rx.Transformers;
10  
11  import rx.Observable.Transformer;
12  import rx.Subscriber;
13  import rx.functions.Func0;
14  import rx.functions.Func2;
15  import rx.functions.Func3;
16  
17  public final class TransformerDecode {
18  
19      public static Transformer<byte[], String> decode(final CharsetDecoder decoder) {
20          Func0<ByteBuffer> initialState = new Func0<ByteBuffer>() {
21  
22              @Override
23              public ByteBuffer call() {
24                  return null;
25              }
26          };
27          Func3<ByteBuffer, byte[], Subscriber<String>, ByteBuffer> transition = new Func3<ByteBuffer, byte[], Subscriber<String>, ByteBuffer>() {
28  
29              @Override
30              public ByteBuffer call(ByteBuffer last, byte[] next, Subscriber<String> o) {
31                  Result result = process(next, last, false, decoder, o);
32                  return result.leftOver;
33              }
34          };
35          Func2<ByteBuffer, Subscriber<String>, Boolean> completion = new Func2<ByteBuffer, Subscriber<String>, Boolean>() {
36  
37              @Override
38              public Boolean call(ByteBuffer last, Subscriber<String> subscriber) {
39                  return process(null, last, true, decoder, subscriber).canEmitFurther;
40              }
41          };
42  
43          return Transformers.stateMachine(initialState, transition, completion);
44      }
45  
46      private static final class Result {
47          final ByteBuffer leftOver;
48          final boolean canEmitFurther;
49  
50          Result(ByteBuffer leftOver, boolean canEmitFurther) {
51              this.leftOver = leftOver;
52              this.canEmitFurther = canEmitFurther;
53          }
54  
55      }
56  
57      public static Result process(byte[] next, ByteBuffer last, boolean endOfInput,
58              CharsetDecoder decoder, Subscriber<String> o) {
59          if (o.isUnsubscribed())
60              return new Result(null, false);
61  
62          ByteBuffer bb;
63          if (last != null) {
64              if (next != null) {
65                  // merge leftover in front of the next bytes
66                  bb = ByteBuffer.allocate(last.remaining() + next.length);
67                  bb.put(last);
68                  bb.put(next);
69                  bb.flip();
70              } else { // next == null
71                  bb = last;
72              }
73          } else { // last == null
74              if (next != null) {
75                  bb = ByteBuffer.wrap(next);
76              } else { // next == null
77                  return new Result(null, true);
78              }
79          }
80  
81          CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * decoder.averageCharsPerByte()));
82          CoderResult cr = decoder.decode(bb, cb, endOfInput);
83          cb.flip();
84  
85          if (cr.isError()) {
86              try {
87                  cr.throwException();
88              } catch (CharacterCodingException e) {
89                  o.onError(e);
90                  return new Result(null, false);
91              }
92          }
93  
94          ByteBuffer leftOver;
95          if (bb.remaining() > 0) {
96              leftOver = bb;
97          } else {
98              leftOver = null;
99          }
100 
101         String string = cb.toString();
102         if (!string.isEmpty())
103             o.onNext(string);
104 
105         return new Result(leftOver, true);
106     }
107 
108 }