1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.Callable;
4 import java.util.concurrent.atomic.AtomicLong;
5
6 import io.reactivex.BackpressureStrategy;
7 import io.reactivex.Flowable;
8 import io.reactivex.exceptions.Exceptions;
9 import io.reactivex.functions.Action;
10 import io.reactivex.functions.BiFunction;
11 import io.reactivex.functions.Consumer;
12 import io.reactivex.functions.LongConsumer;
13 import io.reactivex.internal.subscriptions.SubscriptionHelper;
14 import io.reactivex.subjects.ReplaySubject;
15 import io.reactivex.subjects.Subject;
16
17 public final class FlowableFetchPagesByRequest {
18
19 private FlowableFetchPagesByRequest() {
20
21 }
22
23 public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
24 final long start, final int maxConcurrency) {
25 return Flowable.defer(new Callable<Flowable<T>>() {
26 @Override
27 public Flowable<T> call() throws Exception {
28
29
30
31 final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
32 final AtomicLong position = new AtomicLong(start);
33 LongConsumer request = new LongConsumer() {
34 @Override
35 public void accept(final long n) throws Exception {
36 final long pos = position.getAndAdd(n);
37 if (SubscriptionHelper.validate(n)) {
38 Flowable<T> flowable;
39 try {
40 flowable = fetch.apply(pos, n);
41 } catch (Throwable e) {
42 Exceptions.throwIfFatal(e);
43 subject.onError(e);
44 return;
45 }
46
47
48
49 final Count count = new Count(subject, n);
50 flowable = flowable
51 .doOnNext(count)
52 .doOnComplete(count);
53 subject.onNext(flowable);
54 }
55 }
56 };
57 return Flowable
58 .concatEager(subject.serialize()
59 .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128)
60 .doOnRequest(request);
61 }
62 });
63 }
64
65 private static final class Count implements Consumer<Object>, Action {
66 private final Subject<?> subject;
67 private final long n;
68
69
70 private long count;
71
72 Count(Subject<?> subject, long n) {
73 this.subject = subject;
74 this.n = n;
75 }
76
77 @Override
78 public void accept(Object t) throws Exception {
79 count++;
80 }
81
82 @Override
83 public void run() throws Exception {
84 if (count < n) {
85 subject.onComplete();
86 }
87 }
88 }
89
90 }