1 package com.github.davidmoten.rx;
2
3 import java.io.BufferedInputStream;
4 import java.io.BufferedOutputStream;
5 import java.io.EOFException;
6 import java.io.File;
7 import java.io.FileInputStream;
8 import java.io.FileNotFoundException;
9 import java.io.FileOutputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.ObjectInputStream;
13 import java.io.ObjectOutputStream;
14 import java.io.Serializable;
15
16 import com.esotericsoftware.kryo.Kryo;
17 import com.esotericsoftware.kryo.io.Input;
18 import com.esotericsoftware.kryo.io.Output;
19
20 import rx.Observable;
21 import rx.Observer;
22 import rx.functions.Action1;
23 import rx.functions.Func0;
24 import rx.functions.Func1;
25 import rx.observables.SyncOnSubscribe;
26
27
28
29
30
31 public final class Serialized {
32
33 private static final int DEFAULT_BUFFER_SIZE = 8192;
34
35
36
37
38
39
40
41
42
43
44
45
46 public static <T extends Serializable> Observable<T> read(final ObjectInputStream ois) {
47 return Observable.create(new SyncOnSubscribe<ObjectInputStream,T>() {
48
49 @Override
50 protected ObjectInputStream generateState() {
51 return ois;
52 }
53
54 @Override
55 protected ObjectInputStream next(ObjectInputStream ois, Observer<? super T> observer) {
56 try {
57 @SuppressWarnings("unchecked")
58 T t = (T) ois.readObject();
59 observer.onNext(t);
60 } catch (EOFException e) {
61 observer.onCompleted();
62 } catch (ClassNotFoundException e) {
63 observer.onError(e);
64 } catch (IOException e) {
65 observer.onError(e);
66 }
67 return ois;
68 }
69 });
70 }
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public static <T extends Serializable> Observable<T> read(final File file,
88 final int bufferSize) {
89 Func0<ObjectInputStream> resourceFactory = new Func0<ObjectInputStream>() {
90 @Override
91 public ObjectInputStream call() {
92 try {
93 return new ObjectInputStream(
94 new BufferedInputStream(new FileInputStream(file), bufferSize));
95 } catch (FileNotFoundException e) {
96 throw new RuntimeException(e);
97 } catch (IOException e) {
98 throw new RuntimeException(e);
99 }
100 }
101 };
102 Func1<ObjectInputStream, Observable<? extends T>> observableFactory = new Func1<ObjectInputStream, Observable<? extends T>>() {
103
104 @Override
105 public Observable<? extends T> call(ObjectInputStream is) {
106 return read(is);
107 }
108 };
109 Action1<ObjectInputStream> disposeAction = new Action1<ObjectInputStream>() {
110
111 @Override
112 public void call(ObjectInputStream ois) {
113 try {
114 ois.close();
115 } catch (IOException e) {
116 throw new RuntimeException(e);
117 }
118 }
119 };
120 return Observable.using(resourceFactory, observableFactory, disposeAction, true);
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public static <T extends Serializable> Observable<T> read(final File file) {
137 return read(file, DEFAULT_BUFFER_SIZE);
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public static <T extends Serializable> Observable<T> write(Observable<T> source,
153 final ObjectOutputStream oos) {
154 return source.doOnNext(new Action1<T>() {
155
156 @Override
157 public void call(T t) {
158 try {
159 oos.writeObject(t);
160 } catch (IOException e) {
161 throw new RuntimeException(e);
162 }
163 }
164 });
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 public static <T extends Serializable> Observable<T> write(final Observable<T> source,
185 final File file, final boolean append, final int bufferSize) {
186 Func0<ObjectOutputStream> resourceFactory = new Func0<ObjectOutputStream>() {
187 @Override
188 public ObjectOutputStream call() {
189 try {
190 return new ObjectOutputStream(new BufferedOutputStream(
191 new FileOutputStream(file, append), bufferSize));
192 } catch (FileNotFoundException e) {
193 throw new RuntimeException(e);
194 } catch (IOException e) {
195 throw new RuntimeException(e);
196 }
197 }
198 };
199 Func1<ObjectOutputStream, Observable<? extends T>> observableFactory = new Func1<ObjectOutputStream, Observable<? extends T>>() {
200
201 @Override
202 public Observable<? extends T> call(ObjectOutputStream oos) {
203 return write(source, oos);
204 }
205 };
206 Action1<ObjectOutputStream> disposeAction = new Action1<ObjectOutputStream>() {
207
208 @Override
209 public void call(ObjectOutputStream oos) {
210 try {
211 oos.close();
212 } catch (IOException e) {
213 throw new RuntimeException(e);
214 }
215 }
216 };
217 return Observable.using(resourceFactory, observableFactory, disposeAction, true);
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public static <T extends Serializable> Observable<T> write(final Observable<T> source,
236 final File file, final boolean append) {
237 return write(source, file, append, DEFAULT_BUFFER_SIZE);
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252 public static <T extends Serializable> Observable<T> write(final Observable<T> source,
253 final File file) {
254 return write(source, file, false, DEFAULT_BUFFER_SIZE);
255 }
256
257 public static KryoBuilder kryo() {
258 return kryo(new Kryo());
259 }
260
261 public static KryoBuilder kryo(Kryo kryo) {
262 return new KryoBuilder(kryo);
263 }
264
265 public static class KryoBuilder {
266
267 private static final int DEFAULT_BUFFER_SIZE = 4096;
268
269 private final Kryo kryo;
270
271 private KryoBuilder(Kryo kryo) {
272 this.kryo = kryo;
273 }
274
275 public <T> Observable<T> write(final Observable<T> source, final File file) {
276 return write(source, file, false, DEFAULT_BUFFER_SIZE);
277 }
278
279 public <T> Observable<T> write(final Observable<T> source, final File file,
280 boolean append) {
281 return write(source, file, append, DEFAULT_BUFFER_SIZE);
282 }
283
284 public <T> Observable<T> write(final Observable<T> source, final File file,
285 final boolean append, final int bufferSize) {
286 Func0<Output> resourceFactory = new Func0<Output>() {
287 @Override
288 public Output call() {
289 try {
290 return new Output(new FileOutputStream(file, append), bufferSize);
291 } catch (FileNotFoundException e) {
292 throw new RuntimeException(e);
293 }
294 }
295 };
296 Func1<Output, Observable<? extends T>> observableFactory = new Func1<Output, Observable<? extends T>>() {
297
298 @Override
299 public Observable<? extends T> call(final Output output) {
300 return source.doOnNext(new Action1<T>() {
301 @Override
302 public void call(T t) {
303 kryo.writeObject(output, t);
304 }
305 });
306 }
307 };
308 Action1<Output> disposeAction = new Action1<Output>() {
309
310 @Override
311 public void call(Output output) {
312 output.close();
313 }
314 };
315 return Observable.using(resourceFactory, observableFactory, disposeAction, true);
316 }
317
318 public <T> Observable<T> read(Class<T> cls, final File file) {
319 return read(cls, file, DEFAULT_BUFFER_SIZE);
320 }
321
322 public <T> Observable<T> read(final Class<T> cls, final File file, final int bufferSize) {
323 Func0<Input> resourceFactory = new Func0<Input>() {
324 @Override
325 public Input call() {
326 try {
327 return new Input(new FileInputStream(file), bufferSize);
328 } catch (FileNotFoundException e) {
329 throw new RuntimeException(e);
330 }
331 }
332 };
333 Func1<Input, Observable<? extends T>> observableFactory = new Func1<Input, Observable<? extends T>>() {
334
335 @Override
336 public Observable<? extends T> call(final Input input) {
337 return read(cls, input, bufferSize);
338 }
339 };
340 Action1<Input> disposeAction = new Action1<Input>() {
341
342 @Override
343 public void call(Input input) {
344 input.close();
345 }
346 };
347 return Observable.using(resourceFactory, observableFactory, disposeAction, true);
348 }
349
350 public <T> Observable<T> read(final Class<T> cls, final Input input, final int bufferSize) {
351
352 return Observable.create(new SyncOnSubscribe<Input,T>() {
353
354 @Override
355 protected Input generateState() {
356 return input;
357 }
358
359 @Override
360 protected Input next(Input arg0, Observer<? super T> observer) {
361 if (input.eof()) {
362 observer.onCompleted();
363 } else {
364 T t = kryo.readObject(input, cls);
365 observer.onNext(t);
366 }
367 return input;
368 }
369 });
370 }
371 }
372
373 }