1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.File;
4 import java.util.concurrent.atomic.AtomicInteger;
5 import java.util.concurrent.atomic.AtomicLong;
6 import java.util.concurrent.atomic.AtomicReference;
7
8 import com.github.davidmoten.rx.buffertofile.DataSerializer;
9 import com.github.davidmoten.rx.buffertofile.Options;
10 import com.github.davidmoten.util.Preconditions;
11
12 import rx.Observable;
13 import rx.Observable.OnSubscribe;
14 import rx.Observable.Operator;
15 import rx.Producer;
16 import rx.Scheduler;
17 import rx.Scheduler.Worker;
18 import rx.Subscriber;
19 import rx.exceptions.Exceptions;
20 import rx.functions.Action0;
21 import rx.functions.Func0;
22 import rx.internal.operators.BackpressureUtils;
23 import rx.observers.Subscribers;
24
25 public final class OperatorBufferToFile<T> implements Operator<T, T> {
26
27 private final DataSerializer<T> dataSerializer;
28 private final Scheduler scheduler;
29 private final Options options;
30
/**
 * Creates an operator that passes upstream emissions through a file-based queue
 * before they reach the downstream subscriber.
 *
 * @param dataSerializer
 *            serializer handed to the file-based queue; checked with
 *            {@code Preconditions.checkNotNull}
 * @param scheduler
 *            supplies the worker on which queued items are emitted downstream;
 *            checked with {@code Preconditions.checkNotNull}
 * @param options
 *            buffering options (file factory, buffer size, rollover settings,
 *            delayError); checked with {@code Preconditions.checkNotNull}
 */
public OperatorBufferToFile(DataSerializer<T> dataSerializer, Scheduler scheduler,
        Options options) {
    // Validate all arguments, in declaration order, before any field is assigned.
    Preconditions.checkNotNull(dataSerializer);
    Preconditions.checkNotNull(scheduler);
    Preconditions.checkNotNull(options);

    this.dataSerializer = dataSerializer;
    this.scheduler = scheduler;
    this.options = options;
}
40
41 @Override
42 public Subscriber<? super T> call(Subscriber<? super T> child) {
43
44
45 final QueueWithSubscription<T> queue = createFileBasedQueue(dataSerializer, options);
46
47
48
49 final AtomicReference<QueueProducer<T>> queueProducer = new AtomicReference<QueueProducer<T>>();
50
51
52 final Worker worker = scheduler.createWorker();
53
54
55 Observable<T> source = Observable
56 .create(new OnSubscribeFromQueue<T>(queueProducer, queue, worker, options));
57
58
59 Subscriber<T> parentSubscriber = new ParentSubscriber<T>(queueProducer);
60
61
62 child.add(parentSubscriber);
63
64
65 child.add(queue);
66
67
68 Subscriber<T> wrappedChild = Subscribers.wrap(child);
69
70
71 child.add(worker);
72
73
74 source.unsafeSubscribe(wrappedChild);
75
76 return parentSubscriber;
77 }
78
79 private static final boolean MEMORY_MAPPED = "true"
80 .equals(System.getProperty("memory.mappped"));
81
82 private static <T> QueueWithSubscription<T> createFileBasedQueue(
83 final DataSerializer<T> dataSerializer, final Options options) {
84 if (MEMORY_MAPPED) {
85
86 final int size;
87 if (options.rolloverSizeBytes() > Integer.MAX_VALUE) {
88 size = 20 * 1024 * 1024;
89 } else {
90 size = (int) options.rolloverSizeBytes();
91 }
92 return new FileBasedSPSCQueueMemoryMapped<T>(options.fileFactory(), size,
93 dataSerializer);
94 }
95 if (options.rolloverEvery() == Long.MAX_VALUE
96 && options.rolloverSizeBytes() == Long.MAX_VALUE) {
97
98 return new QueueWithResourcesNonBlockingUnsubscribe<T>(new FileBasedSPSCQueue<T>(
99 options.bufferSizeBytes(), options.fileFactory().call(), dataSerializer));
100 } else {
101 final Func0<QueueWithResources<T>> queueFactory = new Func0<QueueWithResources<T>>() {
102 @Override
103 public QueueWithResources<T> call() {
104
105
106
107 File file = options.fileFactory().call();
108
109 return new FileBasedSPSCQueue<T>(options.bufferSizeBytes(), file,
110 dataSerializer);
111 }
112 };
113
114
115
116
117 return new QueueWithResourcesNonBlockingUnsubscribe<T>(new RollingSPSCQueue<T>(
118 queueFactory, options.rolloverSizeBytes(), options.rolloverEvery()));
119 }
120 }
121
122 private static final class OnSubscribeFromQueue<T> implements OnSubscribe<T> {
123
124 private final AtomicReference<QueueProducer<T>> queueProducer;
125 private final QueueWithSubscription<T> queue;
126 private final Worker worker;
127 private final Options options;
128
129 OnSubscribeFromQueue(AtomicReference<QueueProducer<T>> queueProducer,
130 QueueWithSubscription<T> queue, Worker worker, Options options) {
131 this.queueProducer = queueProducer;
132 this.queue = queue;
133 this.worker = worker;
134 this.options = options;
135 }
136
137 @Override
138 public void call(Subscriber<? super T> child) {
139 QueueProducer<T> qp = new QueueProducer<T>(queue, child, worker, options.delayError());
140 queueProducer.set(qp);
141 child.setProducer(qp);
142 }
143 }
144
145 private static final class ParentSubscriber<T> extends Subscriber<T> {
146
147 private final AtomicReference<QueueProducer<T>> queueProducer;
148
149 ParentSubscriber(AtomicReference<QueueProducer<T>> queueProducer) {
150 this.queueProducer = queueProducer;
151 }
152
153 @Override
154 public void onStart() {
155 request(Long.MAX_VALUE);
156 }
157
158 @Override
159 public void onCompleted() {
160 queueProducer.get().onCompleted();
161 }
162
163 @Override
164 public void onError(Throwable e) {
165 queueProducer.get().onError(e);
166 }
167
168 @Override
169 public void onNext(T t) {
170 queueProducer.get().onNext(t);
171 }
172
173 }
174
175 private static final class QueueProducer<T> extends AtomicLong implements Producer, Action0 {
176
177
178
179 private static final long serialVersionUID = 2521533710633950102L;
180
181 private final QueueWithSubscription<T> queue;
182 private final AtomicInteger drainRequested = new AtomicInteger(0);
183 private final Subscriber<? super T> child;
184 private final Worker worker;
185 private final boolean delayError;
186 private volatile boolean done;
187
188
189
190 private Throwable error = null;
191
192 QueueProducer(QueueWithSubscription<T> queue, Subscriber<? super T> child, Worker worker,
193 boolean delayError) {
194 super();
195 this.queue = queue;
196 this.child = child;
197 this.worker = worker;
198 this.delayError = delayError;
199 this.done = false;
200 }
201
202 void onNext(T t) {
203 try {
204 if (!queue.offer(t)) {
205 onError(new RuntimeException(
206 "could not place item on queue (queue.offer(item) returned false), item= "
207 + t));
208 return;
209 } else {
210 drain();
211 }
212 } catch (Throwable e) {
213 Exceptions.throwIfFatal(e);
214 onError(e);
215 }
216 }
217
218 void onError(Throwable e) {
219
220
221
222 error = e;
223 done = true;
224 drain();
225 }
226
227 void onCompleted() {
228 done = true;
229 drain();
230 }
231
232 @Override
233 public void request(long n) {
234 if (n > 0) {
235 BackpressureUtils.getAndAddRequest(this, n);
236 drain();
237 }
238 }
239
240 private void drain() {
241
242
243
244
245 if (!child.isUnsubscribed() && drainRequested.getAndIncrement() == 0) {
246 worker.schedule(this);
247 }
248 }
249
250
251 @Override
252 public void call() {
253
254 try {
255 drainNow();
256 } catch (Throwable e) {
257 child.onError(e);
258 }
259 }
260
261 private void drainNow() {
262 if (child.isUnsubscribed()) {
263
264
265 return;
266 }
267
268 long requests = get();
269
270 for (;;) {
271
272 drainRequested.set(1);
273 long emitted = 0;
274 while (emitted < requests) {
275 if (child.isUnsubscribed()) {
276
277
278 return;
279 }
280 T item = queue.poll();
281 if (item == null) {
282
283 if (finished()) {
284 return;
285 } else {
286
287
288
289
290 break;
291 }
292 } else {
293
294 if (NullSentinel.isNullSentinel(item)) {
295 child.onNext(null);
296 } else {
297 child.onNext(item);
298 }
299 emitted++;
300 }
301 }
302
303 requests = BackpressureUtils.produced(this, emitted);
304 if (child.isUnsubscribed() || (requests == 0L && finished())) {
305 return;
306 }
307 }
308 }
309
310 private boolean finished() {
311
312
313
314 if (done) {
315 Throwable t = error;
316 if (queue.isEmpty()) {
317
318
319 queue.unsubscribe();
320
321 if (t != null) {
322 child.onError(t);
323 } else {
324 child.onCompleted();
325 }
326
327
328 return true;
329 } else if (t != null && !delayError) {
330
331
332
333
334
335 queue.unsubscribe();
336
337
338 child.onError(t);
339
340
341
342 return true;
343 } else {
344
345
346
347 return drainRequested.compareAndSet(1, 0);
348 }
349 } else {
350 return drainRequested.compareAndSet(1, 0);
351 }
352 }
353 }
354 }