View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   import java.util.concurrent.atomic.AtomicLong;
5   
6   import org.reactivestreams.Subscriber;
7   import org.reactivestreams.Subscription;
8   
9   import com.github.davidmoten.guavamini.Preconditions;
10  import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
11  import com.github.davidmoten.rx2.buffertofile.Options;
12  import com.github.davidmoten.rx2.buffertofile.Serializer;
13  
14  import io.reactivex.Flowable;
15  import io.reactivex.FlowableSubscriber;
16  import io.reactivex.Observable;
17  import io.reactivex.Observer;
18  import io.reactivex.Scheduler.Worker;
19  import io.reactivex.disposables.Disposable;
20  import io.reactivex.exceptions.Exceptions;
21  import io.reactivex.internal.functions.ObjectHelper;
22  import io.reactivex.internal.subscriptions.SubscriptionHelper;
23  import io.reactivex.internal.util.BackpressureHelper;
24  import io.reactivex.plugins.RxJavaPlugins;
25  
26  public final class FlowableOnBackpressureBufferToFile<T> extends Flowable<T> {
27  
28      private final Flowable<T> source;
29      private final Observable<T> source2;
30      private final Options options;
31      private final Serializer<T> serializer;
32  
/**
 * Creates the operator over exactly one upstream source.
 *
 * <p>
 * Exactly one of {@code source} and {@code source2} must be non-null; the
 * other must be {@code null}. The check is an exclusive-or, so supplying
 * both or neither is rejected by {@code Preconditions.checkArgument}.
 *
 * @param source
 *            the {@code Flowable} upstream, or {@code null} if
 *            {@code source2} is given
 * @param source2
 *            the {@code Observable} upstream, or {@code null} if
 *            {@code source} is given
 * @param options
 *            supplies the file factory, page size and scheduler that are
 *            read on every subscription; must not be {@code null}
 * @param serializer
 *            converts items to and from the byte arrays held in the queue;
 *            must not be {@code null}
 */
public FlowableOnBackpressureBufferToFile(Flowable<T> source, Observable<T> source2,
        Options options, Serializer<T> serializer) {
    // only one source should be defined (checked first so that the failure
    // for a bad source combination is the same as it has always been)
    Preconditions.checkArgument((source != null) ^ (source2 != null));
    this.source = source;
    this.source2 = source2;
    // fail fast at assembly time instead of surfacing a NullPointerException
    // later, when the first subscriber arrives or the first item is
    // serialized
    this.options = ObjectHelper.requireNonNull(options, "options is null");
    this.serializer = ObjectHelper.requireNonNull(serializer, "serializer is null");
}
42  
43      @Override
44      protected void subscribeActual(Subscriber<? super T> child) {
45          PagedQueue queue = new PagedQueue(options.fileFactory(), options.pageSizeBytes());
46          Worker worker = options.scheduler().createWorker();
47          if (source != null) {
48              source.subscribe(
49                      new BufferToFileSubscriberFlowable<T>(child, queue, serializer, worker));
50          } else {
51              source2.subscribe(
52                      new BufferToFileSubscriberObservable<T>(child, queue, serializer, worker));
53          }
54      }
55  
56      @SuppressWarnings("serial")
57      @VisibleForTesting
58      public static final class BufferToFileSubscriberFlowable<T> extends BufferToFileSubscriber<T>
59              implements FlowableSubscriber<T>, Subscription {
60  
61          private Subscription parent;
62  
63          @VisibleForTesting
64          public BufferToFileSubscriberFlowable(Subscriber<? super T> child, PagedQueue queue,
65                  Serializer<T> serializer, Worker worker) {
66              super(child, queue, serializer, worker);
67          }
68  
69          @Override
70          public void onSubscribe(Subscription parent) {
71              if (SubscriptionHelper.validate(this.parent, parent)) {
72                  this.parent = parent;
73                  child.onSubscribe(this);
74              }
75          }
76  
77          @Override
78          public void request(long n) {
79              if (SubscriptionHelper.validate(n)) {
80                  BackpressureHelper.add(requested, n);
81                  parent.request(n);
82                  scheduleDrain();
83              }
84          }
85  
86          @Override
87          public void cancel() {
88              cancelled = true;
89              parent.cancel();
90              // ensure queue is closed from the worker thread
91              // to simplify concurrency controls in PagedQueue
92              scheduleDrain();
93          }
94  
95          @Override
96          public void onNext(T t) {
97              super.onNext(t);
98          }
99  
100         @Override
101         public void onError(Throwable e) {
102             super.onError(e);
103         }
104 
105         @Override
106         public void onComplete() {
107             super.onComplete();
108         }
109 
110         @Override
111         public void cancelUpstream() {
112             parent.cancel();
113         }
114     }
115 
116     @SuppressWarnings("serial")
117     private static final class BufferToFileSubscriberObservable<T> extends BufferToFileSubscriber<T>
118             implements Observer<T>, Subscription {
119 
120         private Disposable parent;
121 
122         BufferToFileSubscriberObservable(Subscriber<? super T> child, PagedQueue queue,
123                 Serializer<T> serializer, Worker worker) {
124             super(child, queue, serializer, worker);
125         }
126 
127         @Override
128         public void onSubscribe(Disposable d) {
129             this.parent = d;
130             child.onSubscribe(this);
131         }
132 
133         @Override
134         public void onNext(T t) {
135             super.onNext(t);
136         }
137 
138         @Override
139         public void onError(Throwable e) {
140             super.onError(e);
141         }
142 
143         @Override
144         public void onComplete() {
145             super.onComplete();
146         }
147 
148         @Override
149         public void cancelUpstream() {
150             parent.dispose();
151         }
152 
153         @Override
154         public void request(long n) {
155             if (SubscriptionHelper.validate(n)) {
156                 BackpressureHelper.add(requested, n);
157                 scheduleDrain();
158             }
159         }
160 
161         @Override
162         public void cancel() {
163             cancelled = true;
164             parent.dispose();
165             // ensure queue is closed from the worker thread
166             // to simplify concurrency controls in PagedQueue
167             scheduleDrain();
168         }
169     }
170 
171     @SuppressWarnings({ "serial" })
172     @VisibleForTesting
173     static abstract class BufferToFileSubscriber<T> extends AtomicInteger implements Runnable {
174 
175         protected final Subscriber<? super T> child;
176         private final PagedQueue queue;
177         private final Serializer<T> serializer;
178         private final Worker worker;
179         protected final AtomicLong requested = new AtomicLong();
180 
181         protected volatile boolean cancelled;
182         private volatile boolean done;
183 
184         // `error` set just before the volatile `done` is set and read just
185         // after `done` is read. Thus doesn't need to be volatile.
186         private Throwable error;
187 
188         BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue,
189                 Serializer<T> serializer, Worker worker) {
190             this.child = child;
191             this.queue = queue;
192             this.serializer = serializer;
193             this.worker = worker;
194         }
195 
196         public void onNext(T t) {
197             try {
198                 queue.offer(serializer.serialize(t));
199             } catch (Throwable e) {
200                 Exceptions.throwIfFatal(e);
201                 onError(e);
202                 return;
203             }
204             scheduleDrain();
205         }
206 
207         public void onError(Throwable e) {
208             // must assign error before assign done = true to avoid race
209             // condition in drain() and also so appropriate memory barrier in
210             // place given error is non-volatile
211             error = e;
212             done = true;
213             scheduleDrain();
214         }
215 
216         public void onComplete() {
217             done = true;
218             scheduleDrain();
219         }
220 
221         protected void scheduleDrain() {
222             // only schedule a drain if current drain has finished
223             // otherwise the drain requested counter (`this`) will be
224             // incremented and the drain loop will ensure that another drain
225             // cycle occurs if required
226             if (getAndIncrement() == 0) {
227                 worker.schedule(this);
228             }
229         }
230 
231         @Override
232         public void run() {
233             drain();
234         }
235 
236         private void drain() {
237             // check cancel outside of request drain loop because the drain
238             // method is also used to serialize read with cancellation (closing
239             // the queue) and we still want it to happen if there are no
240             // requests
241             if (cancelled) {
242                 close(queue);
243                 worker.dispose();
244                 return;
245             }
246             int missed = 1;
247             while (true) {
248                 long r = requested.get();
249                 long e = 0; // emitted
250                 while (e != r) {
251                     if (cancelled) {
252                         close(queue);
253                         worker.dispose();
254                         return;
255                     }
256                     // for visibility purposes must read error AFTER reading
257                     // done (done is volatile and error is non-volatile)
258                     boolean isDone = done;
259                     // must check isDone and error because don't want to emit an
260                     // error that is only partially visible to the current
261                     // thread
262                     if (isDone && error != null) {
263                         cancelNow();
264                         child.onError(error);
265                         return;
266                     }
267                     byte[] bytes;
268                     try {
269                         bytes = queue.poll();
270                     } catch (Throwable err) {
271                         Exceptions.throwIfFatal(err);
272                         cancelNow();
273                         child.onError(err);
274                         return;
275                     }
276                     if (bytes != null) {
277                         // assumed to be fast so we don't check cancelled
278                         // after this call
279                         T t;
280                         try {
281                             t = ObjectHelper.requireNonNull( //
282                                     serializer.deserialize(bytes),
283                                     "Serializer.deserialize should not return null (because RxJava 2 does not support streams with null items");
284                         } catch (Throwable err) {
285                             Exceptions.throwIfFatal(err);
286                             cancelNow();
287                             child.onError(err);
288                             return;
289                         }
290                         child.onNext(t);
291                         e++;
292                     } else if (isDone) {
293                         cancelNow();
294                         child.onComplete();
295                         return;
296                     } else {
297                         break;
298                     }
299                 }
300                 if (e != 0L && r != Long.MAX_VALUE) {
301                     requested.addAndGet(-e);
302                 }
303                 missed = addAndGet(-missed);
304                 if (missed == 0) {
305                     return;
306                 }
307             }
308         }
309 
310         private void cancelNow() {
311             cancelled = true;
312             cancelUpstream();
313             close(queue);
314             worker.dispose();
315         }
316 
317         abstract public void cancelUpstream();
318 
319     }
320 
321     @VisibleForTesting
322     public static void close(PagedQueue queue) {
323         try {
324             queue.close();
325         } catch (Throwable err) {
326             Exceptions.throwIfFatal(err);
327             RxJavaPlugins.onError(err);
328         }
329     }
330 
331 }