View Javadoc
1   package com.github.davidmoten.rx2.buffertofile;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.Serializable;
6   import java.util.concurrent.Callable;
7   
8   import org.reactivestreams.Publisher;
9   
10  import com.github.davidmoten.guavamini.Preconditions;
11  import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
12  import com.github.davidmoten.rx2.internal.flowable.buffertofile.FlowableOnBackpressureBufferToFile;
13  
14  import io.reactivex.Flowable;
15  import io.reactivex.FlowableTransformer;
16  import io.reactivex.Observable;
17  import io.reactivex.Scheduler;
18  import io.reactivex.functions.Function;
19  import io.reactivex.schedulers.Schedulers;
20  
21  public final class Options {
22  
23      public static final String DEFAULT_FILE_PREFIX = "bufferToFile_";
24  
25      private final Callable<File> fileFactory;
26      private final int pageSizeBytes;
27      private final Scheduler scheduler;
28  
29      @VisibleForTesting
30      Options(Callable<File> filefactory, int pageSizeBytes, Scheduler scheduler) {
31          Preconditions.checkNotNull(filefactory);
32          Preconditions.checkArgument(pageSizeBytes > 0, "bufferSizeBytes must be greater than 0");
33          Preconditions.checkNotNull(scheduler);
34          this.fileFactory = filefactory;
35          this.pageSizeBytes = pageSizeBytes;
36          this.scheduler = scheduler;
37      }
38  
39      public Callable<File> fileFactory() {
40          return fileFactory;
41      }
42  
43      public int pageSizeBytes() {
44          return pageSizeBytes;
45      }
46  
47      public Scheduler scheduler() {
48          return scheduler;
49      }
50  
51      public static BuilderFlowable builderFlowable() {
52          return new BuilderFlowable();
53      }
54  
55      public static BuilderObservable builderObservable() {
56          return new BuilderObservable();
57      }
58  
59      public static final class BuilderFlowable {
60  
61          private Callable<File> fileFactory = FileFactoryHolder.INSTANCE;
62          private int pageSizeBytes = 1024 * 1024;
63          private Scheduler scheduler = Schedulers.io();
64  
65          BuilderFlowable() {
66          }
67  
68          /**
69           * Sets the page size in bytes. A page corresponds to a single memory
70           * mapped file. If this method is not called the default value is 1MB
71           * (1024*1024 bytes).
72           * 
73           * @param pageSizeBytes
74           *            the page size in bytes.
75           * @return this
76           */
77          public BuilderFlowable pageSizeBytes(int pageSizeBytes) {
78              this.pageSizeBytes = pageSizeBytes;
79              return this;
80          }
81  
82          /**
83           * Sets the scheduler to use for reading items from files and emitting
84           * them.
85           * 
86           * @param scheduler
87           *            for emitting items
88           * @return this
89           */
90          public BuilderFlowable scheduler(Scheduler scheduler) {
91              this.scheduler = scheduler;
92              return this;
93          }
94  
95          /**
96           * Sets the file factory to be used by the queue storage mechanism.
97           * Defaults to using {@code File.createTempFile("bufferToFileDb","")} if
98           * this method is not called.
99           * 
100          * @param fileFactory
101          *            the factory
102          * @return the current builder
103          */
104         public BuilderFlowable fileFactory(Callable<File> fileFactory) {
105             this.fileFactory = fileFactory;
106             return this;
107         }
108 
109         /**
110          * Returns a Flowable Transformer to buffer stream items to disk
111          * (files). The serialization method uses
112          * {@code java.io.ObjectOutputStream} for writing values to disk and
113          * {@code java.io.ObjectInputStream} for reading values from disk and
114          * emitting them. Note that this is a pretty verbose serialization
115          * method for small items in that a 4 byte Integer is serialized to an
116          * 81 byte array!
117          * 
118          * @param <T>
119          *            stream item type
120          * @return FlowableTransformer using the java io serializer
121          */
122         public <T extends Serializable> FlowableTransformer<T, T> serializerJavaIO() {
123             return serializer(Serializers.<T>javaIO());
124         }
125 
126         /**
127          * Returns a Flowable Transformer that buffers items to disk (files).
128          * The serialization method passes through byte arrays directly for both
129          * writing and reading.
130          * 
131          * @return FlowableTransformer using the bytes serializer
132          */
133         public FlowableTransformer<byte[], byte[]> serializerBytes() {
134             return serializer(Serializers.bytes());
135         }
136 
137         public FlowableTransformer<String, String> serializerUtf8() {
138             return serializer(Serializers.utf8());
139         }
140 
141         public <T> FlowableTransformer<T, T> serializer(final Serializer<T> serializer) {
142             final Options options = new Options(fileFactory, pageSizeBytes, scheduler);
143             return new FlowableTransformer<T, T>() {
144 
145                 @Override
146                 public Publisher<T> apply(Flowable<T> source) {
147                     return new FlowableOnBackpressureBufferToFile<T>(source, null, options, serializer);
148                 }
149             };
150         }
151 
152         public <T> FlowableTransformer<T, T> serializer(DataSerializer<T> ds) {
153             return serializer(Serializers.from(ds));
154         }
155     }
156 
157     public static final class BuilderObservable {
158 
159         private Callable<File> fileFactory = FileFactoryHolder.INSTANCE;
160         private int pageSizeBytes = 1024 * 1024;
161         private Scheduler scheduler = Schedulers.io();
162 
163         BuilderObservable() {
164         }
165 
166         /**
167          * Sets the page size in bytes. A page corresponds to a single memory
168          * mapped file. If this method is not called the default value is 1MB
169          * (1024*1024 bytes).
170          * 
171          * @param pageSizeBytes
172          *            the page size in bytes.
173          * @return this
174          */
175         public BuilderObservable pageSizeBytes(int pageSizeBytes) {
176             this.pageSizeBytes = pageSizeBytes;
177             return this;
178         }
179 
180         /**
181          * Sets the scheduler to use for reading items from files and emitting
182          * them.
183          * 
184          * @param scheduler
185          *            for emitting items
186          * @return this
187          */
188         public BuilderObservable scheduler(Scheduler scheduler) {
189             this.scheduler = scheduler;
190             return this;
191         }
192 
193         /**
194          * Sets the file factory to be used by the queue storage mechanism.
195          * Defaults to using {@code File.createTempFile("bufferToFileDb","")} if
196          * this method is not called.
197          * 
198          * @param fileFactory
199          *            the factory
200          * @return the current builder
201          */
202         public BuilderObservable fileFactory(Callable<File> fileFactory) {
203             this.fileFactory = fileFactory;
204             return this;
205         }
206 
207         /**
208          * Returns a Flowable Transformer to buffer stream items to disk
209          * (files). The serialization method uses
210          * {@code java.io.ObjectOutputStream} for writing values to disk and
211          * {@code java.io.ObjectInputStream} for reading values from disk and
212          * emitting them. Note that this is a pretty verbose serialization
213          * method for small items in that a 4 byte Integer is serialized to an
214          * 81 byte array!
215          * 
216          * @param <T>
217          *            stream item type
218          * @return FlowableTransformer using the java io serializer
219          */
220         public <T extends Serializable> Function<Observable<T>, Flowable<T>> serializerJavaIO() {
221             return serializer(Serializers.<T>javaIO());
222         }
223 
224         /**
225          * Returns a Flowable Transformer that buffers items to disk (files).
226          * The serialization method passes through byte arrays directly for both
227          * writing and reading.
228          * 
229          * @return FlowableTransformer using the bytes serializer
230          */
231         public Function<Observable<byte[]>, Flowable<byte[]>> serializerBytes() {
232             return serializer(Serializers.bytes());
233         }
234 
235         public Function<Observable<String>, Flowable<String>> serializerUtf8() {
236             return serializer(Serializers.utf8());
237         }
238 
239         public <T> Function<Observable<T>, Flowable<T>> serializer(final Serializer<T> serializer) {
240             final Options options = new Options(fileFactory, pageSizeBytes, scheduler);
241             return new Function<Observable<T>, Flowable<T>>() {
242 
243                 @Override
244                 public Flowable<T> apply(Observable<T> source) {
245                     return new FlowableOnBackpressureBufferToFile<T>(null, source, options, serializer);
246                 }
247             };
248         }
249 
250         public <T> Function<Observable<T>, Flowable<T>> serializer(DataSerializer<T> ds) {
251             return serializer(Serializers.from(ds));
252         }
253     }
254 
255     @VisibleForTesting
256     static final class FileFactoryHolder {
257         private static final Callable<File> INSTANCE = new Callable<File>() {
258             @Override
259             public File call() {
260                 try {
261                     return File.createTempFile(DEFAULT_FILE_PREFIX, ".obj");
262                 } catch (IOException e) {
263                     throw new RuntimeException(e);
264                 }
265             }
266         };
267     }
268 
269 }