1 package com.github.davidmoten.rx2.flowable;
2
3 import java.io.BufferedInputStream;
4 import java.io.BufferedOutputStream;
5 import java.io.EOFException;
6 import java.io.File;
7 import java.io.FileInputStream;
8 import java.io.FileNotFoundException;
9 import java.io.FileOutputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.ObjectInputStream;
13 import java.io.ObjectOutputStream;
14 import java.io.Serializable;
15 import java.util.concurrent.Callable;
16
17 import com.esotericsoftware.kryo.Kryo;
18 import com.esotericsoftware.kryo.io.Input;
19 import com.esotericsoftware.kryo.io.Output;
20 import com.github.davidmoten.rx2.Consumers;
21
22 import io.reactivex.Emitter;
23 import io.reactivex.Flowable;
24 import io.reactivex.functions.Consumer;
25 import io.reactivex.functions.Function;
26
27
28
29
30
31
32 public final class Serialized {
33
// Buffer size passed to the buffered file streams by the read/write overloads
// in this class that do not take an explicit bufferSize argument.
private static final int DEFAULT_BUFFER_SIZE = 8192;

private Serialized() {
    // Only static members are exposed; the private constructor prevents instantiation.
}
39
40
41
42
43
44
45
46
47
48
49
50
51 public static <T extends Serializable> Flowable<T> read(final ObjectInputStream ois) {
52 return Flowable.generate(new Consumer<Emitter<T>>() {
53 @Override
54 public void accept(Emitter<T> emitter) throws Exception {
55 try {
56 @SuppressWarnings("unchecked")
57 T t = (T) ois.readObject();
58 emitter.onNext(t);
59 } catch (EOFException e) {
60 emitter.onComplete();
61 } catch (ClassNotFoundException e) {
62 emitter.onError(e);
63 } catch (IOException e) {
64 emitter.onError(e);
65 }
66 }
67 });
68 }
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public static <T extends Serializable> Flowable<T> read(final File file, final int bufferSize) {
86 Callable<ObjectInputStream> resourceFactory = new Callable<ObjectInputStream>() {
87 @Override
88 public ObjectInputStream call() throws IOException {
89 return new ObjectInputStream(new BufferedInputStream(new FileInputStream(file), bufferSize));
90 }
91 };
92 @SuppressWarnings("unchecked")
93 Function<ObjectInputStream, Flowable<T>> flowableFactory = (Function<ObjectInputStream, Flowable<T>>) (Function<?, ?>) ObjectInputStreamFlowableFactoryHolder.INSTANCE;
94 Consumer<ObjectInputStream> disposeAction = Consumers.close();
95 return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
96 }
97
98
99 private static final class ObjectInputStreamFlowableFactoryHolder {
100 static final Function<ObjectInputStream, Flowable<Serializable>> INSTANCE = new Function<ObjectInputStream, Flowable<Serializable>>() {
101
102 @Override
103 public Flowable<Serializable> apply(ObjectInputStream is) throws Exception {
104 return read(is);
105 }
106
107 };
108 }
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public static <T extends Serializable> Flowable<T> read(final File file) {
123 return read(file, DEFAULT_BUFFER_SIZE);
124 }
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public static <T extends Serializable> Flowable<T> write(Flowable<T> source, final ObjectOutputStream oos) {
139 return source.doOnNext(new Consumer<T>() {
140
141 @Override
142 public void accept(T t) throws IOException {
143 oos.writeObject(t);
144 }
145 });
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165 public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file,
166 final boolean append, final int bufferSize) {
167 Callable<ObjectOutputStream> resourceFactory = new Callable<ObjectOutputStream>() {
168 @Override
169 public ObjectOutputStream call() throws IOException {
170 return new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), bufferSize));
171 }
172 };
173 Function<ObjectOutputStream, Flowable<T>> flowableFactory = new Function<ObjectOutputStream, Flowable<T>>() {
174
175 @Override
176 public Flowable<T> apply(ObjectOutputStream oos) {
177 return write(source, oos);
178 }
179 };
180 Consumer<ObjectOutputStream> disposeAction = Consumers.close();
181 return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
182 }
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199 public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file,
200 final boolean append) {
201 return write(source, file, append, DEFAULT_BUFFER_SIZE);
202 }
203
204
205
206
207
208
209
210
211
212
213
214
215
216 public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file) {
217 return write(source, file, false, DEFAULT_BUFFER_SIZE);
218 }
219
220 public static KryoBuilder kryo() {
221 return kryo(new Kryo());
222 }
223
224 public static KryoBuilder kryo(Kryo kryo) {
225 return new KryoBuilder(kryo);
226 }
227
228 public static class KryoBuilder {
229
230 private static final int DEFAULT_BUFFER_SIZE = 4096;
231
232 private final Kryo kryo;
233
234 private KryoBuilder(Kryo kryo) {
235 this.kryo = kryo;
236 }
237
238 public <T> Flowable<T> write(final Flowable<T> source, final File file) {
239 return write(source, file, false, DEFAULT_BUFFER_SIZE);
240 }
241
242 public <T> Flowable<T> write(final Flowable<T> source, final File file, boolean append) {
243 return write(source, file, append, DEFAULT_BUFFER_SIZE);
244 }
245
246 public <T> Flowable<T> write(final Flowable<T> source, final File file, final boolean append,
247 final int bufferSize) {
248 Callable<Output> resourceFactory = new Callable<Output>() {
249 @Override
250 public Output call() throws FileNotFoundException {
251 return new Output(new FileOutputStream(file, append), bufferSize);
252 }
253 };
254 Function<Output, Flowable<T>> flowableFactory = new Function<Output, Flowable<T>>() {
255
256 @Override
257 public Flowable<T> apply(final Output output) {
258 return source.doOnNext(new Consumer<T>() {
259 @Override
260 public void accept(T t) {
261 kryo.writeObject(output, t);
262 }
263 });
264 }
265 };
266 Consumer<Output> disposeAction = Consumers.close();
267 return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
268 }
269
270 public <T> Flowable<T> read(Class<T> cls, final File file) {
271 return read(cls, file, DEFAULT_BUFFER_SIZE);
272 }
273
274 public <T> Flowable<T> read(final Class<T> cls, final File file, final int bufferSize) {
275 Callable<Input> resourceFactory = new Callable<Input>() {
276 @Override
277 public Input call() throws FileNotFoundException {
278 return new Input(new FileInputStream(file), bufferSize);
279 }
280 };
281 Function<Input, Flowable<T>> flowableFactory = new Function<Input, Flowable<T>>() {
282
283 @Override
284 public Flowable<T> apply(final Input input) {
285 return read(cls, input);
286 }
287 };
288 Consumer<Input> disposeAction = Consumers.close();
289 return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
290 }
291
292 public <T> Flowable<T> read(final Class<T> cls, final Input input) {
293
294 return Flowable.generate(new Consumer<Emitter<T>>() {
295
296 @Override
297 public void accept(Emitter<T> emitter) throws Exception {
298 if (input.eof()) {
299 emitter.onComplete();
300 } else {
301 T t = kryo.readObject(input, cls);
302 emitter.onNext(t);
303 }
304 }
305
306 });
307 }
308 }
309
310 }