1 package com.github.davidmoten.rx;
2
3 import java.io.BufferedInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.File;
6 import java.io.FileInputStream;
7 import java.io.FileNotFoundException;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.util.zip.ZipEntry;
11 import java.util.zip.ZipInputStream;
12
13 import com.github.davidmoten.rx.internal.operators.OnSubscribeInputStream;
14 import com.github.davidmoten.rx.util.ZippedEntry;
15
16 import rx.Observable;
17 import rx.Observable.Transformer;
18 import rx.Observer;
19 import rx.functions.Action1;
20 import rx.functions.Action2;
21 import rx.functions.Func0;
22 import rx.functions.Func1;
23 import rx.observables.SyncOnSubscribe;
24
25 public final class Bytes {
26
27 private Bytes() {
28
29 }
30
31
32
33
34
35
36
37
38
39
40
41 public static Observable<byte[]> from(InputStream is, int size) {
42 return Observable.create(new OnSubscribeInputStream(is, size));
43 }
44
45 public static Observable<byte[]> from(File file) {
46 return from(file, 8192);
47 }
48
49 public static Observable<byte[]> from(final File file, final int size) {
50 Func0<InputStream> resourceFactory = new Func0<InputStream>() {
51
52 @Override
53 public InputStream call() {
54 try {
55 return new BufferedInputStream(new FileInputStream(file), size);
56 } catch (FileNotFoundException e) {
57 throw new RuntimeException(e);
58 }
59 }
60 };
61 Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() {
62
63 @Override
64 public Observable<byte[]> call(InputStream is) {
65 return from(is, size);
66 }
67 };
68 return Observable.using(resourceFactory, observableFactory, InputStreamCloseHolder.INSTANCE, true);
69 }
70
71 private static class InputStreamCloseHolder {
72 private static final Action1<InputStream> INSTANCE = new Action1<InputStream>() {
73
74 @Override
75 public void call(InputStream is) {
76 try {
77 is.close();
78 } catch (IOException e) {
79 throw new RuntimeException(e);
80 }
81 }
82 };
83 }
84
85
86
87
88
89
90
91
92
93
94 public static Observable<byte[]> from(InputStream is) {
95 return from(is, 8192);
96 }
97
98 public static Observable<ZippedEntry> unzip(final File file) {
99 Func0<ZipInputStream> resourceFactory = new Func0<ZipInputStream>() {
100 @Override
101 public ZipInputStream call() {
102 try {
103 return new ZipInputStream(new FileInputStream(file));
104 } catch (FileNotFoundException e) {
105 throw new RuntimeException(e);
106 }
107 }
108 };
109 Func1<ZipInputStream, Observable<ZippedEntry>> observableFactory = ZipHolder.OBSERVABLE_FACTORY;
110 Action1<ZipInputStream> disposeAction = ZipHolder.DISPOSER;
111 return Observable.using(resourceFactory, observableFactory, disposeAction);
112 }
113
114 private static final class ZipHolder {
115 static final Action1<ZipInputStream> DISPOSER = new Action1<ZipInputStream>() {
116
117 @Override
118 public void call(ZipInputStream zis) {
119 try {
120 zis.close();
121 } catch (IOException e) {
122 throw new RuntimeException(e);
123 }
124 }
125 };
126 final static Func1<ZipInputStream, Observable<ZippedEntry>> OBSERVABLE_FACTORY = new Func1<ZipInputStream, Observable<ZippedEntry>>() {
127 @Override
128 public Observable<ZippedEntry> call(ZipInputStream zis) {
129 return unzip(zis);
130 }
131 };
132 }
133
134 public static Observable<ZippedEntry> unzip(final InputStream is) {
135 return unzip(new ZipInputStream(is));
136 }
137
138 public static Observable<ZippedEntry> unzip(final ZipInputStream zis) {
139 return Observable.create(new SyncOnSubscribe<ZipInputStream, ZippedEntry>() {
140
141 @Override
142 protected ZipInputStream generateState() {
143 return zis;
144 }
145
146 @Override
147 protected ZipInputStream next(ZipInputStream zis, Observer<? super ZippedEntry> observer) {
148 try {
149 ZipEntry zipEntry = zis.getNextEntry();
150 if (zipEntry != null) {
151 observer.onNext(new ZippedEntry(zipEntry, zis));
152 } else {
153 zis.close();
154 observer.onCompleted();
155 }
156 } catch (IOException e) {
157 observer.onError(e);
158 }
159 return zis;
160 }
161 });
162 }
163
164 public static Transformer<byte[], byte[]> collect() {
165 return new Transformer<byte[], byte[]>() {
166
167 @Override
168 public Observable<byte[]> call(Observable<byte[]> source) {
169 return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE)
170 .map(BosToArrayHolder.INSTANCE);
171 }
172 };
173 }
174
175 private static final class BosCreatorHolder {
176
177 static final Func0<ByteArrayOutputStream> INSTANCE = new Func0<ByteArrayOutputStream>() {
178
179 @Override
180 public ByteArrayOutputStream call() {
181 return new ByteArrayOutputStream();
182 }
183 };
184 }
185
186 private static final class BosCollectorHolder {
187
188 static final Action2<ByteArrayOutputStream, byte[]> INSTANCE = new Action2<ByteArrayOutputStream, byte[]>() {
189
190 @Override
191 public void call(ByteArrayOutputStream bos, byte[] bytes) {
192 try {
193 bos.write(bytes);
194 } catch (IOException e) {
195 throw new RuntimeException(e);
196 }
197 }
198 };
199 }
200
201 private static final class BosToArrayHolder {
202 static final Func1<ByteArrayOutputStream, byte[]> INSTANCE = new Func1<ByteArrayOutputStream, byte[]>() {
203 @Override
204 public byte[] call(ByteArrayOutputStream bos) {
205 return bos.toByteArray();
206 }
207 };
208 }
209
210 }