// View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import com.github.davidmoten.rx.Actions;
6   import com.github.davidmoten.rx.Transformers;
7   import com.github.davidmoten.rx.util.BackpressureUtils;
8   
9   import rx.Observable;
10  import rx.Observable.Operator;
11  import rx.Observable.Transformer;
12  import rx.Subscriber;
13  import rx.functions.Action0;
14  import rx.functions.Action1;
15  import rx.functions.Func0;
16  import rx.schedulers.Schedulers;
17  
18  public final class TransformerOnBackpressureBufferRequestLimiting<T> implements Transformer<T, T> {
19  
20      private static final TransformerOnBackpressureBufferRequestLimiting<Object> instance = new TransformerOnBackpressureBufferRequestLimiting<Object>();
21  
22      @SuppressWarnings("unchecked")
23      public static final <T> TransformerOnBackpressureBufferRequestLimiting<T> instance() {
24          return (TransformerOnBackpressureBufferRequestLimiting<T>) instance;
25      }
26  
27      @Override
28      public Observable<T> call(final Observable<T> o) {
29          return Observable.defer(new Func0<Observable<T>>() {
30              @Override
31              public Observable<T> call() {
32                  final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
33                  return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
34  
35                      @Override
36                      public void call(Long n) {
37                          op.requestMore(n);
38                      }
39                  });
40              }
41          });
42      }
43  
44      /**
45       * Only used with an immediate downstream operator that requests
46       * {@code Long.MAX_VALUE} and should only be subscribed to once (use it
47       * within a {@code defer} block}.
48       *
49       * @param <T>
50       *            stream item type
51       */
52      private static final class OperatorPassThroughAdjustedRequest<T> implements Operator<T, T> {
53  
54          private volatile ParentSubscriber<T> parent;
55          private final AtomicLong requested = new AtomicLong();
56          private final Object lock = new Object();
57  
58          @Override
59          public Subscriber<? super T> call(Subscriber<? super T> child) {
60              // this method should only be called once for this instance
61              // assume child requests MAX_VALUE
62              ParentSubscriber<T> p = new ParentSubscriber<T>(child);
63              synchronized (lock) {
64                  parent = p;
65              }
66              p.requestMore(requested.get());
67              child.add(p);
68              return p;
69          }
70  
71          public void requestMore(long n) {
72              ParentSubscriber<T> p = parent;
73              if (p != null) {
74                  p.requestMore(n);
75              } else {
76                  synchronized (lock) {
77                      ParentSubscriber<T> par = parent;
78                      if (par == null) {
79                          BackpressureUtils.getAndAddRequest(requested, n);
80                      } else {
81                          par.requestMore(n);
82                      }
83                  }
84              }
85          }
86  
87      }
88  
89      private static final class ParentSubscriber<T> extends Subscriber<T> {
90  
91          private final Subscriber<? super T> child;
92          private final AtomicLong expected = new AtomicLong();
93  
94          public ParentSubscriber(Subscriber<? super T> child) {
95              this.child = child;
96              request(0);
97          }
98  
99          public void requestMore(long n) {
100             if (n <= 0) {
101                 return;
102             }
103             long r = expected.get();
104             if (r == Long.MAX_VALUE) {
105                 return;
106             } else {
107                 long u = r;
108                 while (true) {
109                     long sum = u + n;
110                     final long v;
111                     if (sum < 0) {
112                         v = Long.MAX_VALUE;
113                     } else {
114                         v = sum;
115                     }
116                     if (expected.compareAndSet(u, v)) {
117                         // if v negative (more have arrived than requested)
118                         long diff = Math.max(0, v);
119                         long req = Math.min(n, diff);
120                         if (req > 0) {
121                             request(req);
122                         }
123                         return;
124                     } else {
125                         u = expected.get();
126                     }
127                 }
128             }
129         }
130 
131         @Override
132         public void onCompleted() {
133             child.onCompleted();
134         }
135 
136         @Override
137         public void onError(Throwable e) {
138             child.onError(e);
139         }
140 
141         @Override
142         public void onNext(T t) {
143             expected.decrementAndGet();
144             child.onNext(t);
145         }
146 
147     }
148 
149     public static void main(String[] args) throws InterruptedException {
150         Observable.range(1, 10000) //
151                 .doOnRequest(new Action1<Long>() {
152                     @Override
153                     public void call(Long n) {
154                         System.out.println("requested " + n);
155                     }
156                 }).doOnUnsubscribe(new Action0() {
157                     @Override
158                     public void call() {
159                         System.out.println("unsubscribed");
160                     }
161                 }) //
162                 .compose(Transformers.<Integer> onBackpressureBufferRequestLimiting()) //
163                 .take(10) //
164                 .subscribeOn(Schedulers.io()) //
165                 .doOnNext(Actions.println()) //
166                 .count().toBlocking().single();
167         Thread.sleep(2000);
168     }
169 
170 }