1 package com.github.davidmoten.rx2.buffertofile;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.Serializable;
6 import java.util.concurrent.Callable;
7
8 import org.reactivestreams.Publisher;
9
10 import com.github.davidmoten.guavamini.Preconditions;
11 import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
12 import com.github.davidmoten.rx2.internal.flowable.buffertofile.FlowableOnBackpressureBufferToFile;
13
14 import io.reactivex.Flowable;
15 import io.reactivex.FlowableTransformer;
16 import io.reactivex.Observable;
17 import io.reactivex.Scheduler;
18 import io.reactivex.functions.Function;
19 import io.reactivex.schedulers.Schedulers;
20
21 public final class Options {
22
23 public static final String DEFAULT_FILE_PREFIX = "bufferToFile_";
24
25 private final Callable<File> fileFactory;
26 private final int pageSizeBytes;
27 private final Scheduler scheduler;
28
29 @VisibleForTesting
30 Options(Callable<File> filefactory, int pageSizeBytes, Scheduler scheduler) {
31 Preconditions.checkNotNull(filefactory);
32 Preconditions.checkArgument(pageSizeBytes > 0, "bufferSizeBytes must be greater than 0");
33 Preconditions.checkNotNull(scheduler);
34 this.fileFactory = filefactory;
35 this.pageSizeBytes = pageSizeBytes;
36 this.scheduler = scheduler;
37 }
38
39 public Callable<File> fileFactory() {
40 return fileFactory;
41 }
42
43 public int pageSizeBytes() {
44 return pageSizeBytes;
45 }
46
47 public Scheduler scheduler() {
48 return scheduler;
49 }
50
51 public static BuilderFlowable builderFlowable() {
52 return new BuilderFlowable();
53 }
54
55 public static BuilderObservable builderObservable() {
56 return new BuilderObservable();
57 }
58
59 public static final class BuilderFlowable {
60
61 private Callable<File> fileFactory = FileFactoryHolder.INSTANCE;
62 private int pageSizeBytes = 1024 * 1024;
63 private Scheduler scheduler = Schedulers.io();
64
65 BuilderFlowable() {
66 }
67
68
69
70
71
72
73
74
75
76
77 public BuilderFlowable pageSizeBytes(int pageSizeBytes) {
78 this.pageSizeBytes = pageSizeBytes;
79 return this;
80 }
81
82
83
84
85
86
87
88
89
90 public BuilderFlowable scheduler(Scheduler scheduler) {
91 this.scheduler = scheduler;
92 return this;
93 }
94
95
96
97
98
99
100
101
102
103
104 public BuilderFlowable fileFactory(Callable<File> fileFactory) {
105 this.fileFactory = fileFactory;
106 return this;
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public <T extends Serializable> FlowableTransformer<T, T> serializerJavaIO() {
123 return serializer(Serializers.<T>javaIO());
124 }
125
126
127
128
129
130
131
132
133 public FlowableTransformer<byte[], byte[]> serializerBytes() {
134 return serializer(Serializers.bytes());
135 }
136
137 public FlowableTransformer<String, String> serializerUtf8() {
138 return serializer(Serializers.utf8());
139 }
140
141 public <T> FlowableTransformer<T, T> serializer(final Serializer<T> serializer) {
142 final Options options = new Options(fileFactory, pageSizeBytes, scheduler);
143 return new FlowableTransformer<T, T>() {
144
145 @Override
146 public Publisher<T> apply(Flowable<T> source) {
147 return new FlowableOnBackpressureBufferToFile<T>(source, null, options, serializer);
148 }
149 };
150 }
151
152 public <T> FlowableTransformer<T, T> serializer(DataSerializer<T> ds) {
153 return serializer(Serializers.from(ds));
154 }
155 }
156
157 public static final class BuilderObservable {
158
159 private Callable<File> fileFactory = FileFactoryHolder.INSTANCE;
160 private int pageSizeBytes = 1024 * 1024;
161 private Scheduler scheduler = Schedulers.io();
162
163 BuilderObservable() {
164 }
165
166
167
168
169
170
171
172
173
174
175 public BuilderObservable pageSizeBytes(int pageSizeBytes) {
176 this.pageSizeBytes = pageSizeBytes;
177 return this;
178 }
179
180
181
182
183
184
185
186
187
188 public BuilderObservable scheduler(Scheduler scheduler) {
189 this.scheduler = scheduler;
190 return this;
191 }
192
193
194
195
196
197
198
199
200
201
202 public BuilderObservable fileFactory(Callable<File> fileFactory) {
203 this.fileFactory = fileFactory;
204 return this;
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220 public <T extends Serializable> Function<Observable<T>, Flowable<T>> serializerJavaIO() {
221 return serializer(Serializers.<T>javaIO());
222 }
223
224
225
226
227
228
229
230
231 public Function<Observable<byte[]>, Flowable<byte[]>> serializerBytes() {
232 return serializer(Serializers.bytes());
233 }
234
235 public Function<Observable<String>, Flowable<String>> serializerUtf8() {
236 return serializer(Serializers.utf8());
237 }
238
239 public <T> Function<Observable<T>, Flowable<T>> serializer(final Serializer<T> serializer) {
240 final Options options = new Options(fileFactory, pageSizeBytes, scheduler);
241 return new Function<Observable<T>, Flowable<T>>() {
242
243 @Override
244 public Flowable<T> apply(Observable<T> source) {
245 return new FlowableOnBackpressureBufferToFile<T>(null, source, options, serializer);
246 }
247 };
248 }
249
250 public <T> Function<Observable<T>, Flowable<T>> serializer(DataSerializer<T> ds) {
251 return serializer(Serializers.from(ds));
252 }
253 }
254
255 @VisibleForTesting
256 static final class FileFactoryHolder {
257 private static final Callable<File> INSTANCE = new Callable<File>() {
258 @Override
259 public File call() {
260 try {
261 return File.createTempFile(DEFAULT_FILE_PREFIX, ".obj");
262 } catch (IOException e) {
263 throw new RuntimeException(e);
264 }
265 }
266 };
267 }
268
269 }