View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.BufferedInputStream;
4   import java.io.BufferedOutputStream;
5   import java.io.EOFException;
6   import java.io.File;
7   import java.io.FileInputStream;
8   import java.io.FileNotFoundException;
9   import java.io.FileOutputStream;
10  import java.io.IOException;
11  import java.io.InputStream;
12  import java.io.ObjectInputStream;
13  import java.io.ObjectOutputStream;
14  import java.io.Serializable;
15  
16  import com.esotericsoftware.kryo.Kryo;
17  import com.esotericsoftware.kryo.io.Input;
18  import com.esotericsoftware.kryo.io.Output;
19  
20  import rx.Observable;
21  import rx.Observer;
22  import rx.functions.Action1;
23  import rx.functions.Func0;
24  import rx.functions.Func1;
25  import rx.observables.SyncOnSubscribe;
26  
27  /**
28   * Utility class for writing Observable streams to ObjectOutputStreams and
29   * reading Observable streams of indeterminate size from ObjectInputStreams.
30   */
31  public final class Serialized {
32  
33      private static final int DEFAULT_BUFFER_SIZE = 8192;
34  
35      /**
36       * Returns the deserialized objects from the given {@link InputStream} as an
37       * {@link Observable} stream.
38       * 
39       * @param ois
40       *            the {@link ObjectInputStream}
41       * @param <T>
42       *            the generic type of the returned stream
43       * @return the stream of deserialized objects from the {@link InputStream}
44       *         as an {@link Observable}.
45       */
46      public static <T extends Serializable> Observable<T> read(final ObjectInputStream ois) {
47          return Observable.create(new SyncOnSubscribe<ObjectInputStream,T>() {
48  
49              @Override
50              protected ObjectInputStream generateState() {
51                 return ois;
52              }
53  
54              @Override
55              protected ObjectInputStream next(ObjectInputStream ois, Observer<? super T> observer) {
56                  try {
57                      @SuppressWarnings("unchecked")
58                      T t = (T) ois.readObject();
59                      observer.onNext(t);
60                  } catch (EOFException e) {
61                      observer.onCompleted();
62                  } catch (ClassNotFoundException e) {
63                      observer.onError(e);
64                  } catch (IOException e) {
65                      observer.onError(e);
66                  }
67                  return ois;
68              }
69          });
70      }
71  
72      /**
73       * Returns the deserialized objects from the given {@link File} as an
74       * {@link Observable} stream. Uses buffer of size <code>bufferSize</code>
75       * buffer reads from the File.
76       * 
77       * @param file
78       *            the input file
79       * @param bufferSize
80       *            the buffer size for reading bytes from the file.
81       * @param <T>
82       *            the generic type of the deserialized objects returned in the
83       *            stream
84       * @return the stream of deserialized objects from the {@link InputStream}
85       *         as an {@link Observable}.
86       */
87      public static <T extends Serializable> Observable<T> read(final File file,
88              final int bufferSize) {
89          Func0<ObjectInputStream> resourceFactory = new Func0<ObjectInputStream>() {
90              @Override
91              public ObjectInputStream call() {
92                  try {
93                      return new ObjectInputStream(
94                              new BufferedInputStream(new FileInputStream(file), bufferSize));
95                  } catch (FileNotFoundException e) {
96                      throw new RuntimeException(e);
97                  } catch (IOException e) {
98                      throw new RuntimeException(e);
99                  }
100             }
101         };
102         Func1<ObjectInputStream, Observable<? extends T>> observableFactory = new Func1<ObjectInputStream, Observable<? extends T>>() {
103 
104             @Override
105             public Observable<? extends T> call(ObjectInputStream is) {
106                 return read(is);
107             }
108         };
109         Action1<ObjectInputStream> disposeAction = new Action1<ObjectInputStream>() {
110 
111             @Override
112             public void call(ObjectInputStream ois) {
113                 try {
114                     ois.close();
115                 } catch (IOException e) {
116                     throw new RuntimeException(e);
117                 }
118             }
119         };
120         return Observable.using(resourceFactory, observableFactory, disposeAction, true);
121     }
122 
123     /**
124      * Returns the deserialized objects from the given {@link File} as an
125      * {@link Observable} stream. A buffer size of 8192 bytes is used by
126      * default.
127      * 
128      * @param file
129      *            the input file containing serialized java objects
130      * @param <T>
131      *            the generic type of the deserialized objects returned in the
132      *            stream
133      * @return the stream of deserialized objects from the {@link InputStream}
134      *         as an {@link Observable}.
135      */
136     public static <T extends Serializable> Observable<T> read(final File file) {
137         return read(file, DEFAULT_BUFFER_SIZE);
138     }
139 
140     /**
141      * Returns a duplicate of the input stream but with the side effect that
142      * emissions from the source are written to the {@link ObjectOutputStream}.
143      * 
144      * @param source
145      *            the source of objects to write
146      * @param oos
147      *            the output stream to write to
148      * @param <T>
149      *            the generic type of the objects being serialized
150      * @return re-emits the input stream
151      */
152     public static <T extends Serializable> Observable<T> write(Observable<T> source,
153             final ObjectOutputStream oos) {
154         return source.doOnNext(new Action1<T>() {
155 
156             @Override
157             public void call(T t) {
158                 try {
159                     oos.writeObject(t);
160                 } catch (IOException e) {
161                     throw new RuntimeException(e);
162                 }
163             }
164         });
165     }
166 
167     /**
168      * Writes the source stream to the given file in given append mode and using
169      * the given buffer size.
170      * 
171      * @param source
172      *            observable stream to write
173      * @param file
174      *            file to write to
175      * @param append
176      *            if true writes are appended to file otherwise overwrite the
177      *            file
178      * @param bufferSize
179      *            the buffer size in bytes to use.
180      * @param <T>
181      *            the generic type of the input stream
182      * @return re-emits the input stream
183      */
184     public static <T extends Serializable> Observable<T> write(final Observable<T> source,
185             final File file, final boolean append, final int bufferSize) {
186         Func0<ObjectOutputStream> resourceFactory = new Func0<ObjectOutputStream>() {
187             @Override
188             public ObjectOutputStream call() {
189                 try {
190                     return new ObjectOutputStream(new BufferedOutputStream(
191                             new FileOutputStream(file, append), bufferSize));
192                 } catch (FileNotFoundException e) {
193                     throw new RuntimeException(e);
194                 } catch (IOException e) {
195                     throw new RuntimeException(e);
196                 }
197             }
198         };
199         Func1<ObjectOutputStream, Observable<? extends T>> observableFactory = new Func1<ObjectOutputStream, Observable<? extends T>>() {
200 
201             @Override
202             public Observable<? extends T> call(ObjectOutputStream oos) {
203                 return write(source, oos);
204             }
205         };
206         Action1<ObjectOutputStream> disposeAction = new Action1<ObjectOutputStream>() {
207 
208             @Override
209             public void call(ObjectOutputStream oos) {
210                 try {
211                     oos.close();
212                 } catch (IOException e) {
213                     throw new RuntimeException(e);
214                 }
215             }
216         };
217         return Observable.using(resourceFactory, observableFactory, disposeAction, true);
218     }
219 
220     /**
221      * Writes the source stream to the given file in given append mode and using
222      * the a buffer size of 8192 bytes.
223      * 
224      * @param source
225      *            observable stream to write
226      * @param file
227      *            file to write to
228      * @param append
229      *            if true writes are appended to file otherwise overwrite the
230      *            file
231      * @param <T>
232      *            the generic type of the input stream
233      * @return re-emits the input stream
234      */
235     public static <T extends Serializable> Observable<T> write(final Observable<T> source,
236             final File file, final boolean append) {
237         return write(source, file, append, DEFAULT_BUFFER_SIZE);
238     }
239 
240     /**
241      * Writes the source stream to the given file in given append mode and using
242      * the a buffer size of 8192 bytes.
243      * 
244      * @param source
245      *            observable stream to write
246      * @param file
247      *            file to write to
248      * @param <T>
249      *            the generic type of the input stream
250      * @return re-emits the input stream
251      */
252     public static <T extends Serializable> Observable<T> write(final Observable<T> source,
253             final File file) {
254         return write(source, file, false, DEFAULT_BUFFER_SIZE);
255     }
256 
257     public static KryoBuilder kryo() {
258         return kryo(new Kryo());
259     }
260 
261     public static KryoBuilder kryo(Kryo kryo) {
262         return new KryoBuilder(kryo);
263     }
264 
265     public static class KryoBuilder {
266 
267         private static final int DEFAULT_BUFFER_SIZE = 4096;
268 
269         private final Kryo kryo;
270 
271         private KryoBuilder(Kryo kryo) {
272             this.kryo = kryo;
273         }
274 
275         public <T> Observable<T> write(final Observable<T> source, final File file) {
276             return write(source, file, false, DEFAULT_BUFFER_SIZE);
277         }
278 
279         public <T> Observable<T> write(final Observable<T> source, final File file,
280                 boolean append) {
281             return write(source, file, append, DEFAULT_BUFFER_SIZE);
282         }
283 
284         public <T> Observable<T> write(final Observable<T> source, final File file,
285                 final boolean append, final int bufferSize) {
286             Func0<Output> resourceFactory = new Func0<Output>() {
287                 @Override
288                 public Output call() {
289                     try {
290                         return new Output(new FileOutputStream(file, append), bufferSize);
291                     } catch (FileNotFoundException e) {
292                         throw new RuntimeException(e);
293                     }
294                 }
295             };
296             Func1<Output, Observable<? extends T>> observableFactory = new Func1<Output, Observable<? extends T>>() {
297 
298                 @Override
299                 public Observable<? extends T> call(final Output output) {
300                     return source.doOnNext(new Action1<T>() {
301                         @Override
302                         public void call(T t) {
303                             kryo.writeObject(output, t);
304                         }
305                     });
306                 }
307             };
308             Action1<Output> disposeAction = new Action1<Output>() {
309 
310                 @Override
311                 public void call(Output output) {
312                     output.close();
313                 }
314             };
315             return Observable.using(resourceFactory, observableFactory, disposeAction, true);
316         }
317 
318         public <T> Observable<T> read(Class<T> cls, final File file) {
319             return read(cls, file, DEFAULT_BUFFER_SIZE);
320         }
321 
322         public <T> Observable<T> read(final Class<T> cls, final File file, final int bufferSize) {
323             Func0<Input> resourceFactory = new Func0<Input>() {
324                 @Override
325                 public Input call() {
326                     try {
327                         return new Input(new FileInputStream(file), bufferSize);
328                     } catch (FileNotFoundException e) {
329                         throw new RuntimeException(e);
330                     }
331                 }
332             };
333             Func1<Input, Observable<? extends T>> observableFactory = new Func1<Input, Observable<? extends T>>() {
334 
335                 @Override
336                 public Observable<? extends T> call(final Input input) {
337                     return read(cls, input, bufferSize);
338                 }
339             };
340             Action1<Input> disposeAction = new Action1<Input>() {
341 
342                 @Override
343                 public void call(Input input) {
344                     input.close();
345                 }
346             };
347             return Observable.using(resourceFactory, observableFactory, disposeAction, true);
348         }
349 
350         public <T> Observable<T> read(final Class<T> cls, final Input input, final int bufferSize) {
351 
352             return Observable.create(new SyncOnSubscribe<Input,T>() {
353 
354                 @Override
355                 protected Input generateState() {
356                     return input;
357                 }
358 
359                 @Override
360                 protected Input next(Input arg0, Observer<? super T> observer) {
361                     if (input.eof()) {
362                         observer.onCompleted();
363                     } else {
364                         T t = kryo.readObject(input, cls);
365                         observer.onNext(t);
366                     }
367                     return input;
368                 }
369             });
370         }
371     }
372 
373 }