View Javadoc
1   package org.davidmoten.io.extras.internal;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.nio.ByteBuffer;
7   import java.util.ArrayDeque;
8   import java.util.Deque;
9   
10  import org.davidmoten.io.extras.IOFunction;
11  
12  public final class TransformedInputStream extends InputStream {
13  
14      private final InputStream is;
15      private final Deque<ByteBuffer> queue;
16      private final int bufferSize;
17      private final OutputStream out;
18      private final byte[] singleByte = new byte[1];
19      private final byte[] buffer;
20      private boolean done;
21      private boolean closed;
22      private int[] count = new int[1];
23  
24      public TransformedInputStream(InputStream is, IOFunction<? super OutputStream, ? extends OutputStream> transform,
25              int bufferSize) throws IOException {
26          this.is = is;
27          this.queue = new ArrayDeque<>();
28          this.bufferSize = bufferSize;
29          this.buffer = new byte[bufferSize];
30          this.out = transform.apply(new QueuedOutputStream(queue, count));
31      }
32  
33      @Override
34      public int read() throws IOException {
35          int n = readInternal(singleByte, 0, 1);
36          if (n == -1) {
37              return -1;
38          } else {
39              return singleByte[0] & 0xff; // must be 0-255
40          }
41      }
42  
43      @Override
44      public int read(byte[] b) throws IOException {
45          return readInternal(b, 0, b.length);
46      }
47  
48      @Override
49      public int read(byte[] b, int off, int len) throws IOException {
50          return readInternal(b, off, len);
51      }
52  
53      //bytes can be null when are skipping
54      private int readInternal(byte[] bytes, int offset, int length) throws IOException {
55          if (length == 0) {
56              return 0;
57          }
58          if (closed) {
59              throw new IOException("Stream closed");
60          }
61          while (true) {
62              ByteBuffer bb = queue.poll();
63              if (bb == null) {
64                  if (done) {
65                      return -1;
66                  } else {
67                      int n = is.read(buffer);
68                      if (n == -1) {
69                          done = true;
70                          out.close();
71                      } else {
72                          out.write(buffer, 0, n);
73                      }
74                  }
75              } else {
76                  int n = Math.min(bb.remaining(), length);
77                  if (bytes != null) {
78                      bb.get(bytes, offset, n);
79                  } else {
80                      bb.position(bb.position() + n);
81                  }
82                  count[0] -= n;
83                  if (bb.remaining() > 0) {
84                      queue.offerLast(bb);
85                  }
86                  return n;
87              }
88          }
89      }
90  
91      @Override
92      public long skip(long n) throws IOException {
93          long result = 0;
94          while (true) {
95              if (n == 0) {
96                  return result;
97              } else {
98                  int m = Math.min((int) Math.min((long) Integer.MAX_VALUE, n), bufferSize);
99                  int v = readInternal(null, 0, m);
100                 if (v != -1) {
101                     result += v;
102                     n -= v;
103                 } else {
104                     if (result == 0) {
105                         return -1;
106                     } else {
107                         return result;
108                     }
109                 }
110             }
111         }
112     }
113 
114     @Override
115     public int available() throws IOException {
116         return count[0];
117     }
118 
119     @Override
120     public void close() throws IOException {
121         closed = true;
122     }
123 
124     @Override
125     public void mark(int readlimit) {
126         // do nothing
127     }
128 
129     @Override
130     public void reset() throws IOException {
131         throw new IOException("reset not supported");
132     }
133 
134     @Override
135     public boolean markSupported() {
136         return false;
137     }
138 
139 }