1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.concurrent.atomic.AtomicInteger;
5 import java.util.concurrent.atomic.AtomicLong;
6 import java.util.concurrent.atomic.AtomicReference;
7
8 import org.reactivestreams.Subscriber;
9 import org.reactivestreams.Subscription;
10
11 import com.github.davidmoten.guavamini.Preconditions;
12
13 import io.reactivex.Flowable;
14 import io.reactivex.FlowableSubscriber;
15 import io.reactivex.Scheduler;
16 import io.reactivex.Scheduler.Worker;
17 import io.reactivex.disposables.Disposable;
18 import io.reactivex.exceptions.Exceptions;
19 import io.reactivex.functions.Function;
20 import io.reactivex.internal.disposables.DisposableHelper;
21 import io.reactivex.internal.fuseable.SimplePlainQueue;
22 import io.reactivex.internal.queue.MpscLinkedQueue;
23 import io.reactivex.internal.subscriptions.SubscriptionHelper;
24 import io.reactivex.internal.util.BackpressureHelper;
25 import io.reactivex.plugins.RxJavaPlugins;
26
27 public final class FlowableInsertTimeout<T> extends Flowable<T> {
28
/** Upstream sequence that the timeout subscriber is subscribed to. */
private final Flowable<T> source;

/** Maps each upstream item to the delay before a value is inserted after it. */
private final Function<? super T, ? extends Long> timeout;

/** Time unit of the delays returned by {@code timeout}. */
private final TimeUnit unit;

/** Maps the item that started a timer to the value inserted when that timer fires. */
private final Function<? super T, ? extends T> value;

/** Scheduler from which each subscriber obtains the worker that runs its timers. */
private final Scheduler scheduler;

/**
 * Creates the operator.
 *
 * @param source
 *            upstream sequence
 * @param timeout
 *            function giving, for each upstream item, the delay before a value is
 *            inserted
 * @param unit
 *            time unit of the delays returned by {@code timeout}
 * @param value
 *            function giving the value to insert for the item whose timer fired
 * @param scheduler
 *            scheduler used to run the timers
 */
public FlowableInsertTimeout(Flowable<T> source, Function<? super T, ? extends Long> timeout, TimeUnit unit,
        Function<? super T, ? extends T> value, Scheduler scheduler) {
    // validate source as well so that a null is reported here rather than
    // as a bare NullPointerException at subscription time
    Preconditions.checkNotNull(source, "source cannot be null");
    Preconditions.checkNotNull(timeout, "timeout cannot be null");
    Preconditions.checkNotNull(unit, "unit cannot be null");
    Preconditions.checkNotNull(value, "value cannot be null");
    Preconditions.checkNotNull(scheduler, "scheduler cannot be null");
    this.source = source;
    this.timeout = timeout;
    this.unit = unit;
    this.value = value;
    this.scheduler = scheduler;
}
47
48 @Override
49 protected void subscribeActual(Subscriber<? super T> downstream) {
50 source.subscribe(new InsertTimeoutSubscriber<T>(downstream, timeout, unit, value, scheduler));
51 }
52
53 static final class InsertTimeoutSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
54
private static final long serialVersionUID = 1812803226317104954L;

/** Marker stored in {@code terminated} once the stream has completed. */
private static final Object TERMINATED = new Object();

private final Subscriber<? super T> downstream;
private final Function<? super T, ? extends Long> timeout;
private final TimeUnit unit;
private final Function<? super T, ? extends T> value;

/** Holds upstream items and inserted values until the drain loop emits them. */
private final SimplePlainQueue<T> queue = new MpscLinkedQueue<T>();

/** Outstanding downstream demand. */
private final AtomicLong requested = new AtomicLong();

/** Number of inserted values not yet offset against a request to upstream. */
private final AtomicLong inserted = new AtomicLong();

/**
 * Holds {@code null} while not terminated, {@code TERMINATED} after completion, or
 * the {@code Throwable} of a pending error.
 */
private final AtomicReference<Object> terminated = new AtomicReference<Object>();

/** The currently scheduled timeout task, if any. */
private final AtomicReference<Disposable> scheduled = new AtomicReference<Disposable>();

private Subscription upstream;

/** Set by {@code cancel()} and read by the drain loop. */
private volatile boolean cancelled;

/**
 * Set once onError/onComplete has been received; read and written only in the
 * upstream callbacks of this class.
 */
private boolean finished;

/** Worker on which the timeout tasks are scheduled. */
private final Worker worker;

/**
 * @param downstream
 *            subscriber receiving the merged sequence
 * @param timeout
 *            per-item delay function
 * @param unit
 *            time unit of the delays
 * @param value
 *            function producing the value to insert
 * @param scheduler
 *            scheduler from which a worker is created for this subscriber
 */
InsertTimeoutSubscriber(Subscriber<? super T> downstream, Function<? super T, ? extends Long> timeout,
        TimeUnit unit, Function<? super T, ? extends T> value, Scheduler scheduler) {
    this.downstream = downstream;
    this.timeout = timeout;
    this.unit = unit;
    this.value = value;
    this.worker = scheduler.createWorker();
}
97
98 @Override
99 public void onSubscribe(Subscription upstream) {
100 if (SubscriptionHelper.validate(this.upstream, upstream)) {
101 this.upstream = upstream;
102 downstream.onSubscribe(this);
103 }
104 }
105
106 @Override
107 public void onNext(final T t) {
108 if (finished) {
109 return;
110 }
111 queue.offer(t);
112 final long waitTime;
113 try {
114 waitTime = timeout.apply(t);
115 } catch (Throwable e) {
116 Exceptions.throwIfFatal(e);
117
118
119 upstream.cancel();
120 onError(e);
121 return;
122 }
123 TimeoutAction<T> action = new TimeoutAction<T>(this, t);
124 Disposable d = worker.schedule(action, waitTime, unit);
125 DisposableHelper.set(scheduled, d);
126 drain();
127 }
128
129 @Override
130 public void onError(Throwable e) {
131 if (finished) {
132 RxJavaPlugins.onError(e);
133 return;
134 }
135 finished = true;
136 if (terminated.compareAndSet(null, e)) {
137 dispose();
138 drain();
139 } else {
140 RxJavaPlugins.onError(e);
141 }
142 }
143
144 @Override
145 public void onComplete() {
146 if (finished) {
147 return;
148 }
149 finished = true;
150 if (terminated.compareAndSet(null, TERMINATED)) {
151 dispose();
152 drain();
153 }
154 }
155
156 private void drain() {
157 if (getAndIncrement() != 0) {
158 return;
159 }
160
161 int missed = 1;
162 while (true) {
163 long r = requested.get();
164 long e = 0;
165 while (e != r) {
166 if (cancelled) {
167 dispose();
168 queue.clear();
169 return;
170 }
171 Object d = terminated.get();
172 T t = queue.poll();
173 if (t == null) {
174 if (d != null) {
175
176 if (d == TERMINATED) {
177
178
179 downstream.onComplete();
180 } else {
181
182
183
184 terminated.set(TERMINATED);
185 dispose();
186 downstream.onError((Throwable) d);
187 }
188 return;
189 } else {
190
191 break;
192 }
193 } else {
194 downstream.onNext(t);
195 e++;
196 }
197 }
198 if (e != 0L && r != Long.MAX_VALUE) {
199 requested.addAndGet(-e);
200 }
201 missed = addAndGet(-missed);
202 if (missed == 0) {
203 return;
204 }
205 }
206 }
207
208 @Override
209 public void request(long n) {
210 if (SubscriptionHelper.validate(n)) {
211 BackpressureHelper.add(requested, n);
212
213
214 while (true) {
215 long ins = inserted.get();
216 long d = Math.min(ins, n);
217 if (inserted.compareAndSet(ins, ins - d)) {
218 if (n - d > 0) {
219 upstream.request(n - d);
220 }
221 break;
222 }
223 }
224 drain();
225 }
226 }
227
228 @Override
229 public void cancel() {
230 if (!cancelled) {
231 cancelled = true;
232 upstream.cancel();
233 dispose();
234 if (getAndIncrement() == 0) {
235
236
237
238
239
240
241 queue.clear();
242 }
243 }
244 }
245
246 private void dispose() {
247 DisposableHelper.dispose(scheduled);
248 worker.dispose();
249 }
250
251 void insert(T t) {
252 inserted.incrementAndGet();
253 queue.offer(t);
254 drain();
255 }
256
257 void insertError(Throwable e) {
258 if (terminated.compareAndSet(null, e)) {
259 upstream.cancel();
260 dispose();
261 drain();
262 } else {
263 RxJavaPlugins.onError(e);
264 }
265 }
266
267 T calculateValueToInsert(T t) throws Exception {
268 return value.apply(t);
269 }
270
271 }
272
273 private static final class TimeoutAction<T> implements Runnable {
274
275 private final InsertTimeoutSubscriber<T> subscriber;
276 private final T t;
277
278 TimeoutAction(InsertTimeoutSubscriber<T> subscriber, T t) {
279 this.subscriber = subscriber;
280 this.t = t;
281 }
282
283 @Override
284 public void run() {
285 final T v;
286 try {
287 v = subscriber.calculateValueToInsert(t);
288 } catch (Throwable e) {
289 subscriber.insertError(e);
290 return;
291 }
292 subscriber.insert(v);
293 }
294
295 }
296
297 }