1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicInteger;
4 import java.util.concurrent.atomic.AtomicLong;
5
6 import org.reactivestreams.Subscriber;
7 import org.reactivestreams.Subscription;
8
9 import com.github.davidmoten.guavamini.Preconditions;
10
11 import io.reactivex.Flowable;
12 import io.reactivex.FlowableSubscriber;
13 import io.reactivex.internal.fuseable.SimplePlainQueue;
14 import io.reactivex.internal.queue.SpscLinkedArrayQueue;
15 import io.reactivex.internal.subscriptions.SubscriptionHelper;
16 import io.reactivex.internal.util.BackpressureHelper;
17
18 public final class FlowableMinRequest<T> extends Flowable<T> {
19
20 private final Flowable<T> source;
21 private final int[] minRequest;
22
23 public FlowableMinRequest(Flowable<T> source, int[] minRequests) {
24 Preconditions.checkArgument(minRequests.length > 0, "minRequests length must be > 0");
25 for (int i = 0; i < minRequests.length; i++) {
26 Preconditions.checkArgument(minRequests[i] > 0, "each item in minRequests must be > 0");
27 }
28 this.source = source;
29 this.minRequest = minRequests;
30 }
31
32 @Override
33 protected void subscribeActual(Subscriber<? super T> child) {
34 source.subscribe(new MinRequestSubscriber<T>(minRequest, child));
35 }
36
37 @SuppressWarnings("serial")
38 private static final class MinRequestSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
39
40 private final int[] minRequests;
41 private int requestNum;
42 private final Subscriber<? super T> child;
43 private final AtomicLong requested = new AtomicLong();
44 private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
45
46 private Subscription parent;
47 private volatile boolean done;
48 private Throwable error;
49 private volatile boolean cancelled;
50 private long count;
51
52 MinRequestSubscriber(int[] minRequests, Subscriber<? super T> child) {
53 this.minRequests = minRequests;
54 this.child = child;
55 }
56
57 @Override
58 public void onSubscribe(Subscription parent) {
59 if (SubscriptionHelper.validate(this.parent, parent)) {
60 this.parent = parent;
61 child.onSubscribe(this);
62 }
63 }
64
65 @Override
66 public void request(long n) {
67 if (SubscriptionHelper.validate(n)) {
68 BackpressureHelper.add(requested, n);
69 drain();
70 }
71 }
72
73 @Override
74 public void cancel() {
75 cancelled = true;
76 parent.cancel();
77 }
78
79 @Override
80 public void onNext(T t) {
81 queue.offer(t);
82 drain();
83 }
84
85 @Override
86 public void onError(Throwable e) {
87 error = e;
88 done = true;
89 drain();
90 }
91
92 @Override
93 public void onComplete() {
94 done = true;
95 drain();
96 }
97
98 private void drain() {
99 if (getAndIncrement() == 0) {
100 int missed = 1;
101 while (true) {
102 long r = requested.get();
103 long e = 0;
104 boolean d = done;
105 while (e != r) {
106 if (cancelled) {
107 queue.clear();
108 return;
109 }
110 T t = queue.poll();
111 if (t == null) {
112 if (d) {
113 terminate();
114 return;
115 } else {
116 break;
117 }
118 } else {
119 child.onNext(t);
120 e++;
121 if (count != Long.MAX_VALUE) {
122 count--;
123 }
124 }
125 d = done;
126 }
127 if (d && queue.isEmpty()) {
128 terminate();
129 return;
130 }
131 if (e != 0 && r != Long.MAX_VALUE) {
132 r = requested.addAndGet(-e);
133 }
134 if (r != 0 && count == 0) {
135
136
137 int min = minRequests[requestNum];
138 if (requestNum != minRequests.length - 1) {
139 requestNum++;
140 }
141 count = Math.max(r, min);
142 parent.request(count);
143 }
144 missed = addAndGet(-missed);
145 if (missed == 0) {
146 return;
147 }
148 }
149 }
150 }
151
152 private void terminate() {
153 parent.cancel();
154 Throwable err = error;
155 if (err != null) {
156 error = null;
157 child.onError(err);
158 } else {
159 child.onComplete();
160 }
161 }
162 }
163
164 }