1 package com.github.davidmoten.rx2;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.concurrent.atomic.AtomicReference;
5
6 import org.reactivestreams.Publisher;
7 import org.reactivestreams.Subscription;
8
9 import com.github.davidmoten.guavamini.Optional;
10 import com.github.davidmoten.rx2.flowable.CachedFlowable;
11 import com.github.davidmoten.rx2.flowable.CloseableFlowableWithReset;
12 import com.github.davidmoten.rx2.internal.flowable.FlowableFetchPagesByRequest;
13 import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
14 import com.github.davidmoten.rx2.internal.flowable.FlowableMergeInterleave;
15 import com.github.davidmoten.rx2.internal.flowable.FlowableRepeat;
16
17 import io.reactivex.Flowable;
18 import io.reactivex.Scheduler;
19 import io.reactivex.functions.BiFunction;
20 import io.reactivex.functions.Consumer;
21 import io.reactivex.functions.Function;
22
23 public final class Flowables {
24
25 private static final int DEFAULT_MATCH_BATCH_SIZE = 128;
26
/**
 * Private so that this holder of static factory methods cannot be
 * instantiated from outside the class.
 */
private Flowables() {
    // static utility class, no instances
}
30
31 public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
32 Function<? super A, K> aKey, Function<? super B, K> bKey,
33 BiFunction<? super A, ? super B, C> combiner, int requestSize) {
34 return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
35 }
36
37 public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
38 Function<? super A, K> aKey, Function<? super B, K> bKey,
39 BiFunction<? super A, ? super B, C> combiner) {
40 return match(a, b, aKey, bKey, combiner, DEFAULT_MATCH_BATCH_SIZE);
41 }
42
43 public static <T> Flowable<T> repeat(T t) {
44 return new FlowableRepeat<T>(t, -1);
45 }
46
47 public static <T> Flowable<T> repeat(T t, long count) {
48 return new FlowableRepeat<T>(t, count);
49 }
50
51 /**
52 * <p>
53 * Creates a Flowable that is aimed at supporting calls to a service that
54 * provides data in pages where the page sizes are determined by requests from
55 * downstream (requests are a part of the backpressure machinery of RxJava).
56 *
57 * <p>
58 * <img src=
59 * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/fetchPagesByRequest.png"
60 * alt="image">
61 *
62 * <p>
63 * Here's an example.
64 *
65 * <p>
66 * Suppose you have a stateless web service, say a rest service that returns
67 * JSON/XML and supplies you with
68 *
69 * <ul>
70 * <li>the most popular movies of the last 24 hours sorted by descending
71 * popularity</li>
72 * </ul>
73 *
74 * <p>
75 * The service supports paging in that you can pass it a start number and a page
76 * size and it will return just that slice from the list.
77 *
78 * <p>
79 * Now I want to give a library with a Flowable definition of this service to my
80 * colleagues that they can call in their applications whatever they may be. For
81 * example,
82 *
83 * <ul>
84 * <li>Fred may just want to know the most popular movie each day,</li>
85 * <li>Greta wants to get the top 20 and then have the ability to keep scrolling
86 * down the list in her UI.</li>
87 * </ul>
88 *
89 * <p>
90 * Let's see how we can efficiently support those use cases. I'm going to assume
91 * that the movie data returned by the service are mapped conveniently to
92 * objects by whatever framework I'm using (JAXB, Jersey, etc.). The fetch
93 * method looks like this:
94 *
95 * <pre>
96 * {@code
97 * // note that start is 0-based
98 * List<Movie> mostPopularMovies(int start, int size);
99 * }
100 * </pre>
101 *
102 * <p>
103 * Now I'm going to wrap this synchronous call as a Flowable to give to my
104 * colleagues:
105 *
106 * <pre>
107 * {@code
108 * Flowable<Movie> mostPopularMovies(int start) {
109 * return Flowables.fetchPagesByRequest(
110 * (position, n) -> Flowable.fromIterable(mostPopular(position, n)),
111 * start)
112 * // rebatch requests so that they are always between
113 * // 5 and 100 except for the first request
114 * .compose(Transformers.rebatchRequests(5, 100, false));
115 * }
116 *
117 * Flowable<Movie> mostPopularMovies() {
118 * return mostPopularMovies(0);
119 * }
120 * }
121 * </pre>
122 * <p>
123 * Note particularly that the method above uses a variant of rebatchRequests to
124 * limit both minimum and maximum requests. We particularly don't want to allow
125 * a single call requesting the top 100,000 popular movies because of the memory
126 * and network pressures that arise from that call.
127 *
128 * <p>
129 * Righto, Fred now uses the new API like this:
130 *
131 * <pre>
132 * {
133 * @code
134 * Movie top = mostPopularMovies().compose(Transformers.maxRequest(1)).first()
135 * .blockingFirst();
136 * }
137 * </pre>
138 * <p>
139 * The use of maxRequest above may seem unnecessary but strangely enough the
140 * first operator requests Long.MAX_VALUE of upstream and cancels as soon as one
141 * arrives. The take, elemnentAt and firstXXX operators all have this
142 * counter-intuitive characteristic.
143 *
144 * <p>
145 * Greta uses the new API like this:
146 *
147 * <pre>
148 * {@code
149 * mostPopularMovies()
150 * .rebatchRequests(20)
151 * .doOnNext(movie -> addToUI(movie))
152 * .subscribe(subscriber);
153 * }
154 * </pre>
155 * <p>
156 * A bit more detail about fetchPagesByRequest:
157 *
158 * <p>
159 * If the fetch function returns a Flowable that delivers fewer than the
160 * requested number of items then the overall stream completes.
161 *
162 * @param fetch
163 * a function that takes a position index and a length and returns a
164 * Flowable
165 * @param start
166 * the start index
167 * @param maxConcurrent
168 * how many pages to request concurrently
169 * @param <T>
170 * item type
171 * @return Flowable that fetches pages based on request amounts
172 */
173 public static <T> Flowable<T> fetchPagesByRequest(
174 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start,
175 int maxConcurrent) {
176 return FlowableFetchPagesByRequest.create(fetch, start, maxConcurrent);
177 }
178
179 public static <T> Flowable<T> fetchPagesByRequest(
180 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start) {
181 return fetchPagesByRequest(fetch, start, 2);
182 }
183
184 public static <T> Flowable<T> fetchPagesByRequest(
185 final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch) {
186 return fetchPagesByRequest(fetch, 0, 2);
187 }
188
189 /**
190 * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
191 * the cache can be reset by calling {@link CachedFlowable#reset()}.
192 *
193 * @param source
194 * the observable to be cached.
195 * @param <T>
196 * the generic type of the source
197 * @return a cached observable whose cache can be reset.
198 */
199 public static <T> CachedFlowable<T> cache(Flowable<T> source) {
200 return new CachedFlowable<T>(source);
201 }
202
203 /**
204 * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
205 * the cache can be reset by calling {@link CachedFlowable#reset()} and the
206 * cache will be automatically reset an interval after first subscription (or
207 * first subscription after reset). The interval is defined by {@code duration}
208 * and {@code unit} .
209 *
210 * @param source
211 * the source observable
212 * @param duration
213 * duration till next reset
214 * @param unit
215 * units corresponding to the duration
216 * @param worker
217 * worker to use for scheduling reset. Don't forget to unsubscribe
218 * the worker when no longer required.
219 * @param <T>
220 * the generic type of the source
221 * @return cached observable that resets regularly on a time interval
222 */
223 public static <T> Flowable<T> cache(final Flowable<T> source, final long duration,
224 final TimeUnit unit, final Scheduler.Worker worker) {
225 final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
226 CachedFlowable<T> cache = new CachedFlowable<T>(source);
227 cacheRef.set(cache);
228 return cache.doOnSubscribe(new Consumer<Subscription>() {
229 @Override
230 public void accept(Subscription d) {
231 Runnable action = new Runnable() {
232 @Override
233 public void run() {
234 cacheRef.get().reset();
235 }
236 };
237 worker.schedule(action, duration, unit);
238 }
239 });
240 }
241
242 /**
243 * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
244 * the cache may be reset by the user calling
245 * {@link CloseableFlowableWithReset#reset}.
246 *
247 * @param source
248 * the source observable
249 * @param duration
250 * duration till next reset
251 * @param unit
252 * units corresponding to the duration
253 * @param scheduler
254 * scheduler to use for scheduling reset.
255 * @param <T>
256 * generic type of source observable
257 * @return {@link CloseableFlowableWithReset} that should be closed once
258 * finished to prevent worker memory leak.
259 */
260 public static <T> CloseableFlowableWithReset<T> cache(final Flowable<T> source,
261 final long duration, final TimeUnit unit, final Scheduler scheduler) {
262 final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
263 final AtomicReference<Optional<Scheduler.Worker>> workerRef = new AtomicReference<Optional<Scheduler.Worker>>(
264 Optional.<Scheduler.Worker>absent());
265 CachedFlowable<T> cache = new CachedFlowable<T>(source);
266 cacheRef.set(cache);
267 Runnable closeAction = new Runnable() {
268 @Override
269 public void run() {
270 while (true) {
271 Optional<Scheduler.Worker> w = workerRef.get();
272 if (w == null) {
273 // we are finished
274 break;
275 } else {
276 if (workerRef.compareAndSet(w, null)) {
277 if (w.isPresent()) {
278 w.get().dispose();
279 }
280 // we are finished
281 workerRef.set(null);
282 break;
283 }
284 }
285 // if not finished then try again
286 }
287 }
288 };
289 Runnable resetAction = new Runnable() {
290
291 @Override
292 public void run() {
293 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
294 }
295 };
296 return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction);
297 }
298
299 private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
300 final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
301 final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
302
303 Runnable action = new Runnable() {
304 @Override
305 public void run() {
306 cacheRef.get().reset();
307 }
308 };
309 // CAS loop to cancel the current worker and create a new one
310 while (true) {
311 Optional<Scheduler.Worker> wOld = workerRef.get();
312 if (wOld == null) {
313 // we are finished
314 return;
315 }
316 Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
317 if (workerRef.compareAndSet(wOld, w)) {
318 if (wOld.isPresent())
319 wOld.get().dispose();
320 w.get().schedule(action, duration, unit);
321 break;
322 }
323 }
324 }
325
326 private static final int DEFAULT_RING_BUFFER_SIZE = 128;
327
328 public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
329 int maxConcurrency, int batchSize, boolean delayErrors) {
330 return new FlowableMergeInterleave<T>(publishers, maxConcurrency, batchSize, delayErrors);
331 }
332
333 public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
334 int maxConcurrency) {
335 return mergeInterleaved(publishers, maxConcurrency, 128, false);
336 }
337
338 public static <T> MergeInterleaveBuilder<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers) {
339 return new MergeInterleaveBuilder<T>(publishers);
340 }
341
342 public static final class MergeInterleaveBuilder<T> {
343
344 private final Publisher<? extends Publisher<? extends T>> publishers;
345 private int maxConcurrency = 4;
346 private int batchSize = DEFAULT_RING_BUFFER_SIZE;
347 private boolean delayErrors = false;
348
349 MergeInterleaveBuilder(Publisher<? extends Publisher<? extends T>> publishers) {
350 this.publishers = publishers;
351 }
352
353 public MergeInterleaveBuilder<T> maxConcurrency(int maxConcurrency) {
354 this.maxConcurrency = maxConcurrency;
355 return this;
356 }
357
358 public MergeInterleaveBuilder<T> batchSize(int batchSize) {
359 this.batchSize = batchSize;
360 return this;
361 }
362
363 public MergeInterleaveBuilder<T> delayErrors(boolean delayErrors) {
364 this.delayErrors = delayErrors;
365 return this;
366 }
367
368 public Flowable<T> build() {
369 return mergeInterleaved(publishers, maxConcurrency, batchSize, delayErrors);
370 }
371 }
372 }