View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   import java.util.concurrent.atomic.AtomicLong;
5   
6   import org.reactivestreams.Subscriber;
7   import org.reactivestreams.Subscription;
8   
9   import com.github.davidmoten.guavamini.Preconditions;
10  
11  import io.reactivex.Flowable;
12  import io.reactivex.FlowableSubscriber;
13  import io.reactivex.internal.fuseable.SimplePlainQueue;
14  import io.reactivex.internal.queue.SpscLinkedArrayQueue;
15  import io.reactivex.internal.subscriptions.SubscriptionHelper;
16  import io.reactivex.internal.util.BackpressureHelper;
17  
18  public final class FlowableMinRequest<T> extends Flowable<T> {
19  
20      private final Flowable<T> source;
21      private final int[] minRequest;
22  
23      public FlowableMinRequest(Flowable<T> source, int[] minRequests) {
24          Preconditions.checkArgument(minRequests.length > 0, "minRequests length must be > 0");
25          for (int i = 0; i < minRequests.length; i++) {
26              Preconditions.checkArgument(minRequests[i] > 0, "each item in minRequests must be > 0");
27          }
28          this.source = source;
29          this.minRequest = minRequests;
30      }
31  
32      @Override
33      protected void subscribeActual(Subscriber<? super T> child) {
34          source.subscribe(new MinRequestSubscriber<T>(minRequest, child));
35      }
36  
37      @SuppressWarnings("serial")
38      private static final class MinRequestSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
39  
40          private final int[] minRequests;
41          private int requestNum;
42          private final Subscriber<? super T> child;
43          private final AtomicLong requested = new AtomicLong();
44          private final SimplePlainQueue<T> queue = new SpscLinkedArrayQueue<T>(16);
45  
46          private Subscription parent;
47          private volatile boolean done;
48          private Throwable error;
49          private volatile boolean cancelled;
50          private long count;
51  
52          MinRequestSubscriber(int[] minRequests, Subscriber<? super T> child) {
53              this.minRequests = minRequests;
54              this.child = child;
55          }
56  
57          @Override
58          public void onSubscribe(Subscription parent) {
59              if (SubscriptionHelper.validate(this.parent, parent)) {
60                  this.parent = parent;
61                  child.onSubscribe(this);
62              }
63          }
64  
65          @Override
66          public void request(long n) {
67              if (SubscriptionHelper.validate(n)) {
68                  BackpressureHelper.add(requested, n);
69                  drain();
70              }
71          }
72  
73          @Override
74          public void cancel() {
75              cancelled = true;
76              parent.cancel();
77          }
78  
79          @Override
80          public void onNext(T t) {
81              queue.offer(t);
82              drain();
83          }
84  
85          @Override
86          public void onError(Throwable e) {
87              error = e;
88              done = true;
89              drain();
90          }
91  
92          @Override
93          public void onComplete() {
94              done = true;
95              drain();
96          }
97  
98          private void drain() {
99              if (getAndIncrement() == 0) {
100                 int missed = 1;
101                 while (true) {
102                     long r = requested.get();
103                     long e = 0;
104                     boolean d = done;
105                     while (e != r) {
106                         if (cancelled) {
107                             queue.clear();
108                             return;
109                         }
110                         T t = queue.poll();
111                         if (t == null) {
112                             if (d) {
113                                 terminate();
114                                 return;
115                             } else {
116                                 break;
117                             }
118                         } else {
119                             child.onNext(t);
120                             e++;
121                             if (count != Long.MAX_VALUE) {
122                                 count--;
123                             }
124                         }
125                         d = done;
126                     }
127                     if (d && queue.isEmpty()) {
128                         terminate();
129                         return;
130                     }
131                     if (e != 0 && r != Long.MAX_VALUE) {
132                         r = requested.addAndGet(-e);
133                     }
134                     if (r != 0 && count == 0) {
135                         // requests from parent have arrived so request some
136                         // more
137                         int min = minRequests[requestNum];
138                         if (requestNum != minRequests.length - 1) {
139                             requestNum++;
140                         }
141                         count = Math.max(r, min);
142                         parent.request(count);
143                     }
144                     missed = addAndGet(-missed);
145                     if (missed == 0) {
146                         return;
147                     }
148                 }
149             }
150         }
151 
152         private void terminate() {
153             parent.cancel();
154             Throwable err = error;
155             if (err != null) {
156                 error = null;
157                 child.onError(err);
158             } else {
159                 child.onComplete();
160             }
161         }
162     }
163 
164 }