1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.io.*;
4 import java.nio.charset.Charset;
5 import java.util.concurrent.atomic.AtomicReference;
6
7 import org.reactivestreams.*;
8
9 import io.reactivex.FlowableSubscriber;
10 import io.reactivex.internal.subscriptions.SubscriptionHelper;
11
12
13
14
15
16 public final class FlowableStringInputStream {
17
18 private FlowableStringInputStream() {
19 throw new IllegalStateException("No instances!");
20 }
21
22 public static InputStream createInputStream(Publisher<String> source, Charset charset) {
23 StringInputStream parent = new StringInputStream(charset);
24 source.subscribe(parent);
25 return parent;
26 }
27
28 static final class StringInputStream extends InputStream
29 implements FlowableSubscriber<String> {
30
31 final AtomicReference<Subscription> upstream;
32
33 final Charset charset;
34
35 volatile byte[] bytes;
36
37 int index;
38
39 volatile boolean done;
40 Throwable error;
41
42 StringInputStream(Charset charset) {
43 this.charset = charset;
44 upstream = new AtomicReference<Subscription>();
45 }
46
47 @Override
48 public void onSubscribe(Subscription s) {
49 if (SubscriptionHelper.setOnce(upstream, s)) {
50 s.request(1);
51 }
52 }
53
54 @Override
55 public void onNext(String t) {
56 bytes = t.getBytes(charset);
57 synchronized (this) {
58 notifyAll();
59 }
60 }
61
62 @Override
63 public void onError(Throwable t) {
64 error = t;
65 done = true;
66 synchronized (this) {
67 notifyAll();
68 }
69 }
70
71 @Override
72 public void onComplete() {
73 done = true;
74 synchronized (this) {
75 notifyAll();
76 }
77 }
78
79 @Override
80 public int read() throws IOException {
81 for (;;) {
82 byte[] a = awaitBufferIfNecessary();
83 if (a == null) {
84 Throwable ex = error;
85 if (ex != null) {
86 if (ex instanceof IOException) {
87 throw (IOException)ex;
88 }
89 throw new IOException(ex);
90 }
91 return -1;
92 }
93 int idx = index;
94 if (idx == a.length) {
95 index = 0;
96 bytes = null;
97 upstream.get().request(1);
98 } else {
99 int result = a[idx] & 0xFF;
100 index = idx + 1;
101 return result;
102 }
103 }
104 }
105
106 byte[] awaitBufferIfNecessary() throws IOException {
107 byte[] a = bytes;
108 if (a == null) {
109 synchronized (this) {
110 for (;;) {
111 boolean d = done;
112 a = bytes;
113 if (a != null) {
114 break;
115 }
116 if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
117 break;
118 }
119 try {
120 wait();
121 } catch (InterruptedException ex) {
122 if (upstream.get() != SubscriptionHelper.CANCELLED) {
123 InterruptedIOException exc = new InterruptedIOException();
124 exc.initCause(ex);
125 throw exc;
126 }
127 break;
128 }
129 }
130 }
131 }
132 return a;
133 }
134
135 @Override
136 public int read(byte[] b, int off, int len) throws IOException {
137 if (off < 0 || len < 0 || off >= b.length || off + len > b.length) {
138 throw new IndexOutOfBoundsException("b.length=" + b.length + ", off=" + off + ", len=" + len);
139 }
140 for (;;) {
141 byte[] a = awaitBufferIfNecessary();
142 if (a == null) {
143 Throwable ex = error;
144 if (ex != null) {
145 if (ex instanceof IOException) {
146 throw (IOException)ex;
147 }
148 throw new IOException(ex);
149 }
150 return -1;
151 }
152 int idx = index;
153 if (idx == a.length) {
154 index = 0;
155 bytes = null;
156 upstream.get().request(1);
157 } else {
158 int r = 0;
159 while (idx < a.length && len > 0) {
160 b[off] = a[idx];
161 idx++;
162 off++;
163 r++;
164 len--;
165 }
166 index = idx;
167 return r;
168 }
169 }
170 }
171
172 @Override
173 public int available() throws IOException {
174 byte[] a = bytes;
175 int idx = index;
176 return a != null ? Math.max(0, a.length - idx) : 0;
177 }
178
179 @Override
180 public void close() throws IOException {
181 SubscriptionHelper.cancel(upstream);
182 synchronized (this) {
183 notifyAll();
184 }
185 }
186 }
187 }