1 package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rx2.buffertofile.Options;
import com.github.davidmoten.rx2.buffertofile.Serializer;

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
25
26 public final class FlowableOnBackpressureBufferToFile<T> extends Flowable<T> {
27
/** Upstream when a {@link Flowable} is being buffered; {@code null} when {@code source2} is used. */
private final Flowable<T> source;

/** Upstream when an {@link Observable} is being buffered; {@code null} when {@code source} is used. */
private final Observable<T> source2;

/** Provides the file factory, page size and scheduler used for each subscription. */
private final Options options;

/** Converts items to byte arrays on the way into the queue and back on the way out. */
private final Serializer<T> serializer;

/**
 * Creates the operator over exactly one upstream.
 *
 * @param source
 *            the {@link Flowable} upstream, or {@code null} if {@code source2} is given
 * @param source2
 *            the {@link Observable} upstream, or {@code null} if {@code source} is given
 * @param options
 *            buffering options (file factory, page size, scheduler)
 * @param serializer
 *            serializer used to write items to and read items from the queue
 */
public FlowableOnBackpressureBufferToFile(Flowable<T> source, Observable<T> source2,
        Options options, Serializer<T> serializer) {
    // Exactly one upstream must be supplied, i.e. the two "is absent" answers must differ.
    final boolean flowableAbsent = source == null;
    final boolean observableAbsent = source2 == null;
    Preconditions.checkArgument(flowableAbsent != observableAbsent);

    this.source = source;
    this.source2 = source2;
    this.options = options;
    this.serializer = serializer;
}
42
43 @Override
44 protected void subscribeActual(Subscriber<? super T> child) {
45 PagedQueue queue = new PagedQueue(options.fileFactory(), options.pageSizeBytes());
46 Worker worker = options.scheduler().createWorker();
47 if (source != null) {
48 source.subscribe(
49 new BufferToFileSubscriberFlowable<T>(child, queue, serializer, worker));
50 } else {
51 source2.subscribe(
52 new BufferToFileSubscriberObservable<T>(child, queue, serializer, worker));
53 }
54 }
55
56 @SuppressWarnings("serial")
57 @VisibleForTesting
58 public static final class BufferToFileSubscriberFlowable<T> extends BufferToFileSubscriber<T>
59 implements FlowableSubscriber<T>, Subscription {
60
61 private Subscription parent;
62
63 @VisibleForTesting
64 public BufferToFileSubscriberFlowable(Subscriber<? super T> child, PagedQueue queue,
65 Serializer<T> serializer, Worker worker) {
66 super(child, queue, serializer, worker);
67 }
68
69 @Override
70 public void onSubscribe(Subscription parent) {
71 if (SubscriptionHelper.validate(this.parent, parent)) {
72 this.parent = parent;
73 child.onSubscribe(this);
74 }
75 }
76
77 @Override
78 public void request(long n) {
79 if (SubscriptionHelper.validate(n)) {
80 BackpressureHelper.add(requested, n);
81 parent.request(n);
82 scheduleDrain();
83 }
84 }
85
86 @Override
87 public void cancel() {
88 cancelled = true;
89 parent.cancel();
90
91
92 scheduleDrain();
93 }
94
95 @Override
96 public void onNext(T t) {
97 super.onNext(t);
98 }
99
100 @Override
101 public void onError(Throwable e) {
102 super.onError(e);
103 }
104
105 @Override
106 public void onComplete() {
107 super.onComplete();
108 }
109
110 @Override
111 public void cancelUpstream() {
112 parent.cancel();
113 }
114 }
115
116 @SuppressWarnings("serial")
117 private static final class BufferToFileSubscriberObservable<T> extends BufferToFileSubscriber<T>
118 implements Observer<T>, Subscription {
119
120 private Disposable parent;
121
122 BufferToFileSubscriberObservable(Subscriber<? super T> child, PagedQueue queue,
123 Serializer<T> serializer, Worker worker) {
124 super(child, queue, serializer, worker);
125 }
126
127 @Override
128 public void onSubscribe(Disposable d) {
129 this.parent = d;
130 child.onSubscribe(this);
131 }
132
133 @Override
134 public void onNext(T t) {
135 super.onNext(t);
136 }
137
138 @Override
139 public void onError(Throwable e) {
140 super.onError(e);
141 }
142
143 @Override
144 public void onComplete() {
145 super.onComplete();
146 }
147
148 @Override
149 public void cancelUpstream() {
150 parent.dispose();
151 }
152
153 @Override
154 public void request(long n) {
155 if (SubscriptionHelper.validate(n)) {
156 BackpressureHelper.add(requested, n);
157 scheduleDrain();
158 }
159 }
160
161 @Override
162 public void cancel() {
163 cancelled = true;
164 parent.dispose();
165
166
167 scheduleDrain();
168 }
169 }
170
171 @SuppressWarnings({ "serial" })
172 @VisibleForTesting
173 static abstract class BufferToFileSubscriber<T> extends AtomicInteger implements Runnable {
174
175 protected final Subscriber<? super T> child;
176 private final PagedQueue queue;
177 private final Serializer<T> serializer;
178 private final Worker worker;
179 protected final AtomicLong requested = new AtomicLong();
180
181 protected volatile boolean cancelled;
182 private volatile boolean done;
183
184
185
186 private Throwable error;
187
188 BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue,
189 Serializer<T> serializer, Worker worker) {
190 this.child = child;
191 this.queue = queue;
192 this.serializer = serializer;
193 this.worker = worker;
194 }
195
196 public void onNext(T t) {
197 try {
198 queue.offer(serializer.serialize(t));
199 } catch (Throwable e) {
200 Exceptions.throwIfFatal(e);
201 onError(e);
202 return;
203 }
204 scheduleDrain();
205 }
206
207 public void onError(Throwable e) {
208
209
210
211 error = e;
212 done = true;
213 scheduleDrain();
214 }
215
216 public void onComplete() {
217 done = true;
218 scheduleDrain();
219 }
220
221 protected void scheduleDrain() {
222
223
224
225
226 if (getAndIncrement() == 0) {
227 worker.schedule(this);
228 }
229 }
230
231 @Override
232 public void run() {
233 drain();
234 }
235
236 private void drain() {
237
238
239
240
241 if (cancelled) {
242 close(queue);
243 worker.dispose();
244 return;
245 }
246 int missed = 1;
247 while (true) {
248 long r = requested.get();
249 long e = 0;
250 while (e != r) {
251 if (cancelled) {
252 close(queue);
253 worker.dispose();
254 return;
255 }
256
257
258 boolean isDone = done;
259
260
261
262 if (isDone && error != null) {
263 cancelNow();
264 child.onError(error);
265 return;
266 }
267 byte[] bytes;
268 try {
269 bytes = queue.poll();
270 } catch (Throwable err) {
271 Exceptions.throwIfFatal(err);
272 cancelNow();
273 child.onError(err);
274 return;
275 }
276 if (bytes != null) {
277
278
279 T t;
280 try {
281 t = ObjectHelper.requireNonNull(
282 serializer.deserialize(bytes),
283 "Serializer.deserialize should not return null (because RxJava 2 does not support streams with null items");
284 } catch (Throwable err) {
285 Exceptions.throwIfFatal(err);
286 cancelNow();
287 child.onError(err);
288 return;
289 }
290 child.onNext(t);
291 e++;
292 } else if (isDone) {
293 cancelNow();
294 child.onComplete();
295 return;
296 } else {
297 break;
298 }
299 }
300 if (e != 0L && r != Long.MAX_VALUE) {
301 requested.addAndGet(-e);
302 }
303 missed = addAndGet(-missed);
304 if (missed == 0) {
305 return;
306 }
307 }
308 }
309
310 private void cancelNow() {
311 cancelled = true;
312 cancelUpstream();
313 close(queue);
314 worker.dispose();
315 }
316
317 abstract public void cancelUpstream();
318
319 }
320
321 @VisibleForTesting
322 public static void close(PagedQueue queue) {
323 try {
324 queue.close();
325 } catch (Throwable err) {
326 Exceptions.throwIfFatal(err);
327 RxJavaPlugins.onError(err);
328 }
329 }
330
331 }