1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicInteger;
4 import java.util.concurrent.atomic.AtomicLong;
5
6 import org.reactivestreams.Subscriber;
7 import org.reactivestreams.Subscription;
8
9 import com.github.davidmoten.guavamini.Preconditions;
10
11 import io.reactivex.Flowable;
12 import io.reactivex.FlowableSubscriber;
13 import io.reactivex.internal.subscriptions.SubscriptionHelper;
14 import io.reactivex.internal.util.BackpressureHelper;
15
16 public final class FlowableMaxRequest<T> extends Flowable<T> {
17
18 private final Flowable<T> source;
19 private final long[] maxRequests;
20
21 public FlowableMaxRequest(Flowable<T> source, long[] maxRequests) {
22 Preconditions.checkArgument(maxRequests.length > 0, "maxRequests length must be greater than 0");
23 for (int i = 0; i < maxRequests.length; i++) {
24 Preconditions.checkArgument(maxRequests[i] > 0, "maxRequests items must be greater than zero");
25 }
26 this.source = source;
27 this.maxRequests = maxRequests;
28 }
29
30 @Override
31 protected void subscribeActual(Subscriber<? super T> child) {
32 source.subscribe(new MaxRequestSubscriber<T>(maxRequests, child));
33 }
34
35 @SuppressWarnings("serial")
36 private static final class MaxRequestSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
37
38 private final long[] maxRequests;
39 private int requestNum;
40 private final Subscriber<? super T> child;
41
42
43
44 private final AtomicLong requested = new AtomicLong();
45
46
47
48 private Subscription parent;
49
50
51
52 private long count;
53
54
55
56 private volatile long nextRequest;
57
58
59
60 private volatile boolean allArrived = true;
61
62 MaxRequestSubscriber(long[] maxRequests, Subscriber<? super T> child) {
63 this.maxRequests = maxRequests;
64 this.child = child;
65 }
66
67 @Override
68 public void onSubscribe(Subscription parent) {
69 if (SubscriptionHelper.validate(this.parent, parent)) {
70 this.parent = parent;
71 child.onSubscribe(this);
72 }
73 }
74
75 @Override
76 public void request(long n) {
77 if (SubscriptionHelper.validate(n)) {
78 BackpressureHelper.add(requested, n);
79 requestMore();
80 }
81 }
82
83 @Override
84 public void cancel() {
85 parent.cancel();
86 }
87
88 @Override
89 public void onNext(T t) {
90 if (count != Long.MAX_VALUE) {
91 count--;
92 if (count == -1) {
93
94
95 long nr = nextRequest;
96 if (nr == Long.MAX_VALUE) {
97 count = nr;
98 } else {
99 count = nr - 1;
100 }
101 }
102 if (count == 0) {
103
104
105
106
107 long mr = peekNextMaxRequest();
108 while (true) {
109 long r = requested.get();
110 if (r == 0) {
111
112
113 allArrived = true;
114 requestMore();
115 break;
116 } else if (r == Long.MAX_VALUE) {
117 nextMaxRequest();
118 count = mr;
119 parent.request(mr);
120 break;
121 } else {
122 long req = Math.min(r, mr);
123 if (requested.compareAndSet(r, r - req)) {
124 nextMaxRequest();
125 count = req;
126 parent.request(req);
127 break;
128 }
129 }
130 }
131 }
132 }
133 child.onNext(t);
134 }
135
136 @Override
137 public void onError(Throwable t) {
138 child.onError(t);
139
140 }
141
142 @Override
143 public void onComplete() {
144 child.onComplete();
145 }
146
147 private void requestMore() {
148 if (getAndIncrement() == 0) {
149 int missed = 1;
150 while (true) {
151 if (allArrived) {
152
153 long mr = peekNextMaxRequest();
154 while (true) {
155 long r = requested.get();
156 long req = Math.min(r, mr);
157 if (r == 0) {
158 break;
159 } else if (r == Long.MAX_VALUE || requested.compareAndSet(r, r - req)) {
160 nextMaxRequest();
161 allArrived = false;
162 nextRequest = req;
163 parent.request(req);
164 break;
165 }
166 }
167 }
168 missed = addAndGet(-missed);
169 if (missed == 0) {
170 return;
171 }
172 }
173 }
174 }
175
176 private long peekNextMaxRequest() {
177 return maxRequests[requestNum];
178 }
179
180 private void nextMaxRequest() {
181 if (requestNum != maxRequests.length - 1) {
182 requestNum++;
183 }
184 }
185
186 }
187
188 }