View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.File;
4   import java.util.concurrent.atomic.AtomicInteger;
5   import java.util.concurrent.atomic.AtomicLong;
6   import java.util.concurrent.atomic.AtomicReference;
7   
8   import com.github.davidmoten.rx.buffertofile.DataSerializer;
9   import com.github.davidmoten.rx.buffertofile.Options;
10  import com.github.davidmoten.util.Preconditions;
11  
12  import rx.Observable;
13  import rx.Observable.OnSubscribe;
14  import rx.Observable.Operator;
15  import rx.Producer;
16  import rx.Scheduler;
17  import rx.Scheduler.Worker;
18  import rx.Subscriber;
19  import rx.exceptions.Exceptions;
20  import rx.functions.Action0;
21  import rx.functions.Func0;
22  import rx.internal.operators.BackpressureUtils;
23  import rx.observers.Subscribers;
24  
25  public final class OperatorBufferToFile<T> implements Operator<T, T> {
26  
27      private final DataSerializer<T> dataSerializer;
28      private final Scheduler scheduler;
29      private final Options options;
30  
31      public OperatorBufferToFile(DataSerializer<T> dataSerializer, Scheduler scheduler,
32              Options options) {
33          Preconditions.checkNotNull(dataSerializer);
34          Preconditions.checkNotNull(scheduler);
35          Preconditions.checkNotNull(options);
36          this.scheduler = scheduler;
37          this.dataSerializer = dataSerializer;
38          this.options = options;
39      }
40  
41      @Override
42      public Subscriber<? super T> call(Subscriber<? super T> child) {
43  
44          // create the file based queue
45          final QueueWithSubscription<T> queue = createFileBasedQueue(dataSerializer, options);
46  
47          // hold a reference to the queueProducer which will be set on
48          // subscription to `source`
49          final AtomicReference<QueueProducer<T>> queueProducer = new AtomicReference<QueueProducer<T>>();
50  
51          // emissions will propagate to downstream via this worker
52          final Worker worker = scheduler.createWorker();
53  
54          // set up the observable to read from the file based queue
55          Observable<T> source = Observable
56                  .create(new OnSubscribeFromQueue<T>(queueProducer, queue, worker, options));
57  
58          // create the parent subscriber
59          Subscriber<T> parentSubscriber = new ParentSubscriber<T>(queueProducer);
60  
61          // link unsubscription
62          child.add(parentSubscriber);
63  
64          // close and delete file based queues in RollingQueue on unsubscription
65          child.add(queue);
66  
67          // ensure onStart not called twice
68          Subscriber<T> wrappedChild = Subscribers.wrap(child);
69  
70          // ensure worker gets unsubscribed (last)
71          child.add(worker);
72  
73          // subscribe to queue
74          source.unsafeSubscribe(wrappedChild);
75  
76          return parentSubscriber;
77      }
78  
79      private static final boolean MEMORY_MAPPED = "true"
80              .equals(System.getProperty("memory.mappped"));
81  
82      private static <T> QueueWithSubscription<T> createFileBasedQueue(
83              final DataSerializer<T> dataSerializer, final Options options) {
84          if (MEMORY_MAPPED) {
85              // warning: still in development!
86              final int size;
87              if (options.rolloverSizeBytes() > Integer.MAX_VALUE) {
88                  size = 20 * 1024 * 1024;// 20MB
89              } else {
90                  size = (int) options.rolloverSizeBytes();
91              }
92              return new FileBasedSPSCQueueMemoryMapped<T>(options.fileFactory(), size,
93                      dataSerializer);
94          }
95          if (options.rolloverEvery() == Long.MAX_VALUE
96                  && options.rolloverSizeBytes() == Long.MAX_VALUE) {
97              // skip the Rollover version
98              return new QueueWithResourcesNonBlockingUnsubscribe<T>(new FileBasedSPSCQueue<T>(
99                      options.bufferSizeBytes(), options.fileFactory().call(), dataSerializer));
100         } else {
101             final Func0<QueueWithResources<T>> queueFactory = new Func0<QueueWithResources<T>>() {
102                 @Override
103                 public QueueWithResources<T> call() {
104                     // create the file to be used for queue storage (and whose
105                     // file name will determine the names of other files used
106                     // for storage if multiple are required per queue)
107                     File file = options.fileFactory().call();
108 
109                     return new FileBasedSPSCQueue<T>(options.bufferSizeBytes(), file,
110                             dataSerializer);
111                 }
112             };
113             // the wrapping class ensures that unsubscribe happens in the same
114             // thread as the offer or poll which avoids the unsubscribe action
115             // not getting a time-slice so that the open file limit is not
116             // exceeded (new files are opened in the offer() call).
117             return new QueueWithResourcesNonBlockingUnsubscribe<T>(new RollingSPSCQueue<T>(
118                     queueFactory, options.rolloverSizeBytes(), options.rolloverEvery()));
119         }
120     }
121 
122     private static final class OnSubscribeFromQueue<T> implements OnSubscribe<T> {
123 
124         private final AtomicReference<QueueProducer<T>> queueProducer;
125         private final QueueWithSubscription<T> queue;
126         private final Worker worker;
127         private final Options options;
128 
129         OnSubscribeFromQueue(AtomicReference<QueueProducer<T>> queueProducer,
130                 QueueWithSubscription<T> queue, Worker worker, Options options) {
131             this.queueProducer = queueProducer;
132             this.queue = queue;
133             this.worker = worker;
134             this.options = options;
135         }
136 
137         @Override
138         public void call(Subscriber<? super T> child) {
139             QueueProducer<T> qp = new QueueProducer<T>(queue, child, worker, options.delayError());
140             queueProducer.set(qp);
141             child.setProducer(qp);
142         }
143     }
144 
145     private static final class ParentSubscriber<T> extends Subscriber<T> {
146 
147         private final AtomicReference<QueueProducer<T>> queueProducer;
148 
149         ParentSubscriber(AtomicReference<QueueProducer<T>> queueProducer) {
150             this.queueProducer = queueProducer;
151         }
152 
153         @Override
154         public void onStart() {
155             request(Long.MAX_VALUE);
156         }
157 
158         @Override
159         public void onCompleted() {
160             queueProducer.get().onCompleted();
161         }
162 
163         @Override
164         public void onError(Throwable e) {
165             queueProducer.get().onError(e);
166         }
167 
168         @Override
169         public void onNext(T t) {
170             queueProducer.get().onNext(t);
171         }
172 
173     }
174 
175     private static final class QueueProducer<T> extends AtomicLong implements Producer, Action0 {
176 
177         // inherits from AtomicLong to represent the oustanding requests count
178 
179         private static final long serialVersionUID = 2521533710633950102L;
180 
181         private final QueueWithSubscription<T> queue;
182         private final AtomicInteger drainRequested = new AtomicInteger(0);
183         private final Subscriber<? super T> child;
184         private final Worker worker;
185         private final boolean delayError;
186         private volatile boolean done;
187 
188         // Is set just before the volatile `done` is set and read just after
189         // `done` is read. Thus doesn't need to be volatile.
190         private Throwable error = null;
191 
192         QueueProducer(QueueWithSubscription<T> queue, Subscriber<? super T> child, Worker worker,
193                 boolean delayError) {
194             super();
195             this.queue = queue;
196             this.child = child;
197             this.worker = worker;
198             this.delayError = delayError;
199             this.done = false;
200         }
201 
202         void onNext(T t) {
203             try {
204                 if (!queue.offer(t)) {
205                     onError(new RuntimeException(
206                             "could not place item on queue (queue.offer(item) returned false), item= "
207                                     + t));
208                     return;
209                 } else {
210                     drain();
211                 }
212             } catch (Throwable e) {
213                 Exceptions.throwIfFatal(e);
214                 onError(e);
215             }
216         }
217 
218         void onError(Throwable e) {
219             // must assign error before assign done = true to avoid race
220             // condition in finished() and also so appropriate memory barrier in
221             // place given error is non-volatile
222             error = e;
223             done = true;
224             drain();
225         }
226 
227         void onCompleted() {
228             done = true;
229             drain();
230         }
231 
232         @Override
233         public void request(long n) {
234             if (n > 0) {
235                 BackpressureUtils.getAndAddRequest(this, n);
236                 drain();
237             }
238         }
239 
240         private void drain() {
241             // only schedule a drain if current drain has finished
242             // otherwise the drainRequested counter will be incremented
243             // and the drain loop will ensure that another drain cycle occurs if
244             // required
245             if (!child.isUnsubscribed() && drainRequested.getAndIncrement() == 0) {
246                 worker.schedule(this);
247             }
248         }
249 
250         // this method executed from drain() only
251         @Override
252         public void call() {
253             // catch exceptions related to file based queue in drainNow()
254             try {
255                 drainNow();
256             } catch (Throwable e) {
257                 child.onError(e);
258             }
259         }
260 
261         private void drainNow() {
262             if (child.isUnsubscribed()) {
263                 // leave drainRequested > 0 to prevent more
264                 // scheduling of drains
265                 return;
266             }
267             // get the number of unsatisfied requests
268             long requests = get();
269 
270             for (;;) {
271                 // reset drainRequested counter
272                 drainRequested.set(1);
273                 long emitted = 0;
274                 while (emitted < requests) {
275                     if (child.isUnsubscribed()) {
276                         // leave drainRequested > 0 to prevent more
277                         // scheduling of drains
278                         return;
279                     }
280                     T item = queue.poll();
281                     if (item == null) {
282                         // queue is empty
283                         if (finished()) {
284                             return;
285                         } else {
286                             // another drain was requested so go
287                             // round again but break out of this
288                             // while loop to the outer loop so we
289                             // can update requests and reset drainRequested
290                             break;
291                         }
292                     } else {
293                         // there was an item on the queue
294                         if (NullSentinel.isNullSentinel(item)) {
295                             child.onNext(null);
296                         } else {
297                             child.onNext(item);
298                         }
299                         emitted++;
300                     }
301                 }
302                 // update requests with emitted value and any new requests
303                 requests = BackpressureUtils.produced(this, emitted);
304                 if (child.isUnsubscribed() || (requests == 0L && finished())) {
305                     return;
306                 }
307             }
308         }
309 
310         private boolean finished() {
311         	//cannot pass queueKnownToBeEmpty flag to this method because 
312         	//to avoid a race condition we must do an actual check on queue.isEmpty()
313         	//after finding done is true
314             if (done) {
315                 Throwable t = error;
316                 if (queue.isEmpty()) {
317                     // first close the queue (which in this case though
318                     // empty also disposes of its resources)
319                     queue.unsubscribe();
320 
321                     if (t != null) {
322                         child.onError(t);
323                     } else {
324                         child.onCompleted();
325                     }
326                     // leave drainRequested > 0 so that further drain
327                     // requests are ignored
328                     return true;
329                 } else if (t != null && !delayError) {
330                     // queue is not empty but we are going to shortcut
331                     // that because delayError is false
332 
333                     // first close the queue (which in this case also
334                     // disposes of its resources)
335                     queue.unsubscribe();
336 
337                     // now report the error
338                     child.onError(t);
339 
340                     // leave drainRequested > 0 so that further drain
341                     // requests are ignored
342                     return true;
343                 } else {
344                     // otherwise we need to wait for all items waiting
345                     // on the queue to be requested and delivered
346                     // (delayError=true)
347                     return drainRequested.compareAndSet(1, 0);
348                 }
349             } else {
350                 return drainRequested.compareAndSet(1, 0);
351             }
352         }
353     }
354 }