View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.io.*;
4   import java.nio.charset.Charset;
5   import java.util.concurrent.atomic.AtomicReference;
6   
7   import org.reactivestreams.*;
8   
9   import io.reactivex.FlowableSubscriber;
10  import io.reactivex.internal.subscriptions.SubscriptionHelper;
11  
12  /**
13   * @author David Karnok
14   *
15   */
16  public final class FlowableStringInputStream {
17  
18      private FlowableStringInputStream() {
19          throw new IllegalStateException("No instances!");
20      }
21  
22      public static InputStream createInputStream(Publisher<String> source, Charset charset) {
23          StringInputStream parent = new StringInputStream(charset);
24          source.subscribe(parent);
25          return parent;
26      }
27  
28      static final class StringInputStream extends InputStream
29      implements FlowableSubscriber<String> {
30  
31          final AtomicReference<Subscription> upstream;
32  
33          final Charset charset;
34  
35          volatile byte[] bytes;
36  
37          int index;
38  
39          volatile boolean done;
40          Throwable error;
41  
42          StringInputStream(Charset charset) {
43              this.charset = charset;
44              upstream = new AtomicReference<Subscription>();
45          }
46  
47          @Override
48          public void onSubscribe(Subscription s) {
49              if (SubscriptionHelper.setOnce(upstream, s)) {
50                  s.request(1);
51              }
52          }
53  
54          @Override
55          public void onNext(String t) {
56              bytes = t.getBytes(charset);
57              synchronized (this) {
58                  notifyAll();
59              }
60          }
61  
62          @Override
63          public void onError(Throwable t) {
64              error = t;
65              done = true;
66              synchronized (this) {
67                  notifyAll();
68              }
69          }
70  
71          @Override
72          public void onComplete() {
73              done = true;
74              synchronized (this) {
75                  notifyAll();
76              }
77          }
78  
79          @Override
80          public int read() throws IOException {
81              for (;;) {
82                  byte[] a = awaitBufferIfNecessary();
83                  if (a == null) {
84                      Throwable ex = error;
85                      if (ex != null) {
86                          if (ex instanceof IOException) {
87                              throw (IOException)ex;
88                          }
89                          throw new IOException(ex);
90                      }
91                      return -1;
92                  }
93                  int idx = index;
94                  if (idx == a.length) {
95                      index = 0;
96                      bytes = null;
97                      upstream.get().request(1);
98                  } else {
99                      int result = a[idx] & 0xFF;
100                     index = idx + 1;
101                     return result;
102                 }
103             }
104         }
105 
106         byte[] awaitBufferIfNecessary() throws IOException {
107             byte[] a = bytes;
108             if (a == null) {
109                 synchronized (this) {
110                     for (;;) {
111                         boolean d = done;
112                         a = bytes;
113                         if (a != null) {
114                             break;
115                         }
116                         if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
117                             break;
118                         }
119                         try {
120                             wait();
121                         } catch (InterruptedException ex) {
122                             if (upstream.get() != SubscriptionHelper.CANCELLED) {
123                                 InterruptedIOException exc = new InterruptedIOException();
124                                 exc.initCause(ex);
125                                 throw exc;
126                             }
127                             break;
128                         }
129                     } 
130                 }
131             }
132             return a;
133         }
134 
135         @Override
136         public int read(byte[] b, int off, int len) throws IOException {
137             if (off < 0 || len < 0 || off >= b.length || off + len > b.length) {
138                 throw new IndexOutOfBoundsException("b.length=" + b.length + ", off=" + off + ", len=" + len);
139             }
140             for (;;) {
141                 byte[] a = awaitBufferIfNecessary();
142                 if (a == null) {
143                     Throwable ex = error;
144                     if (ex != null) {
145                         if (ex instanceof IOException) {
146                             throw (IOException)ex;
147                         }
148                         throw new IOException(ex);
149                     }
150                     return -1;
151                 }
152                 int idx = index;
153                 if (idx == a.length) {
154                     index = 0;
155                     bytes = null;
156                     upstream.get().request(1);
157                 } else {
158                     int r = 0;
159                     while (idx < a.length && len > 0) {
160                         b[off] = a[idx];
161                         idx++;
162                         off++;
163                         r++;
164                         len--;
165                     }
166                     index = idx;
167                     return r;
168                 }
169             }
170         }
171 
172         @Override
173         public int available() throws IOException {
174             byte[] a = bytes;
175             int idx = index;
176             return a != null ? Math.max(0, a.length - idx) : 0;
177         }
178 
179         @Override
180         public void close() throws IOException {
181             SubscriptionHelper.cancel(upstream);
182             synchronized (this) {
183                 notifyAll();
184             }
185         }
186     }
187 }