View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.Callable;
4   import java.util.concurrent.atomic.AtomicLong;
5   
6   import io.reactivex.BackpressureStrategy;
7   import io.reactivex.Flowable;
8   import io.reactivex.exceptions.Exceptions;
9   import io.reactivex.functions.Action;
10  import io.reactivex.functions.BiFunction;
11  import io.reactivex.functions.Consumer;
12  import io.reactivex.functions.LongConsumer;
13  import io.reactivex.internal.subscriptions.SubscriptionHelper;
14  import io.reactivex.subjects.ReplaySubject;
15  import io.reactivex.subjects.Subject;
16  
17  public final class FlowableFetchPagesByRequest {
18  
19      private FlowableFetchPagesByRequest() {
20          // prevent instantiation
21      }
22  
23      public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
24              final long start, final int maxConcurrency) {
25          return Flowable.defer(new Callable<Flowable<T>>() {
26              @Override
27              public Flowable<T> call() throws Exception {
28                  // need a ReplaySubject because multiple requests can come
29                  // through before concatEager has established subscriptions to
30                  // the subject
31                  final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
32                  final AtomicLong position = new AtomicLong(start);
33                  LongConsumer request = new LongConsumer() {
34                      @Override
35                      public void accept(final long n) throws Exception {
36                          final long pos = position.getAndAdd(n);
37                          if (SubscriptionHelper.validate(n)) {
38                              Flowable<T> flowable;
39                              try {
40                                  flowable = fetch.apply(pos, n);
41                              } catch (Throwable e) {
42                                  Exceptions.throwIfFatal(e);
43                                  subject.onError(e);
44                                  return;
45                              }
46                              // reduce allocations by incorporating the onNext
47                              // and onComplete actions into the mutable count
48                              // object
49                              final Count count = new Count(subject, n);
50                              flowable = flowable //
51                                      .doOnNext(count) //
52                                      .doOnComplete(count);
53                              subject.onNext(flowable);
54                          }
55                      }
56                  };
57                  return Flowable //
58                          .concatEager(subject.serialize() //
59                                  .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
60                          .doOnRequest(request);
61              }
62          });
63      }
64  
65      private static final class Count implements Consumer<Object>, Action {
66          private final Subject<?> subject;
67          private final long n;
68  
69          // mutable
70          private long count;
71  
72          Count(Subject<?> subject, long n) {
73              this.subject = subject;
74              this.n = n;
75          }
76  
77          @Override
78          public void accept(Object t) throws Exception {
79              count++;
80          }
81  
82          @Override
83          public void run() throws Exception {
84              if (count < n) {
85                  subject.onComplete();
86              }
87          }
88      }
89  
90  }