View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   import java.util.concurrent.atomic.AtomicLong;
5   
6   import org.reactivestreams.Subscriber;
7   import org.reactivestreams.Subscription;
8   
9   import com.github.davidmoten.guavamini.Preconditions;
10  
11  import io.reactivex.Flowable;
12  import io.reactivex.FlowableSubscriber;
13  import io.reactivex.internal.subscriptions.SubscriptionHelper;
14  import io.reactivex.internal.util.BackpressureHelper;
15  
16  public final class FlowableMaxRequest<T> extends Flowable<T> {
17  
18      private final Flowable<T> source;
19      private final long[] maxRequests;
20  
21      public FlowableMaxRequest(Flowable<T> source, long[] maxRequests) {
22          Preconditions.checkArgument(maxRequests.length > 0, "maxRequests length must be greater than 0");
23          for (int i = 0; i < maxRequests.length; i++) {
24              Preconditions.checkArgument(maxRequests[i] > 0, "maxRequests items must be greater than zero");
25          }
26          this.source = source;
27          this.maxRequests = maxRequests;
28      }
29  
30      @Override
31      protected void subscribeActual(Subscriber<? super T> child) {
32          source.subscribe(new MaxRequestSubscriber<T>(maxRequests, child));
33      }
34  
35      @SuppressWarnings("serial")
36      private static final class MaxRequestSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
37  
38          private final long[] maxRequests;
39          private int requestNum;
40          private final Subscriber<? super T> child;
41  
42          // the number of requests from downstream that have not been requested
43          // from upstream yet
44          private final AtomicLong requested = new AtomicLong();
45  
46          // the upstream subscription (which allows us to request from upstream
47          // and cancel it)
48          private Subscription parent;
49  
50          // the number of items still to be emitted from
51          // upstream out of the last request to parent
52          private long count;
53  
54          // when request made from `requestMore` this value is used to set the
55          // next value of `count` in the `onNext` method
56          private volatile long nextRequest;
57  
58          // indicates to the `requestMore` method that all items from the last
59          // request to parent have arrived
60          private volatile boolean allArrived = true;
61  
62          MaxRequestSubscriber(long[] maxRequests, Subscriber<? super T> child) {
63              this.maxRequests = maxRequests;
64              this.child = child;
65          }
66  
67          @Override
68          public void onSubscribe(Subscription parent) {
69              if (SubscriptionHelper.validate(this.parent, parent)) {
70                  this.parent = parent;
71                  child.onSubscribe(this);
72              }
73          }
74  
75          @Override
76          public void request(long n) {
77              if (SubscriptionHelper.validate(n)) {
78                  BackpressureHelper.add(requested, n);
79                  requestMore();
80              }
81          }
82  
83          @Override
84          public void cancel() {
85              parent.cancel();
86          }
87  
88          @Override
89          public void onNext(T t) {
90              if (count != Long.MAX_VALUE) {
91                  count--;
92                  if (count == -1) {
93                      // request didn't happen from this onNext method so refresh
94                      // count from the volatile set in `requestMore`
95                      long nr = nextRequest;
96                      if (nr == Long.MAX_VALUE) {
97                          count = nr;
98                      } else {
99                          count = nr - 1;
100                     }
101                 }
102                 if (count == 0) {
103                     // All items from the last request made to parent have
104                     // arrived
105 
106                     // CAS loop to update `requested`
107                     long mr = peekNextMaxRequest();
108                     while (true) {
109                         long r = requested.get();
110                         if (r == 0) {
111                             // now must rely on dowstream requests to request
112                             // more from upstream via `requestMore`
113                             allArrived = true;
114                             requestMore();
115                             break;
116                         } else if (r == Long.MAX_VALUE) {
117                             nextMaxRequest();
118                             count = mr;
119                             parent.request(mr);
120                             break;
121                         } else {
122                             long req = Math.min(r, mr);
123                             if (requested.compareAndSet(r, r - req)) {
124                                 nextMaxRequest();
125                                 count = req;
126                                 parent.request(req);
127                                 break;
128                             }
129                         }
130                     }
131                 }
132             }
133             child.onNext(t);
134         }
135 
136         @Override
137         public void onError(Throwable t) {
138             child.onError(t);
139 
140         }
141 
142         @Override
143         public void onComplete() {
144             child.onComplete();
145         }
146 
147         private void requestMore() {
148             if (getAndIncrement() == 0) {
149                 int missed = 1;
150                 while (true) {
151                     if (allArrived) {
152                         // CAS loop to update requested
153                         long mr = peekNextMaxRequest();
154                         while (true) {
155                             long r = requested.get();
156                             long req = Math.min(r, mr);
157                             if (r == 0) {
158                                 break;
159                             } else if (r == Long.MAX_VALUE || requested.compareAndSet(r, r - req)) {
160                                 nextMaxRequest();
161                                 allArrived = false;
162                                 nextRequest = req;
163                                 parent.request(req);
164                                 break;
165                             }
166                         }
167                     }
168                     missed = addAndGet(-missed);
169                     if (missed == 0) {
170                         return;
171                     }
172                 }
173             }
174         }
175 
176         private long peekNextMaxRequest() {
177             return maxRequests[requestNum];
178         }
179 
180         private void nextMaxRequest() {
181             if (requestNum != maxRequests.length - 1) {
182                 requestNum++;
183             }
184         }
185 
186     }
187 
188 }