1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.nio.ByteBuffer;
4 import java.nio.CharBuffer;
5 import java.nio.charset.CharacterCodingException;
6 import java.nio.charset.CharsetDecoder;
7 import java.nio.charset.CoderResult;
8 import java.util.concurrent.Callable;
9
10 import io.reactivex.BackpressureStrategy;
11 import io.reactivex.FlowableEmitter;
12 import io.reactivex.FlowableTransformer;
13 import io.reactivex.functions.BiPredicate;
14 import io.reactivex.functions.Function3;
15
16 public final class TransformerDecode {
17
18 private TransformerDecode() {
19
20 }
21
22 public static FlowableTransformer<byte[], String> decode(final CharsetDecoder decoder,
23 BackpressureStrategy backpressureStrategy, int batchSize) {
24 Callable<ByteBuffer> initialState = new Callable<ByteBuffer>() {
25
26 @Override
27 public ByteBuffer call() {
28 return null;
29 }
30 };
31 Function3<ByteBuffer, byte[], FlowableEmitter<String>, ByteBuffer> transition = new Function3<ByteBuffer, byte[], FlowableEmitter<String>, ByteBuffer>() {
32
33 @Override
34 public ByteBuffer apply(ByteBuffer last, byte[] next, FlowableEmitter<String> o) {
35 Result result = process(next, last, false, decoder, o);
36 return result.leftOver;
37 }
38 };
39 BiPredicate<ByteBuffer, FlowableEmitter<String>> completion = new BiPredicate<ByteBuffer, FlowableEmitter<String>>() {
40
41 @Override
42 public boolean test(ByteBuffer last, FlowableEmitter<String> subscriber) {
43 return process(null, last, true, decoder, subscriber).canEmitFurther;
44 }
45 };
46
47 return com.github.davidmoten.rx2.flowable.Transformers.stateMachine(initialState, transition, completion,
48 backpressureStrategy, batchSize);
49 }
50
51 private static final class Result {
52 final ByteBuffer leftOver;
53 final boolean canEmitFurther;
54
55 Result(ByteBuffer leftOver, boolean canEmitFurther) {
56 this.leftOver = leftOver;
57 this.canEmitFurther = canEmitFurther;
58 }
59
60 }
61
62 public static Result process(byte[] next, ByteBuffer last, boolean endOfInput, CharsetDecoder decoder,
63 FlowableEmitter<String> emitter) {
64 if (emitter.isCancelled())
65 return new Result(null, false);
66
67 ByteBuffer bb;
68 if (last != null) {
69 if (next != null) {
70
71 bb = ByteBuffer.allocate(last.remaining() + next.length);
72 bb.put(last);
73 bb.put(next);
74 bb.flip();
75 } else {
76 bb = last;
77 }
78 } else {
79 if (next != null) {
80 bb = ByteBuffer.wrap(next);
81 } else {
82 return new Result(null, true);
83 }
84 }
85
86 CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * decoder.averageCharsPerByte()));
87 CoderResult cr = decoder.decode(bb, cb, endOfInput);
88 cb.flip();
89
90 if (cr.isError()) {
91 try {
92 cr.throwException();
93 } catch (CharacterCodingException e) {
94 emitter.onError(e);
95 return new Result(null, false);
96 }
97 }
98
99 ByteBuffer leftOver;
100 if (bb.remaining() > 0) {
101 leftOver = bb;
102 } else {
103 leftOver = null;
104 }
105
106 String string = cb.toString();
107 if (!string.isEmpty())
108 emitter.onNext(string);
109
110 return new Result(leftOver, true);
111 }
112
113 }