package com.github.davidmoten.rx2;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import com.github.davidmoten.guavamini.Optional;
import com.github.davidmoten.rx2.flowable.CachedFlowable;
import com.github.davidmoten.rx2.flowable.CloseableFlowableWithReset;
import com.github.davidmoten.rx2.internal.flowable.FlowableFetchPagesByRequest;
import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
import com.github.davidmoten.rx2.internal.flowable.FlowableMergeInterleave;
import com.github.davidmoten.rx2.internal.flowable.FlowableRepeat;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;

public final class Flowables {

    private static final int DEFAULT_MATCH_BATCH_SIZE = 128;

    private Flowables() {
        // prevent instantiation
    }

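    /**
     * Pairs items from {@code a} and {@code b} whose keys, as computed by
     * {@code aKey} and {@code bKey}, are equal and emits the result of applying
     * {@code combiner} to each matched pair. Items are buffered until their
     * counterpart arrives; this is only a summary, see {@link FlowableMatch} for
     * the precise matching semantics.
     *
     * <p>
     * Illustrative usage (the values and the request size of 16 are chosen for
     * the example only):
     *
     * <pre>
     * {@code
     * Flowable<Integer> left = Flowable.just(1, 2, 3);
     * Flowable<Integer> right = Flowable.just(3, 2, 1);
     * Flowable<String> pairs = Flowables.match(
     *     left, right,
     *     x -> x, y -> y,
     *     (x, y) -> x + ":" + y,
     *     16);
     * }
     * </pre>
     *
     * @param a
     *            first source
     * @param b
     *            second source
     * @param aKey
     *            key function for items from {@code a}
     * @param bKey
     *            key function for items from {@code b}
     * @param combiner
     *            combines a matched pair into the emitted value
     * @param requestSize
     *            number of items requested from each source at a time
     * @param <A>
     *            type of items in {@code a}
     * @param <B>
     *            type of items in {@code b}
     * @param <K>
     *            key type
     * @param <C>
     *            result type
     * @return Flowable of combined matched pairs
     */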
    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
            Function<? super A, K> aKey, Function<? super B, K> bKey,
            BiFunction<? super A, ? super B, C> combiner, int requestSize) {
        return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
    }

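    /**
     * Variant of {@link #match(Flowable, Flowable, Function, Function, BiFunction, int)}
     * that uses a request size of {@code DEFAULT_MATCH_BATCH_SIZE} (128).
     */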
    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
            Function<? super A, K> aKey, Function<? super B, K> bKey,
            BiFunction<? super A, ? super B, C> combiner) {
        return match(a, b, aKey, bKey, combiner, DEFAULT_MATCH_BATCH_SIZE);
    }

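    /**
     * Returns a Flowable that repeatedly emits {@code t}. The {@code -1} count
     * passed to {@link FlowableRepeat} is taken here to mean unbounded
     * repetition, so limit the stream downstream, for example with {@code take}:
     *
     * <pre>
     * {@code
     * // illustrative only: emits "ping" five times
     * Flowables.repeat("ping").take(5).subscribe(System.out::println);
     * }
     * </pre>
     */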
    public static <T> Flowable<T> repeat(T t) {
        return new FlowableRepeat<T>(t, -1);
    }

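    /**
     * Returns a Flowable that emits {@code t} {@code count} times (delegating to
     * {@link FlowableRepeat}).
     */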
    public static <T> Flowable<T> repeat(T t, long count) {
        return new FlowableRepeat<T>(t, count);
    }

    /**
     * <p>
     * Creates a Flowable designed to support calls to a service that provides
     * data in pages, where the page sizes are determined by requests from
     * downstream (requests are part of RxJava's backpressure machinery).
     *
     * <p>
     * <img src=
     * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/fetchPagesByRequest.png"
     * alt="image">
     *
     * <p>
     * Here's an example.
     *
     * <p>
     * Suppose you have a stateless web service, say a REST service that returns
     * JSON/XML and supplies you with
     *
     * <ul>
     * <li>the most popular movies of the last 24 hours sorted by descending
     * popularity</li>
     * </ul>
     *
     * <p>
     * The service supports paging in that you can pass it a start number and a page
     * size and it will return just that slice from the list.
     *
     * <p>
     * Now I want to give my colleagues a library with a Flowable definition of
     * this service that they can call from their applications, whatever those may
     * be. For example,
     *
     * <ul>
     * <li>Fred may just want to know the most popular movie each day,</li>
     * <li>Greta wants to get the top 20 and then have the ability to keep scrolling
     * down the list in her UI.</li>
     * </ul>
     *
     * <p>
     * Let's see how we can efficiently support those use cases. I'm going to assume
     * that the movie data returned by the service are mapped conveniently to
     * objects by whatever framework I'm using (JAXB, Jersey, etc.). The fetch
     * method looks like this:
     *
     * <pre>
     * {@code
     * // note that start is 0-based
     * List<Movie> mostPopularMovies(int start, int size);
     * }
     * </pre>
     *
     * <p>
     * Now I'm going to wrap this synchronous call as a Flowable to give to my
     * colleagues:
     *
     * <pre>
     * {@code
     * Flowable<Movie> mostPopularMovies(int start) {
     *     return Flowables.fetchPagesByRequest(
     *           (position, n) -> Flowable.fromIterable(
     *               mostPopularMovies(position.intValue(), n.intValue())),
     *           start)
     *         // rebatch requests so that they are always between
     *         // 5 and 100 except for the first request
     *       .compose(Transformers.rebatchRequests(5, 100, false));
     * }
     *
     * Flowable<Movie> mostPopularMovies() {
     *     return mostPopularMovies(0);
     * }
     * }
     * </pre>
     * <p>
     * Note particularly that the method above uses a variant of rebatchRequests to
     * limit both minimum and maximum requests. We certainly don't want to allow a
     * single call requesting the top 100,000 popular movies because of the memory
     * and network pressure that such a call would create.
     *
     * <p>
     * Righto, Fred now uses the new API like this:
     *
     * <pre>
     * {@code
     * Movie top = mostPopularMovies()
     *     .compose(Transformers.maxRequest(1))
     *     .firstOrError()
     *     .blockingGet();
     * }
     * </pre>
     * <p>
     * The use of maxRequest above may seem unnecessary but strangely enough the
     * first operator requests Long.MAX_VALUE from upstream and cancels as soon as
     * one item arrives. The take, elementAt and firstXXX operators all share this
     * counter-intuitive characteristic.
     *
     * <p>
     * Greta uses the new API like this:
     *
     * <pre>
     * {@code
     * mostPopularMovies()
     *     .rebatchRequests(20)
     *     .doOnNext(movie -> addToUI(movie))
     *     .subscribe(subscriber);
     * }
     * </pre>
     * <p>
     * A bit more detail about fetchPagesByRequest:
     *
     * <p>
     * If the fetch function returns a Flowable that delivers fewer than the
     * requested number of items, then the overall stream completes.
     *
     * @param fetch
     *            a function that takes a position index and a requested length and
     *            returns a Flowable of items for that slice
     * @param start
     *            the start index
     * @param maxConcurrent
     *            how many pages to request concurrently
     * @param <T>
     *            item type
     * @return Flowable that fetches pages based on request amounts
     */
    public static <T> Flowable<T> fetchPagesByRequest(
            final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start,
            int maxConcurrent) {
        return FlowableFetchPagesByRequest.create(fetch, start, maxConcurrent);
    }

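    /**
     * Overload of {@link #fetchPagesByRequest(BiFunction, long, int)} that uses a
     * maximum page concurrency of 2.
     */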
    public static <T> Flowable<T> fetchPagesByRequest(
            final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start) {
        return fetchPagesByRequest(fetch, start, 2);
    }

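    /**
     * Overload of {@link #fetchPagesByRequest(BiFunction, long, int)} that starts
     * at index 0 and uses a maximum page concurrency of 2.
     */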
    public static <T> Flowable<T> fetchPagesByRequest(
            final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch) {
        return fetchPagesByRequest(fetch, 0, 2);
    }

    /**
     * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
     * the cache can be reset by calling {@link CachedFlowable#reset()}.
     *
     * @param source
     *            the Flowable to be cached
     * @param <T>
     *            the generic type of the source
     * @return a cached Flowable whose cache can be reset
     */
    public static <T> CachedFlowable<T> cache(Flowable<T> source) {
        return new CachedFlowable<T>(source);
    }

    /**
     * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
     * the cache can be reset by calling {@link CachedFlowable#reset()} and the
     * cache will be automatically reset an interval after the first subscription
     * (or the first subscription after a reset). The interval is defined by
     * {@code duration} and {@code unit}.
     *
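     * <p>
     * A minimal usage sketch; the 30 second interval, the use of
     * {@code Schedulers.computation()} and the {@code source} Flowable below are
     * illustrative assumptions, not requirements of this method:
     *
     * <pre>
     * {@code
     * Scheduler.Worker worker = Schedulers.computation().createWorker();
     * Flowable<String> cached = Flowables.cache(source, 30, TimeUnit.SECONDS, worker);
     * // subscribe to cached as often as needed; the cache resets 30 seconds
     * // after the first subscription (or the first subscription after a reset)
     *
     * // when the cache is no longer required, dispose of the worker
     * worker.dispose();
     * }
     * </pre>
     *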
     * @param source
     *            the source Flowable
     * @param duration
     *            duration until the next reset
     * @param unit
     *            units corresponding to the duration
     * @param worker
     *            worker to use for scheduling resets. Don't forget to dispose of
     *            the worker when it is no longer required.
     * @param <T>
     *            the generic type of the source
     * @return cached Flowable that resets regularly on a time interval
     */
    public static <T> Flowable<T> cache(final Flowable<T> source, final long duration,
            final TimeUnit unit, final Scheduler.Worker worker) {
        final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
        CachedFlowable<T> cache = new CachedFlowable<T>(source);
        cacheRef.set(cache);
        return cache.doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription d) {
                Runnable action = new Runnable() {
                    @Override
                    public void run() {
                        cacheRef.get().reset();
                    }
                };
                worker.schedule(action, duration, unit);
            }
        });
    }

    /**
     * Returns a cached {@link Flowable} like {@link Flowable#cache()} except that
     * the cache may be reset by the user calling
     * {@link CloseableFlowableWithReset#reset}.
     *
     * @param source
     *            the source Flowable
     * @param duration
     *            duration until the next reset
     * @param unit
     *            units corresponding to the duration
     * @param scheduler
     *            scheduler to use for scheduling resets
     * @param <T>
     *            generic type of the source Flowable
     * @return {@link CloseableFlowableWithReset} that should be closed once
     *         finished to prevent a worker memory leak
     */
    public static <T> CloseableFlowableWithReset<T> cache(final Flowable<T> source,
            final long duration, final TimeUnit unit, final Scheduler scheduler) {
        final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
        final AtomicReference<Optional<Scheduler.Worker>> workerRef = new AtomicReference<Optional<Scheduler.Worker>>(
                Optional.<Scheduler.Worker>absent());
        CachedFlowable<T> cache = new CachedFlowable<T>(source);
        cacheRef.set(cache);
        Runnable closeAction = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Optional<Scheduler.Worker> w = workerRef.get();
                    if (w == null) {
                        // we are finished
                        break;
                    } else {
                        if (workerRef.compareAndSet(w, null)) {
                            if (w.isPresent()) {
                                w.get().dispose();
                            }
                            // we are finished
                            workerRef.set(null);
                            break;
                        }
                    }
                    // if not finished then try again
                }
            }
        };
        Runnable resetAction = new Runnable() {

            @Override
            public void run() {
                startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
            }
        };
        return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction);
    }

    private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
            final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
            final AtomicReference<Optional<Scheduler.Worker>> workerRef) {

        Runnable action = new Runnable() {
            @Override
            public void run() {
                cacheRef.get().reset();
            }
        };
        // CAS loop to cancel the current worker and create a new one
        while (true) {
            Optional<Scheduler.Worker> wOld = workerRef.get();
            if (wOld == null) {
                // we are finished
                return;
            }
            Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
            if (workerRef.compareAndSet(wOld, w)) {
                if (wOld.isPresent())
                    wOld.get().dispose();
                w.get().schedule(action, duration, unit);
                break;
            }
        }
    }

    private static final int DEFAULT_RING_BUFFER_SIZE = 128;

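    /**
     * Merges a stream of Publishers into a single Flowable, subscribing to at most
     * {@code maxConcurrency} of them at a time and requesting {@code batchSize}
     * items from each source at a time so that the active sources are interleaved
     * rather than drained one after another; {@code delayErrors} determines
     * whether an error terminates the merge immediately or is held back until the
     * remaining sources finish. This summary is inferred from the parameters; see
     * {@link FlowableMergeInterleave} for the precise semantics.
     */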
    public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
            int maxConcurrency, int batchSize, boolean delayErrors) {
        return new FlowableMergeInterleave<T>(publishers, maxConcurrency, batchSize, delayErrors);
    }

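    /**
     * Variant of {@link #mergeInterleaved(Publisher, int, int, boolean)} that uses
     * a batch size of {@code DEFAULT_RING_BUFFER_SIZE} (128) and does not delay
     * errors.
     */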
    public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers,
            int maxConcurrency) {
        return mergeInterleaved(publishers, maxConcurrency, DEFAULT_RING_BUFFER_SIZE, false);
    }

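    /**
     * Returns a builder for a merge-interleave of {@code publishers}. The builder
     * defaults are {@code maxConcurrency} 4, {@code batchSize}
     * {@code DEFAULT_RING_BUFFER_SIZE} (128) and {@code delayErrors} false.
     *
     * <p>
     * Illustrative usage (the sources and the parameter values are examples only):
     *
     * <pre>
     * {@code
     * Flowable<Integer> merged = Flowables
     *     .mergeInterleaved(Flowable.just(Flowable.just(1, 2), Flowable.just(3, 4)))
     *     .maxConcurrency(2)
     *     .batchSize(16)
     *     .build();
     * }
     * </pre>
     */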
    public static <T> MergeInterleaveBuilder<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers) {
        return new MergeInterleaveBuilder<T>(publishers);
    }

    public static final class MergeInterleaveBuilder<T> {

        private final Publisher<? extends Publisher<? extends T>> publishers;
        private int maxConcurrency = 4;
        private int batchSize = DEFAULT_RING_BUFFER_SIZE;
        private boolean delayErrors = false;

        MergeInterleaveBuilder(Publisher<? extends Publisher<? extends T>> publishers) {
            this.publishers = publishers;
        }

        public MergeInterleaveBuilder<T> maxConcurrency(int maxConcurrency) {
            this.maxConcurrency = maxConcurrency;
            return this;
        }

        public MergeInterleaveBuilder<T> batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public MergeInterleaveBuilder<T> delayErrors(boolean delayErrors) {
            this.delayErrors = delayErrors;
            return this;
        }

        public Flowable<T> build() {
            return mergeInterleaved(publishers, maxConcurrency, batchSize, delayErrors);
        }
    }
}