1 package com.github.davidmoten.rx2;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.concurrent.atomic.AtomicReference;
5
6 import org.reactivestreams.Publisher;
7 import org.reactivestreams.Subscription;
8
9 import com.github.davidmoten.guavamini.Optional;
10 import com.github.davidmoten.rx2.flowable.CachedFlowable;
11 import com.github.davidmoten.rx2.flowable.CloseableFlowableWithReset;
12 import com.github.davidmoten.rx2.internal.flowable.FlowableFetchPagesByRequest;
13 import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
14 import com.github.davidmoten.rx2.internal.flowable.FlowableMergeInterleave;
15 import com.github.davidmoten.rx2.internal.flowable.FlowableRepeat;
16
17 import io.reactivex.Flowable;
18 import io.reactivex.Scheduler;
19 import io.reactivex.functions.BiFunction;
20 import io.reactivex.functions.Consumer;
21 import io.reactivex.functions.Function;
22
23 public final class Flowables {
24
/** Request size handed to {@code match} when the caller does not supply one. */
private static final int DEFAULT_MATCH_BATCH_SIZE = 128;

/**
 * Private so the class cannot be instantiated from outside; the members visible in this
 * file are all static.
 */
private Flowables() {
    // intentionally empty
}
30
31 public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
32 Function<? super A, K> aKey, Function<? super B, K> bKey,
33 BiFunction<? super A, ? super B, C> combiner, int requestSize) {
34 return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
35 }
36
37 public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
38 Function<? super A, K> aKey, Function<? super B, K> bKey,
39 BiFunction<? super A, ? super B, C> combiner) {
40 return match(a, b, aKey, bKey, combiner, DEFAULT_MATCH_BATCH_SIZE);
41 }
42
43 public static <T> Flowable<T> repeat(T t) {
44 return new FlowableRepeat<T>(t, -1);
45 }
46
47 public static <T> Flowable<T> repeat(T t, long count) {
48 return new FlowableRepeat<T>(t, count);
49 }
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 public static <T> Flowable<T> fetchPagesByRequest(
174 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start,
175 int maxConcurrent) {
176 return FlowableFetchPagesByRequest.create(fetch, start, maxConcurrent);
177 }
178
179 public static <T> Flowable<T> fetchPagesByRequest(
180 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start) {
181 return fetchPagesByRequest(fetch, start, 2);
182 }
183
184 public static <T> Flowable<T> fetchPagesByRequest(
185 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch) {
186 return fetchPagesByRequest(fetch, 0, 2);
187 }
188
189
190
191
192
193
194
195
196
197
198
199 public static <T> CachedFlowable<T> cache(Flowable<T> source) {
200 return new CachedFlowable<T>(source);
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 public static <T> Flowable<T> cache(final Flowable<T> source, final long duration,
224 final TimeUnit unit, final Scheduler.Worker worker) {
225 final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
226 CachedFlowable<T> cache = new CachedFlowable<T>(source);
227 cacheRef.set(cache);
228 return cache.doOnSubscribe(new Consumer<Subscription>() {
229 @Override
230 public void accept(Subscription d) {
231 Runnable action = new Runnable() {
232 @Override
233 public void run() {
234 cacheRef.get().reset();
235 }
236 };
237 worker.schedule(action, duration, unit);
238 }
239 });
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 public static <T> CloseableFlowableWithReset<T> cache(final Flowable<T> source,
261 final long duration, final TimeUnit unit, final Scheduler scheduler) {
262 final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
263 final AtomicReference<Optional<Scheduler.Worker>> workerRef = new AtomicReference<Optional<Scheduler.Worker>>(
264 Optional.<Scheduler.Worker>absent());
265 CachedFlowable<T> cache = new CachedFlowable<T>(source);
266 cacheRef.set(cache);
267 Runnable closeAction = new Runnable() {
268 @Override
269 public void run() {
270 while (true) {
271 Optional<Scheduler.Worker> w = workerRef.get();
272 if (w == null) {
273
274 break;
275 } else {
276 if (workerRef.compareAndSet(w, null)) {
277 if (w.isPresent()) {
278 w.get().dispose();
279 }
280
281 workerRef.set(null);
282 break;
283 }
284 }
285
286 }
287 }
288 };
289 Runnable resetAction = new Runnable() {
290
291 @Override
292 public void run() {
293 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
294 }
295 };
296 return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction);
297 }
298
299 private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
300 final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
301 final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
302
303 Runnable action = new Runnable() {
304 @Override
305 public void run() {
306 cacheRef.get().reset();
307 }
308 };
309
310 while (true) {
311 Optional<Scheduler.Worker> wOld = workerRef.get();
312 if (wOld == null) {
313
314 return;
315 }
316 Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
317 if (workerRef.compareAndSet(wOld, w)) {
318 if (wOld.isPresent())
319 wOld.get().dispose();
320 w.get().schedule(action, duration, unit);
321 break;
322 }
323 }
324 }
325
326 private static final int DEFAULT_RING_BUFFER_SIZE = 128;
327
328 public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
329 int maxConcurrency, int batchSize, boolean delayErrors) {
330 return new FlowableMergeInterleave<T>(publishers, maxConcurrency, batchSize, delayErrors);
331 }
332
333 public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
334 int maxConcurrency) {
335 return mergeInterleaved(publishers, maxConcurrency, 128, false);
336 }
337
338 public static <T> MergeInterleaveBuilder<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers) {
339 return new MergeInterleaveBuilder<T>(publishers);
340 }
341
342 public static final class MergeInterleaveBuilder<T> {
343
344 private final Publisher<? extends Publisher<? extends T>> publishers;
345 private int maxConcurrency = 4;
346 private int batchSize = DEFAULT_RING_BUFFER_SIZE;
347 private boolean delayErrors = false;
348
349 MergeInterleaveBuilder(Publisher<? extends Publisher<? extends T>> publishers) {
350 this.publishers = publishers;
351 }
352
353 public MergeInterleaveBuilder<T> maxConcurrency(int maxConcurrency) {
354 this.maxConcurrency = maxConcurrency;
355 return this;
356 }
357
358 public MergeInterleaveBuilder<T> batchSize(int batchSize) {
359 this.batchSize = batchSize;
360 return this;
361 }
362
363 public MergeInterleaveBuilder<T> delayErrors(boolean delayErrors) {
364 this.delayErrors = delayErrors;
365 return this;
366 }
367
368 public Flowable<T> build() {
369 return mergeInterleaved(publishers, maxConcurrency, batchSize, delayErrors);
370 }
371 }
372 }