View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.TimeUnit;
4   import java.util.concurrent.atomic.AtomicInteger;
5   import java.util.concurrent.atomic.AtomicLong;
6   import java.util.concurrent.atomic.AtomicReference;
7   
8   import org.reactivestreams.Subscriber;
9   import org.reactivestreams.Subscription;
10  
11  import com.github.davidmoten.guavamini.Preconditions;
12  
13  import io.reactivex.Flowable;
14  import io.reactivex.FlowableSubscriber;
15  import io.reactivex.Scheduler;
16  import io.reactivex.Scheduler.Worker;
17  import io.reactivex.disposables.Disposable;
18  import io.reactivex.exceptions.Exceptions;
19  import io.reactivex.functions.Function;
20  import io.reactivex.internal.disposables.DisposableHelper;
21  import io.reactivex.internal.fuseable.SimplePlainQueue;
22  import io.reactivex.internal.queue.MpscLinkedQueue;
23  import io.reactivex.internal.subscriptions.SubscriptionHelper;
24  import io.reactivex.internal.util.BackpressureHelper;
25  import io.reactivex.plugins.RxJavaPlugins;
26  
27  public final class FlowableInsertTimeout<T> extends Flowable<T> {
28  
29      private final Flowable<T> source;
30      private final Function<? super T, ? extends Long> timeout;
31      private final TimeUnit unit;
32      private final Function<? super T, ? extends T> value;
33      private final Scheduler scheduler;
34  
/**
 * Creates the operator.
 *
 * @param source
 *            the upstream flowable that is subscribed to in
 *            {@code subscribeActual}
 * @param timeout
 *            applied to every upstream item to obtain the delay (expressed
 *            in {@code unit}) after which the timeout action for that item
 *            is scheduled
 * @param unit
 *            time unit of the values returned by {@code timeout}
 * @param value
 *            applied to the item that timed out to compute the item that is
 *            inserted into the stream
 * @param scheduler
 *            supplies the worker on which timeout actions are scheduled
 */
public FlowableInsertTimeout(Flowable<T> source, Function<? super T, ? extends Long> timeout, TimeUnit unit,
        Function<? super T, ? extends T> value, Scheduler scheduler) {
    // source is validated like every other argument so that a null source
    // fails here rather than later at subscription time
    Preconditions.checkNotNull(source, "source cannot be null");
    Preconditions.checkNotNull(timeout, "timeout cannot be null");
    Preconditions.checkNotNull(unit, "unit cannot be null");
    Preconditions.checkNotNull(value, "value cannot be null");
    Preconditions.checkNotNull(scheduler, "scheduler cannot be null");
    this.source = source;
    this.timeout = timeout;
    this.unit = unit;
    this.value = value;
    this.scheduler = scheduler;
}
47  
48      @Override
49      protected void subscribeActual(Subscriber<? super T> downstream) {
50          source.subscribe(new InsertTimeoutSubscriber<T>(downstream, timeout, unit, value, scheduler));
51      }
52  
53      static final class InsertTimeoutSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
54  
55          private static final long serialVersionUID = 1812803226317104954L;
56  
57          private static final Object TERMINATED = new Object();
58  
59          private final Subscriber<? super T> downstream;
60          private final Function<? super T, ? extends Long> timeout;
61          private final TimeUnit unit;
62          private final Function<? super T, ? extends T> value;
63  
64          private final SimplePlainQueue<T> queue;
65          private final AtomicLong requested;
66          private final AtomicLong inserted;
67  
68          // set either with null (not terminated), Throwable (error), or
69          // TERMINATED (completion). TERMINATED is also used to replace an 
70          // error after it is reported downstream so that the local copy of the 
71          // error can be gc'd
72          private final AtomicReference<Object> terminated;
73          private final AtomicReference<Disposable> scheduled;
74  
75          private Subscription upstream;
76          private volatile boolean cancelled;
77  
78          // used to prevent emission of events after a terminal event
79          // does not need to be volatile
80          private boolean finished;
81  
82          private final Worker worker;
83  
/**
 * Stores the collaborators, creates the empty queue and the atomic state
 * holders, and obtains a worker from the given scheduler.
 */
InsertTimeoutSubscriber(Subscriber<? super T> downstream, Function<? super T, ? extends Long> timeout,
        TimeUnit unit, Function<? super T, ? extends T> value, Scheduler scheduler) {
    // collaborators handed in by the operator
    this.downstream = downstream;
    this.value = value;
    this.timeout = timeout;
    this.unit = unit;

    // state owned by this subscriber
    this.queue = new MpscLinkedQueue<T>();
    this.terminated = new AtomicReference<Object>();
    this.scheduled = new AtomicReference<Disposable>();
    this.requested = new AtomicLong();
    this.inserted = new AtomicLong();

    // worker that runs the timeout actions
    this.worker = scheduler.createWorker();
}
97  
98          @Override
99          public void onSubscribe(Subscription upstream) {
100             if (SubscriptionHelper.validate(this.upstream, upstream)) {
101                 this.upstream = upstream;
102                 downstream.onSubscribe(this);
103             }
104         }
105 
106         @Override
107         public void onNext(final T t) {
108             if (finished) {
109                 return;
110             }
111             queue.offer(t);
112             final long waitTime;
113             try {
114                 waitTime = timeout.apply(t);
115             } catch (Throwable e) {
116                 Exceptions.throwIfFatal(e);
117                 // we cancel upstream ourselves because the
118                 // error did not originate from source
119                 upstream.cancel();
120                 onError(e);
121                 return;
122             }
123             TimeoutAction<T> action = new TimeoutAction<T>(this, t);
124             Disposable d = worker.schedule(action, waitTime, unit);
125             DisposableHelper.set(scheduled, d);
126             drain();
127         }
128 
129         @Override
130         public void onError(Throwable e) {
131             if (finished) {
132                 RxJavaPlugins.onError(e);
133                 return;
134             }
135             finished = true;
136             if (terminated.compareAndSet(null, e)) {
137                 dispose();
138                 drain();
139             } else {
140                 RxJavaPlugins.onError(e);
141             }
142         }
143 
144         @Override
145         public void onComplete() {
146             if (finished) {
147                 return;
148             }
149             finished = true;
150             if (terminated.compareAndSet(null, TERMINATED)) {
151                 dispose();
152                 drain();
153             }
154         }
155 
156         private void drain() {
157             if (getAndIncrement() != 0) {
158                 return;
159             }
160             // note that this drain loop does not shortcut errors
161             int missed = 1;
162             while (true) {
163                 long r = requested.get();
164                 long e = 0;
165                 while (e != r) {
166                     if (cancelled) {
167                         dispose();
168                         queue.clear();
169                         return;
170                     }
171                     Object d = terminated.get();
172                     T t = queue.poll();
173                     if (t == null) {
174                         if (d != null) {
175                             // d will be either TERMINATED or a Throwable
176                             if (d == TERMINATED) {
177                                 // don't need to dispose scheduled because already done in
178                                 // onComplete
179                                 downstream.onComplete();
180                             } else {
181                                 // clear the exception so can be gc'd
182                                 // setting the value to TERMINATED just prevents it getting set again in a race
183                                 // because the other setters which use CAS assume initial value of null
184                                 terminated.set(TERMINATED);
185                                 dispose();
186                                 downstream.onError((Throwable) d);
187                             }
188                             return;
189                         } else {
190                             // nothing to emit and not done
191                             break;
192                         }
193                     } else {
194                         downstream.onNext(t);
195                         e++;
196                     }
197                 }
198                 if (e != 0L && r != Long.MAX_VALUE) {
199                     requested.addAndGet(-e);
200                 }
201                 missed = addAndGet(-missed);
202                 if (missed == 0) {
203                     return;
204                 }
205             }
206         }
207 
208         @Override
209         public void request(long n) {
210             if (SubscriptionHelper.validate(n)) {
211                 BackpressureHelper.add(requested, n);
212                 // modify request to upstream to account for inserted values
213                 // use a CAS loop because request can be called from any thread
214                 while (true) {
215                     long ins = inserted.get();
216                     long d = Math.min(ins, n);
217                     if (inserted.compareAndSet(ins, ins - d)) {
218                         if (n - d > 0) {
219                             upstream.request(n - d);
220                         }
221                         break;
222                     }
223                 }
224                 drain();
225             }
226         }
227 
228         @Override
229         public void cancel() {
230             if (!cancelled) {
231                 cancelled = true;
232                 upstream.cancel();
233                 dispose();
234                 if (getAndIncrement() == 0) {
235                     // use the same access control to queue as drain method
236                     // because `clear` just calls `queue.poll()` repeatedly till nothing left on the
237                     // queue (ignoring the dequeued items).
238                     //
239                     // this is best endeavours, there still exists a race with onNext and drain
240                     // where items could be left on the queue after cancel
241                     queue.clear();
242                 }
243             }
244         }
245 
246         private void dispose() {
247             DisposableHelper.dispose(scheduled);
248             worker.dispose();
249         }
250         
251         void insert(T t) {
252             inserted.incrementAndGet();
253             queue.offer(t);
254             drain();
255         }
256 
257         void insertError(Throwable e) {
258             if (terminated.compareAndSet(null, e)) {
259                 upstream.cancel();
260                 dispose();
261                 drain();
262             } else {
263                 RxJavaPlugins.onError(e);
264             }
265         }
266 
267         T calculateValueToInsert(T t) throws Exception {
268             return value.apply(t);
269         }
270 
271     }
272 
273     private static final class TimeoutAction<T> implements Runnable {
274 
275         private final InsertTimeoutSubscriber<T> subscriber;
276         private final T t;
277 
278         TimeoutAction(InsertTimeoutSubscriber<T> subscriber, T t) {
279             this.subscriber = subscriber;
280             this.t = t;
281         }
282 
283         @Override
284         public void run() {
285             final T v;
286             try {
287                 v = subscriber.calculateValueToInsert(t);
288             } catch (Throwable e) {
289                 subscriber.insertError(e);
290                 return;
291             }
292             subscriber.insert(v);
293         }
294 
295     }
296 
297 }