View Javadoc
1   package com.github.davidmoten.rx2.flowable;
2   
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.github.davidmoten.rx2.Consumers;

import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
26  
27  /**
28   * Utility class for writing {@link Flowable} streams to
29   * {@link ObjectOutputStream}s and reading {@link Flowable} streams of
30   * indeterminate size from {@link ObjectInputStream}s.
31   */
32  public final class Serialized {
33  
34      private static final int DEFAULT_BUFFER_SIZE = 8192;
35  
36      private Serialized() {
37          // prevent instantiation
38      }
39  
40      /**
41       * Returns the deserialized objects from the given {@link InputStream} as a
42       * {@link Flowable} stream.
43       * 
44       * @param ois
45       *            the {@link ObjectInputStream}
46       * @param <T>
47       *            the generic type of the returned stream
48       * @return the stream of deserialized objects from the {@link InputStream}
49       *         as an {@link Flowable}.
50       */
51      public static <T extends Serializable> Flowable<T> read(final ObjectInputStream ois) {
52          return Flowable.generate(new Consumer<Emitter<T>>() {
53              @Override
54              public void accept(Emitter<T> emitter) throws Exception {
55                  try {
56                      @SuppressWarnings("unchecked")
57                      T t = (T) ois.readObject();
58                      emitter.onNext(t);
59                  } catch (EOFException e) {
60                      emitter.onComplete();
61                  } catch (ClassNotFoundException e) {
62                      emitter.onError(e);
63                  } catch (IOException e) {
64                      emitter.onError(e);
65                  }
66              }
67          });
68      }
69  
70      /**
71       * Returns the deserialized objects from the given {@link File} as a
72       * {@link Flowable} stream. Uses buffer of size <code>bufferSize</code>
73       * buffer reads from the File.
74       * 
75       * @param file
76       *            the input file
77       * @param bufferSize
78       *            the buffer size for reading bytes from the file.
79       * @param <T>
80       *            the generic type of the deserialized objects returned in the
81       *            stream
82       * @return the stream of deserialized objects from the {@link InputStream}
83       *         as a {@link Flowable}.
84       */
85      public static <T extends Serializable> Flowable<T> read(final File file, final int bufferSize) {
86          Callable<ObjectInputStream> resourceFactory = new Callable<ObjectInputStream>() {
87              @Override
88              public ObjectInputStream call() throws IOException {
89                  return new ObjectInputStream(new BufferedInputStream(new FileInputStream(file), bufferSize));
90              }
91          };
92          @SuppressWarnings("unchecked")
93          Function<ObjectInputStream, Flowable<T>> flowableFactory = (Function<ObjectInputStream, Flowable<T>>) (Function<?, ?>) ObjectInputStreamFlowableFactoryHolder.INSTANCE;
94          Consumer<ObjectInputStream> disposeAction = Consumers.close();
95          return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
96      }
97  
98      // singleton instance using Holder pattern
99      private static final class ObjectInputStreamFlowableFactoryHolder {
100         static final Function<ObjectInputStream, Flowable<Serializable>> INSTANCE = new Function<ObjectInputStream, Flowable<Serializable>>() {
101 
102             @Override
103             public Flowable<Serializable> apply(ObjectInputStream is) throws Exception {
104                 return read(is);
105             }
106 
107         };
108     }
109 
110     /**
111      * Returns the deserialized objects from the given {@link File} as a
112      * {@link Flowable} stream. A buffer size of 8192 bytes is used by default.
113      * 
114      * @param file
115      *            the input file containing serialized java objects
116      * @param <T>
117      *            the generic type of the deserialized objects returned in the
118      *            stream
119      * @return the stream of deserialized objects from the {@link InputStream}
120      *         as an {@link Flowable}.
121      */
122     public static <T extends Serializable> Flowable<T> read(final File file) {
123         return read(file, DEFAULT_BUFFER_SIZE);
124     }
125 
126     /**
127      * Returns a duplicate of the input stream but with the side effect that
128      * emissions from the source are written to the {@link ObjectOutputStream}.
129      * 
130      * @param source
131      *            the source of objects to write
132      * @param oos
133      *            the output stream to write to
134      * @param <T>
135      *            the generic type of the objects being serialized
136      * @return re-emits the input stream
137      */
138     public static <T extends Serializable> Flowable<T> write(Flowable<T> source, final ObjectOutputStream oos) {
139         return source.doOnNext(new Consumer<T>() {
140 
141             @Override
142             public void accept(T t) throws IOException {
143                 oos.writeObject(t);
144             }
145         });
146     }
147 
148     /**
149      * Writes the source stream to the given file in given append mode and using
150      * the given buffer size.
151      * 
152      * @param source
153      *            flowable stream to write
154      * @param file
155      *            file to write to
156      * @param append
157      *            if true writes are appended to file otherwise overwrite the
158      *            file
159      * @param bufferSize
160      *            the buffer size in bytes to use.
161      * @param <T>
162      *            the generic type of the input stream
163      * @return re-emits the input stream
164      */
165     public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file,
166             final boolean append, final int bufferSize) {
167         Callable<ObjectOutputStream> resourceFactory = new Callable<ObjectOutputStream>() {
168             @Override
169             public ObjectOutputStream call() throws IOException {
170                 return new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), bufferSize));
171             }
172         };
173         Function<ObjectOutputStream, Flowable<T>> flowableFactory = new Function<ObjectOutputStream, Flowable<T>>() {
174 
175             @Override
176             public Flowable<T> apply(ObjectOutputStream oos) {
177                 return write(source, oos);
178             }
179         };
180         Consumer<ObjectOutputStream> disposeAction = Consumers.close();
181         return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
182     }
183 
184     /**
185      * Writes the source stream to the given file in given append mode and using
186      * the a buffer size of 8192 bytes.
187      * 
188      * @param source
189      *            flowable stream to write
190      * @param file
191      *            file to write to
192      * @param append
193      *            if true writes are appended to file otherwise overwrite the
194      *            file
195      * @param <T>
196      *            the generic type of the input stream
197      * @return re-emits the input stream
198      */
199     public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file,
200             final boolean append) {
201         return write(source, file, append, DEFAULT_BUFFER_SIZE);
202     }
203 
204     /**
205      * Writes the source stream to the given file in given append mode and using
206      * the a buffer size of 8192 bytes.
207      * 
208      * @param source
209      *            flowable stream to write
210      * @param file
211      *            file to write to
212      * @param <T>
213      *            the generic type of the input stream
214      * @return re-emits the input stream
215      */
216     public static <T extends Serializable> Flowable<T> write(final Flowable<T> source, final File file) {
217         return write(source, file, false, DEFAULT_BUFFER_SIZE);
218     }
219 
220     public static KryoBuilder kryo() {
221         return kryo(new Kryo());
222     }
223 
224     public static KryoBuilder kryo(Kryo kryo) {
225         return new KryoBuilder(kryo);
226     }
227 
228     public static class KryoBuilder {
229 
230         private static final int DEFAULT_BUFFER_SIZE = 4096;
231 
232         private final Kryo kryo;
233 
234         private KryoBuilder(Kryo kryo) {
235             this.kryo = kryo;
236         }
237 
238         public <T> Flowable<T> write(final Flowable<T> source, final File file) {
239             return write(source, file, false, DEFAULT_BUFFER_SIZE);
240         }
241 
242         public <T> Flowable<T> write(final Flowable<T> source, final File file, boolean append) {
243             return write(source, file, append, DEFAULT_BUFFER_SIZE);
244         }
245 
246         public <T> Flowable<T> write(final Flowable<T> source, final File file, final boolean append,
247                 final int bufferSize) {
248             Callable<Output> resourceFactory = new Callable<Output>() {
249                 @Override
250                 public Output call() throws FileNotFoundException {
251                     return new Output(new FileOutputStream(file, append), bufferSize);
252                 }
253             };
254             Function<Output, Flowable<T>> flowableFactory = new Function<Output, Flowable<T>>() {
255 
256                 @Override
257                 public Flowable<T> apply(final Output output) {
258                     return source.doOnNext(new Consumer<T>() {
259                         @Override
260                         public void accept(T t) {
261                             kryo.writeObject(output, t);
262                         }
263                     });
264                 }
265             };
266             Consumer<Output> disposeAction = Consumers.close();
267             return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
268         }
269 
270         public <T> Flowable<T> read(Class<T> cls, final File file) {
271             return read(cls, file, DEFAULT_BUFFER_SIZE);
272         }
273 
274         public <T> Flowable<T> read(final Class<T> cls, final File file, final int bufferSize) {
275             Callable<Input> resourceFactory = new Callable<Input>() {
276                 @Override
277                 public Input call() throws FileNotFoundException {
278                     return new Input(new FileInputStream(file), bufferSize);
279                 }
280             };
281             Function<Input, Flowable<T>> flowableFactory = new Function<Input, Flowable<T>>() {
282 
283                 @Override
284                 public Flowable<T> apply(final Input input) {
285                     return read(cls, input);
286                 }
287             };
288             Consumer<Input> disposeAction = Consumers.close();
289             return Flowable.using(resourceFactory, flowableFactory, disposeAction, true);
290         }
291 
292         public <T> Flowable<T> read(final Class<T> cls, final Input input) {
293 
294             return Flowable.generate(new Consumer<Emitter<T>>() {
295 
296                 @Override
297                 public void accept(Emitter<T> emitter) throws Exception {
298                     if (input.eof()) {
299                         emitter.onComplete();
300                     } else {
301                         T t = kryo.readObject(input, cls);
302                         emitter.onNext(t);
303                     }
304                 }
305 
306             });
307         }
308     }
309 
310 }