1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicInteger;
4 import java.util.concurrent.atomic.AtomicLong;
5 import java.util.concurrent.atomic.AtomicReference;
6
7 import org.reactivestreams.Subscriber;
8 import org.reactivestreams.Subscription;
9
10 import com.github.davidmoten.guavamini.Preconditions;
11
12 import io.reactivex.Flowable;
13 import io.reactivex.FlowableSubscriber;
14 import io.reactivex.Maybe;
15 import io.reactivex.MaybeObserver;
16 import io.reactivex.disposables.Disposable;
17 import io.reactivex.exceptions.Exceptions;
18 import io.reactivex.functions.Function;
19 import io.reactivex.internal.disposables.DisposableHelper;
20 import io.reactivex.internal.fuseable.SimplePlainQueue;
21 import io.reactivex.internal.queue.MpscLinkedQueue;
22 import io.reactivex.internal.subscriptions.SubscriptionHelper;
23 import io.reactivex.internal.util.BackpressureHelper;
24 import io.reactivex.plugins.RxJavaPlugins;
25
26 public final class FlowableInsertMaybe<T> extends Flowable<T> {
27
28 private final Flowable<T> source;
29 private final Function<? super T, ? extends Maybe<? extends T>> valueToInsert;
30
31 public FlowableInsertMaybe(Flowable<T> source, Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
32 Preconditions.checkNotNull(valueToInsert, "valueToInsert cannot be null");
33 this.source = source;
34 this.valueToInsert = valueToInsert;
35 }
36
37 @Override
38 protected void subscribeActual(Subscriber<? super T> downstream) {
39 source.subscribe(new InsertSubscriber<T>(downstream, valueToInsert));
40 }
41
42 static final class InsertSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
43
44 private static final long serialVersionUID = -15415234346097063L;
45
46 private final Subscriber<? super T> downstream;
47 private final Function<? super T, ? extends Maybe<? extends T>> valueToInsert;
48 private final SimplePlainQueue<T> queue;
49 private final AtomicLong requested;
50 private final AtomicLong inserted;
51
52
53
54
55 private final AtomicReference<Object> error;
56 private final AtomicReference<Disposable> valueToInsertObserver;
57
58 private Subscription upstream;
59 private volatile boolean done;
60 private volatile boolean cancelled;
61
62
63
64 private boolean finished;
65
66 InsertSubscriber(Subscriber<? super T> downstream,
67 Function<? super T, ? extends Maybe<? extends T>> valueToInsert) {
68 this.downstream = downstream;
69 this.valueToInsert = valueToInsert;
70 this.queue = new MpscLinkedQueue<T>();
71 this.requested = new AtomicLong();
72 this.inserted = new AtomicLong();
73 this.error = new AtomicReference<Object>();
74 this.valueToInsertObserver = new AtomicReference<Disposable>();
75 }
76
77 @Override
78 public void onSubscribe(Subscription upstream) {
79 if (SubscriptionHelper.validate(this.upstream, upstream)) {
80 this.upstream = upstream;
81 downstream.onSubscribe(this);
82 }
83 }
84
85 @Override
86 public void onNext(T t) {
87 if (finished) {
88 return;
89 }
90 queue.offer(t);
91 Maybe<? extends T> maybe;
92 try {
93 maybe = valueToInsert.apply(t);
94 } catch (Throwable e) {
95 Exceptions.throwIfFatal(e);
96
97
98 upstream.cancel();
99 onError(e);
100 return;
101 }
102 ValueToInsertObserver<T> o = new ValueToInsertObserver<T>(this);
103 if (DisposableHelper.set(valueToInsertObserver, o)) {
104
105
106
107 maybe.subscribe(o);
108 }
109 drain();
110 }
111
112 @Override
113 public void onError(Throwable e) {
114 if (finished) {
115 RxJavaPlugins.onError(e);
116 return;
117 }
118 finished = true;
119 if (error.compareAndSet(null, e)) {
120 DisposableHelper.dispose(valueToInsertObserver);
121 done = true;
122 drain();
123 } else {
124 RxJavaPlugins.onError(e);
125 }
126 }
127
128 @Override
129 public void onComplete() {
130 if (finished) {
131 return;
132 }
133 finished = true;
134 DisposableHelper.dispose(valueToInsertObserver);
135 done = true;
136 drain();
137 }
138
139 private void drain() {
140 if (getAndIncrement() != 0) {
141 return;
142 }
143
144 int missed = 1;
145 while (true) {
146 long r = requested.get();
147 long e = 0;
148 while (e != r) {
149 if (cancelled) {
150 DisposableHelper.dispose(valueToInsertObserver);
151 queue.clear();
152 return;
153 }
154
155 boolean d = done;
156 T t = queue.poll();
157 if (t == null) {
158 if (d) {
159 Object err = error.get();
160 if (err != null) {
161
162
163
164
165 error.set(this);
166 DisposableHelper.dispose(valueToInsertObserver);
167 downstream.onError((Throwable) err);
168 } else {
169
170
171 downstream.onComplete();
172 }
173 return;
174 } else {
175
176 break;
177 }
178 } else {
179 downstream.onNext(t);
180 e++;
181 }
182 }
183 if (e != 0L && r != Long.MAX_VALUE) {
184 requested.addAndGet(-e);
185 }
186 missed = addAndGet(-missed);
187 if (missed == 0) {
188 return;
189 }
190 }
191 }
192
193 @Override
194 public void request(long n) {
195 if (SubscriptionHelper.validate(n)) {
196 BackpressureHelper.add(requested, n);
197
198
199 while (true) {
200 long ins = inserted.get();
201 long d = Math.min(ins, n);
202 if (inserted.compareAndSet(ins, ins - d)) {
203 if (n - d > 0) {
204 upstream.request(n - d);
205 }
206 break;
207 }
208 }
209 drain();
210 }
211 }
212
213 @Override
214 public void cancel() {
215 if (!cancelled) {
216 cancelled = true;
217 upstream.cancel();
218 DisposableHelper.dispose(valueToInsertObserver);
219 if (getAndIncrement() == 0) {
220
221
222
223
224
225
226 queue.clear();
227 }
228 }
229 }
230
231 void insert(T t) {
232 inserted.incrementAndGet();
233 queue.offer(t);
234 drain();
235 }
236
237 void insertError(Throwable e) {
238 if (error.compareAndSet(null, e)) {
239 upstream.cancel();
240 DisposableHelper.dispose(valueToInsertObserver);
241 done = true;
242 drain();
243 } else {
244 RxJavaPlugins.onError(e);
245 }
246 }
247
248 }
249
250 static final class ValueToInsertObserver<T> extends AtomicReference<Disposable>
251 implements MaybeObserver<T>, Disposable {
252
253 private static final long serialVersionUID = 41384726414575403L;
254
255 private final InsertSubscriber<T> downstream;
256
257 ValueToInsertObserver(InsertSubscriber<T> downstream) {
258 this.downstream = downstream;
259 }
260
261 @Override
262 public void onSubscribe(Disposable upstream) {
263
264
265
266 DisposableHelper.setOnce(this, upstream);
267 }
268
269 @Override
270 public void onSuccess(T t) {
271 lazySet(DisposableHelper.DISPOSED);
272 downstream.insert(t);
273 }
274
275 @Override
276 public void onError(Throwable e) {
277 lazySet(DisposableHelper.DISPOSED);
278 downstream.insertError(e);
279 }
280
281 @Override
282 public void onComplete() {
283 lazySet(DisposableHelper.DISPOSED);
284
285 }
286
287 @Override
288 public void dispose() {
289 DisposableHelper.dispose(this);
290 }
291
292 @Override
293 public boolean isDisposed() {
294 return get() == DisposableHelper.DISPOSED;
295 }
296
297 }
298 }