1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.nio.ByteBuffer;
4 import java.nio.CharBuffer;
5 import java.nio.charset.CharacterCodingException;
6 import java.nio.charset.CharsetDecoder;
7 import java.nio.charset.CoderResult;
8
9 import com.github.davidmoten.rx.Transformers;
10
11 import rx.Observable.Transformer;
12 import rx.Subscriber;
13 import rx.functions.Func0;
14 import rx.functions.Func2;
15 import rx.functions.Func3;
16
17 public final class TransformerDecode {
18
19 public static Transformer<byte[], String> decode(final CharsetDecoder decoder) {
20 Func0<ByteBuffer> initialState = new Func0<ByteBuffer>() {
21
22 @Override
23 public ByteBuffer call() {
24 return null;
25 }
26 };
27 Func3<ByteBuffer, byte[], Subscriber<String>, ByteBuffer> transition = new Func3<ByteBuffer, byte[], Subscriber<String>, ByteBuffer>() {
28
29 @Override
30 public ByteBuffer call(ByteBuffer last, byte[] next, Subscriber<String> o) {
31 Result result = process(next, last, false, decoder, o);
32 return result.leftOver;
33 }
34 };
35 Func2<ByteBuffer, Subscriber<String>, Boolean> completion = new Func2<ByteBuffer, Subscriber<String>, Boolean>() {
36
37 @Override
38 public Boolean call(ByteBuffer last, Subscriber<String> subscriber) {
39 return process(null, last, true, decoder, subscriber).canEmitFurther;
40 }
41 };
42
43 return Transformers.stateMachine(initialState, transition, completion);
44 }
45
46 private static final class Result {
47 final ByteBuffer leftOver;
48 final boolean canEmitFurther;
49
50 Result(ByteBuffer leftOver, boolean canEmitFurther) {
51 this.leftOver = leftOver;
52 this.canEmitFurther = canEmitFurther;
53 }
54
55 }
56
57 public static Result process(byte[] next, ByteBuffer last, boolean endOfInput,
58 CharsetDecoder decoder, Subscriber<String> o) {
59 if (o.isUnsubscribed())
60 return new Result(null, false);
61
62 ByteBuffer bb;
63 if (last != null) {
64 if (next != null) {
65
66 bb = ByteBuffer.allocate(last.remaining() + next.length);
67 bb.put(last);
68 bb.put(next);
69 bb.flip();
70 } else {
71 bb = last;
72 }
73 } else {
74 if (next != null) {
75 bb = ByteBuffer.wrap(next);
76 } else {
77 return new Result(null, true);
78 }
79 }
80
81 CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * decoder.averageCharsPerByte()));
82 CoderResult cr = decoder.decode(bb, cb, endOfInput);
83 cb.flip();
84
85 if (cr.isError()) {
86 try {
87 cr.throwException();
88 } catch (CharacterCodingException e) {
89 o.onError(e);
90 return new Result(null, false);
91 }
92 }
93
94 ByteBuffer leftOver;
95 if (bb.remaining() > 0) {
96 leftOver = bb;
97 } else {
98 leftOver = null;
99 }
100
101 String string = cb.toString();
102 if (!string.isEmpty())
103 o.onNext(string);
104
105 return new Result(leftOver, true);
106 }
107
108 }