1 package com.github.davidmoten.rx2;
2
3 import java.io.BufferedInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.File;
6 import java.io.FileInputStream;
7 import java.io.FileNotFoundException;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.util.Arrays;
11 import java.util.concurrent.Callable;
12 import java.util.zip.ZipEntry;
13 import java.util.zip.ZipInputStream;
14
15 import com.github.davidmoten.rx2.util.ZippedEntry;
16
17 import io.reactivex.Emitter;
18 import io.reactivex.Flowable;
19 import io.reactivex.Single;
20 import io.reactivex.functions.BiConsumer;
21 import io.reactivex.functions.Consumer;
22 import io.reactivex.functions.Function;
23
24 public final class Bytes {
25
/** Chunk size in bytes used by {@link #from(InputStream)} when no explicit size is given. */
private static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

/** Not instantiable: this class only exposes static members. */
private Bytes() {
    // prevent instantiation
}
31
32
33
34
35
36
37
38
39
40
41
42 public static Flowable<byte[]> from(final InputStream is, final int bufferSize) {
43 return Flowable.generate(new Consumer<Emitter<byte[]>>() {
44 @Override
45 public void accept(Emitter<byte[]> emitter) throws Exception {
46 byte[] buffer = new byte[bufferSize];
47 int count = is.read(buffer);
48 if (count == -1) {
49 emitter.onComplete();
50 } else if (count < bufferSize) {
51 emitter.onNext(Arrays.copyOf(buffer, count));
52 } else {
53 emitter.onNext(buffer);
54 }
55 }
56 });
57 }
58
59 public static Flowable<byte[]> from(File file) {
60 return from(file, 8192);
61 }
62
63 public static Flowable<byte[]> from(final File file, final int size) {
64 Callable<InputStream> resourceFactory = new Callable<InputStream>() {
65
66 @Override
67 public InputStream call() throws FileNotFoundException {
68 return new BufferedInputStream(new FileInputStream(file), size);
69 }
70 };
71 Function<InputStream, Flowable<byte[]>> observableFactory = new Function<InputStream, Flowable<byte[]>>() {
72
73 @Override
74 public Flowable<byte[]> apply(InputStream is) {
75 return from(is, size);
76 }
77 };
78 return Flowable.using(resourceFactory, observableFactory, InputStreamCloseHolder.INSTANCE, true);
79 }
80
81 private static final class InputStreamCloseHolder {
82 static final Consumer<InputStream> INSTANCE = new Consumer<InputStream>() {
83
84 @Override
85 public void accept(InputStream is) throws IOException {
86 is.close();
87 }
88 };
89 }
90
91
92
93
94
95
96
97
98
99
100 public static Flowable<byte[]> from(InputStream is) {
101 return from(is, DEFAULT_BUFFER_SIZE);
102 }
103
104 public static Flowable<ZippedEntry> unzip(final File file) {
105 Callable<ZipInputStream> resourceFactory = new Callable<ZipInputStream>() {
106 @Override
107 public ZipInputStream call() throws FileNotFoundException {
108 return new ZipInputStream(new FileInputStream(file));
109 }
110 };
111 Function<ZipInputStream, Flowable<ZippedEntry>> observableFactory = ZipHolder.OBSERVABLE_FACTORY;
112 Consumer<ZipInputStream> disposeAction = ZipHolder.DISPOSER;
113 return Flowable.using(resourceFactory, observableFactory, disposeAction);
114 }
115
116 public static Flowable<ZippedEntry> unzip(final InputStream is) {
117 return unzip(new ZipInputStream(is));
118 }
119
120 public static Flowable<ZippedEntry> unzip(final ZipInputStream zis) {
121
122 return Flowable.generate(new Consumer<Emitter<ZippedEntry>>() {
123 @Override
124 public void accept(Emitter<ZippedEntry> emitter) throws IOException {
125 ZipEntry zipEntry = zis.getNextEntry();
126 if (zipEntry != null) {
127 emitter.onNext(new ZippedEntry(zipEntry, zis));
128 } else {
129
130
131 zis.close();
132 emitter.onComplete();
133 }
134 }
135 });
136
137 }
138
139 public static Single<byte[]> collect(Flowable<byte[]> source) {
140 return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE).map(BosToArrayHolder.INSTANCE);
141 }
142
143 public static Function<Flowable<byte[]>, Single<byte[]>> collect() {
144 return new Function<Flowable<byte[]>, Single<byte[]>>() {
145 @Override
146 public Single<byte[]> apply(Flowable<byte[]> source) throws Exception {
147 return collect(source);
148 }
149 };
150 }
151
152 private static final class BosCreatorHolder {
153 static final Callable<ByteArrayOutputStream> INSTANCE = new Callable<ByteArrayOutputStream>() {
154
155 @Override
156 public ByteArrayOutputStream call() {
157 return new ByteArrayOutputStream();
158 }
159 };
160 }
161
162 private static final class BosCollectorHolder {
163 static final BiConsumer<ByteArrayOutputStream, byte[]> INSTANCE = new BiConsumer<ByteArrayOutputStream, byte[]>() {
164
165 @Override
166 public void accept(ByteArrayOutputStream bos, byte[] bytes) throws IOException {
167 bos.write(bytes);
168 }
169 };
170 }
171
172 private static final class BosToArrayHolder {
173 static final Function<ByteArrayOutputStream, byte[]> INSTANCE = new Function<ByteArrayOutputStream, byte[]>() {
174 @Override
175 public byte[] apply(ByteArrayOutputStream bos) {
176 return bos.toByteArray();
177 }
178 };
179 }
180
181 private static final class ZipHolder {
182 static final Consumer<ZipInputStream> DISPOSER = new Consumer<ZipInputStream>() {
183
184 @Override
185 public void accept(ZipInputStream zis) throws IOException {
186 zis.close();
187 }
188 };
189 final static Function<ZipInputStream, Flowable<ZippedEntry>> OBSERVABLE_FACTORY = new Function<ZipInputStream, Flowable<ZippedEntry>>() {
190 @Override
191 public Flowable<ZippedEntry> apply(ZipInputStream zis) {
192 return unzip(zis);
193 }
194 };
195 }
196
197 }