1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import com.github.davidmoten.rx.Actions;
6 import com.github.davidmoten.rx.Transformers;
7 import com.github.davidmoten.rx.util.BackpressureUtils;
8
9 import rx.Observable;
10 import rx.Observable.Operator;
11 import rx.Observable.Transformer;
12 import rx.Subscriber;
13 import rx.functions.Action0;
14 import rx.functions.Action1;
15 import rx.functions.Func0;
16 import rx.schedulers.Schedulers;
17
18 public final class TransformerOnBackpressureBufferRequestLimiting<T> implements Transformer<T, T> {
19
20 private static final TransformerOnBackpressureBufferRequestLimiting<Object> instance = new TransformerOnBackpressureBufferRequestLimiting<Object>();
21
22 @SuppressWarnings("unchecked")
23 public static final <T> TransformerOnBackpressureBufferRequestLimiting<T> instance() {
24 return (TransformerOnBackpressureBufferRequestLimiting<T>) instance;
25 }
26
27 @Override
28 public Observable<T> call(final Observable<T> o) {
29 return Observable.defer(new Func0<Observable<T>>() {
30 @Override
31 public Observable<T> call() {
32 final OperatorPassThroughAdjustedRequest<T> op = new OperatorPassThroughAdjustedRequest<T>();
33 return o.lift(op).onBackpressureBuffer().doOnRequest(new Action1<Long>() {
34
35 @Override
36 public void call(Long n) {
37 op.requestMore(n);
38 }
39 });
40 }
41 });
42 }
43
44
45
46
47
48
49
50
51
52 private static final class OperatorPassThroughAdjustedRequest<T> implements Operator<T, T> {
53
54 private volatile ParentSubscriber<T> parent;
55 private final AtomicLong requested = new AtomicLong();
56 private final Object lock = new Object();
57
58 @Override
59 public Subscriber<? super T> call(Subscriber<? super T> child) {
60
61
62 ParentSubscriber<T> p = new ParentSubscriber<T>(child);
63 synchronized (lock) {
64 parent = p;
65 }
66 p.requestMore(requested.get());
67 child.add(p);
68 return p;
69 }
70
71 public void requestMore(long n) {
72 ParentSubscriber<T> p = parent;
73 if (p != null) {
74 p.requestMore(n);
75 } else {
76 synchronized (lock) {
77 ParentSubscriber<T> par = parent;
78 if (par == null) {
79 BackpressureUtils.getAndAddRequest(requested, n);
80 } else {
81 par.requestMore(n);
82 }
83 }
84 }
85 }
86
87 }
88
89 private static final class ParentSubscriber<T> extends Subscriber<T> {
90
91 private final Subscriber<? super T> child;
92 private final AtomicLong expected = new AtomicLong();
93
94 public ParentSubscriber(Subscriber<? super T> child) {
95 this.child = child;
96 request(0);
97 }
98
99 public void requestMore(long n) {
100 if (n <= 0) {
101 return;
102 }
103 long r = expected.get();
104 if (r == Long.MAX_VALUE) {
105 return;
106 } else {
107 long u = r;
108 while (true) {
109 long sum = u + n;
110 final long v;
111 if (sum < 0) {
112 v = Long.MAX_VALUE;
113 } else {
114 v = sum;
115 }
116 if (expected.compareAndSet(u, v)) {
117
118 long diff = Math.max(0, v);
119 long req = Math.min(n, diff);
120 if (req > 0) {
121 request(req);
122 }
123 return;
124 } else {
125 u = expected.get();
126 }
127 }
128 }
129 }
130
131 @Override
132 public void onCompleted() {
133 child.onCompleted();
134 }
135
136 @Override
137 public void onError(Throwable e) {
138 child.onError(e);
139 }
140
141 @Override
142 public void onNext(T t) {
143 expected.decrementAndGet();
144 child.onNext(t);
145 }
146
147 }
148
149 public static void main(String[] args) throws InterruptedException {
150 Observable.range(1, 10000)
151 .doOnRequest(new Action1<Long>() {
152 @Override
153 public void call(Long n) {
154 System.out.println("requested " + n);
155 }
156 }).doOnUnsubscribe(new Action0() {
157 @Override
158 public void call() {
159 System.out.println("unsubscribed");
160 }
161 })
162 .compose(Transformers.<Integer> onBackpressureBufferRequestLimiting())
163 .take(10)
164 .subscribeOn(Schedulers.io())
165 .doOnNext(Actions.println())
166 .count().toBlocking().single();
167 Thread.sleep(2000);
168 }
169
170 }