1 package org.davidmoten.io.extras.internal;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6 import java.nio.ByteBuffer;
7 import java.util.ArrayDeque;
8 import java.util.Deque;
9
10 import org.davidmoten.io.extras.IOFunction;
11
12 public final class TransformedInputStream extends InputStream {
13
14 private final InputStream is;
15 private final Deque<ByteBuffer> queue;
16 private final int bufferSize;
17 private final OutputStream out;
18 private final byte[] singleByte = new byte[1];
19 private final byte[] buffer;
20 private boolean done;
21 private boolean closed;
22 private int[] count = new int[1];
23
24 public TransformedInputStream(InputStream is, IOFunction<? super OutputStream, ? extends OutputStream> transform,
25 int bufferSize) throws IOException {
26 this.is = is;
27 this.queue = new ArrayDeque<>();
28 this.bufferSize = bufferSize;
29 this.buffer = new byte[bufferSize];
30 this.out = transform.apply(new QueuedOutputStream(queue, count));
31 }
32
33 @Override
34 public int read() throws IOException {
35 int n = readInternal(singleByte, 0, 1);
36 if (n == -1) {
37 return -1;
38 } else {
39 return singleByte[0] & 0xff;
40 }
41 }
42
43 @Override
44 public int read(byte[] b) throws IOException {
45 return readInternal(b, 0, b.length);
46 }
47
48 @Override
49 public int read(byte[] b, int off, int len) throws IOException {
50 return readInternal(b, off, len);
51 }
52
53
54 private int readInternal(byte[] bytes, int offset, int length) throws IOException {
55 if (length == 0) {
56 return 0;
57 }
58 if (closed) {
59 throw new IOException("Stream closed");
60 }
61 while (true) {
62 ByteBuffer bb = queue.poll();
63 if (bb == null) {
64 if (done) {
65 return -1;
66 } else {
67 int n = is.read(buffer);
68 if (n == -1) {
69 done = true;
70 out.close();
71 } else {
72 out.write(buffer, 0, n);
73 }
74 }
75 } else {
76 int n = Math.min(bb.remaining(), length);
77 if (bytes != null) {
78 bb.get(bytes, offset, n);
79 } else {
80 bb.position(bb.position() + n);
81 }
82 count[0] -= n;
83 if (bb.remaining() > 0) {
84 queue.offerLast(bb);
85 }
86 return n;
87 }
88 }
89 }
90
91 @Override
92 public long skip(long n) throws IOException {
93 long result = 0;
94 while (true) {
95 if (n == 0) {
96 return result;
97 } else {
98 int m = Math.min((int) Math.min((long) Integer.MAX_VALUE, n), bufferSize);
99 int v = readInternal(null, 0, m);
100 if (v != -1) {
101 result += v;
102 n -= v;
103 } else {
104 if (result == 0) {
105 return -1;
106 } else {
107 return result;
108 }
109 }
110 }
111 }
112 }
113
114 @Override
115 public int available() throws IOException {
116 return count[0];
117 }
118
119 @Override
120 public void close() throws IOException {
121 closed = true;
122 }
123
124 @Override
125 public void mark(int readlimit) {
126
127 }
128
129 @Override
130 public void reset() throws IOException {
131 throw new IOException("reset not supported");
132 }
133
134 @Override
135 public boolean markSupported() {
136 return false;
137 }
138
139 }