public final class Flowables extends Object

Modifier and Type | Class and Description |
---|---|
static class | Flowables.MergeInterleaveBuilder<T> |

Modifier and Type | Method and Description |
---|---|
static <T> CachedFlowable<T> | cache(io.reactivex.Flowable<T> source) Returns a cached Flowable like Flowable.cache() except that the cache can be reset by calling CachedFlowable.reset(). |
static <T> io.reactivex.Flowable<T> | cache(io.reactivex.Flowable<T> source, long duration, TimeUnit unit, io.reactivex.Scheduler.Worker worker) Returns a cached Flowable like Flowable.cache() except that the cache can be reset by calling CachedFlowable.reset() and the cache will be automatically reset an interval after first subscription (or first subscription after reset). |
static <T> CloseableFlowableWithReset<T> | cache(io.reactivex.Flowable<T> source, long duration, TimeUnit unit, io.reactivex.Scheduler scheduler) Returns a cached Flowable like Flowable.cache() except that the cache may be reset by the user calling CloseableFlowableWithReset.reset(). |
static <T> io.reactivex.Flowable<T> | fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch) |
static <T> io.reactivex.Flowable<T> | fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start) |
static <T> io.reactivex.Flowable<T> | fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start, int maxConcurrent) Creates a Flowable that is aimed at supporting calls to a service that provides data in pages where the page sizes are determined by requests from downstream (requests are a part of the backpressure machinery of RxJava). |
static <A,B,K,C> io.reactivex.Flowable<C> | match(io.reactivex.Flowable<A> a, io.reactivex.Flowable<B> b, io.reactivex.functions.Function<? super A,K> aKey, io.reactivex.functions.Function<? super B,K> bKey, io.reactivex.functions.BiFunction<? super A,? super B,C> combiner) |
static <A,B,K,C> io.reactivex.Flowable<C> | match(io.reactivex.Flowable<A> a, io.reactivex.Flowable<B> b, io.reactivex.functions.Function<? super A,K> aKey, io.reactivex.functions.Function<? super B,K> bKey, io.reactivex.functions.BiFunction<? super A,? super B,C> combiner, int requestSize) |
static <T> Flowables.MergeInterleaveBuilder<T> | mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers) |
static <T> io.reactivex.Flowable<T> | mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers, int maxConcurrency) |
static <T> io.reactivex.Flowable<T> | mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers, int maxConcurrency, int batchSize, boolean delayErrors) |
static <T> io.reactivex.Flowable<T> | repeat(T t) |
static <T> io.reactivex.Flowable<T> | repeat(T t, long count) |
public static <A,B,K,C> io.reactivex.Flowable<C> match(io.reactivex.Flowable<A> a, io.reactivex.Flowable<B> b, io.reactivex.functions.Function<? super A,K> aKey, io.reactivex.functions.Function<? super B,K> bKey, io.reactivex.functions.BiFunction<? super A,? super B,C> combiner, int requestSize)
public static <A,B,K,C> io.reactivex.Flowable<C> match(io.reactivex.Flowable<A> a, io.reactivex.Flowable<B> b, io.reactivex.functions.Function<? super A,K> aKey, io.reactivex.functions.Function<? super B,K> bKey, io.reactivex.functions.BiFunction<? super A,? super B,C> combiner)
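match is not described on this page. Reading the signature as pairing items from a and b whose keys (per aKey and bKey) are equal and emitting the combiner's result for each pair, a call might look like the sketch below; the pairing semantics are an assumption drawn from the method and parameter names:

```java
Flowable<Integer> a = Flowable.just(1, 2, 3);
Flowable<Integer> b = Flowable.just(3, 2, 1);

// identity key functions: items are paired when they are equal
Flowables.match(a, b, x -> x, y -> y, (x, y) -> x + "-" + y)
        .subscribe(System.out::println); // e.g. 1-1, 2-2, 3-3 (order not asserted)
```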
public static <T> io.reactivex.Flowable<T> repeat(T t)
public static <T> io.reactivex.Flowable<T> repeat(T t, long count)
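repeat is likewise undocumented here. Assuming from the signatures that repeat(t) emits t indefinitely and repeat(t, count) emits it count times, usage might look like:

```java
Flowable<String> forever = Flowables.repeat("ping");   // presumably never completes on its own
Flowable<String> three = Flowables.repeat("ping", 3);  // presumably emits "ping" three times

three.subscribe(System.out::println);
```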
public static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start, int maxConcurrent)
Creates a Flowable that is aimed at supporting calls to a service that provides data in pages where the page sizes are determined by requests from downstream (requests are a part of the backpressure machinery of RxJava).
Here's an example.
Suppose you have a stateless web service, say a REST service that returns JSON/XML and supplies you with a list of the most popular movies. The service supports paging: you can pass it a start number and a page size and it will return just that slice of the list.
Now I want to give my colleagues a library with a Flowable definition of this service that they can call from their applications, whatever those may be. For example, Fred might just want the single most popular movie, while Greta wants to page through the whole list in her UI.
Let's see how we can efficiently support those use cases. I'm going to assume that the movie data returned by the service are mapped conveniently to objects by whatever framework I'm using (JAXB, Jersey, etc.). The fetch method looks like this:
```java
// note that start is 0-based
List<Movie> mostPopularMovies(int start, int size);
```
Now I'm going to wrap this synchronous call as a Flowable to give to my colleagues:
```java
Flowable<Movie> mostPopularMovies(int start) {
    return Flowables.fetchPagesByRequest(
            (position, n) -> Flowable.fromIterable(
                    mostPopularMovies(position.intValue(), n.intValue())),
            start)
        // rebatch requests so that they are always between
        // 5 and 100 except for the first request
        .compose(Transformers.rebatchRequests(5, 100, false));
}

Flowable<Movie> mostPopularMovies() {
    return mostPopularMovies(0);
}
```
Note particularly that the method above uses a variant of rebatchRequests to limit both the minimum and maximum request amounts. We especially don't want to allow a single call requesting the top 100,000 most popular movies, because of the memory and network pressure such a call would create.
Righto, Fred now uses the new API like this:
```java
Movie top = mostPopularMovies()
        .compose(Transformers.maxRequest(1))
        .firstOrError()
        .blockingGet();
```
The use of maxRequest above may seem unnecessary, but strangely enough the first operator requests Long.MAX_VALUE from upstream and cancels as soon as one item arrives. The take, elementAt and firstXXX operators all share this counter-intuitive characteristic.
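To see what a source actually receives, here is a small sketch that prints the upstream request amounts with doOnRequest (Flowable.range simply stands in for a source):

```java
// without a cap: firstOrError requests Long.MAX_VALUE from its upstream
Flowable.range(1, 1000)
        .doOnRequest(n -> System.out.println("requested " + n))
        .firstOrError()
        .blockingGet();
// prints: requested 9223372036854775807

// with maxRequest(1): the source only ever sees small, bounded requests
Flowable.range(1, 1000)
        .doOnRequest(n -> System.out.println("requested " + n))
        .compose(Transformers.<Integer>maxRequest(1))
        .firstOrError()
        .blockingGet();
// prints: requested 1 (possibly repeated, but never an unbounded amount)
```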
Greta uses the new API like this:
```java
mostPopularMovies()
    .rebatchRequests(20)
    .doOnNext(movie -> addToUI(movie))
    .subscribe(subscriber);
```
A bit more detail about fetchPagesByRequest:
If the fetch function returns a Flowable that delivers fewer than the requested number of items then the overall stream completes.
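As a self-contained illustration of that completion rule, here is a sketch in which an in-memory list stands in for the paging service and a TestSubscriber makes the downstream requests explicit (assuming, as above, that pages are fetched lazily in response to those requests):

```java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

Flowable<Integer> pages = Flowables.fetchPagesByRequest(
        (start, n) -> Flowable.fromIterable(
                data.subList(start.intValue(),
                        (int) Math.min(data.size(), start + n))),
        0);

// request 3 items, then 3 more: the second fetch can only deliver 2 items
// (fewer than requested) so the stream completes after the fifth item
pages.test(3)
        .requestMore(3)
        .assertValues(1, 2, 3, 4, 5)
        .assertComplete();
```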
Type Parameters:
- T - item type

Parameters:
- fetch - a function that takes a position index and a length and returns a Flowable
- start - the start index
- maxConcurrent - how many pages to request concurrently

public static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start)
public static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch)
public static <T> CachedFlowable<T> cache(io.reactivex.Flowable<T> source)

Returns a cached Flowable like Flowable.cache() except that the cache can be reset by calling CachedFlowable.reset().

Type Parameters:
- T - the generic type of the source

Parameters:
- source - the observable to be cached
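As a rough usage sketch (assuming CachedFlowable can be subscribed to like any other Flowable, as its description above implies):

```java
// defer so that each real subscription to the source produces a fresh value
CachedFlowable<Long> cached = Flowables.cache(
        Flowable.defer(() -> Flowable.just(System.currentTimeMillis())));

long first = cached.blockingFirst();  // subscribes to the source and caches the value
long again = cached.blockingFirst();  // replayed from the cache: same value as first
cached.reset();                       // clears the cache
long fresh = cached.blockingFirst();  // subscribes to the source again: a new value
```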
public static <T> io.reactivex.Flowable<T> cache(io.reactivex.Flowable<T> source, long duration, TimeUnit unit, io.reactivex.Scheduler.Worker worker)

Returns a cached Flowable like Flowable.cache() except that the cache can be reset by calling CachedFlowable.reset() and the cache will be automatically reset an interval after the first subscription (or the first subscription after a reset). The interval is defined by duration and unit.

Type Parameters:
- T - the generic type of the source

Parameters:
- source - the source observable
- duration - duration till next reset
- unit - units corresponding to the duration
- worker - worker to use for scheduling the reset. Don't forget to dispose of the worker when it is no longer required.
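For illustration, a sketch of this timed variant; loadConfig(), applyConfig() and Config are hypothetical stand-ins for whatever source and consumer you have:

```java
// cache that automatically resets five minutes after the first subscription
io.reactivex.Scheduler.Worker worker = Schedulers.io().createWorker();
Flowable<Config> config = Flowables.cache(loadConfig(), 5, TimeUnit.MINUTES, worker);

config.subscribe(c -> applyConfig(c));

// later, when the cache is no longer needed, release the worker as advised above
worker.dispose();
```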
public static <T> CloseableFlowableWithReset<T> cache(io.reactivex.Flowable<T> source, long duration, TimeUnit unit, io.reactivex.Scheduler scheduler)

Returns a cached Flowable like Flowable.cache() except that the cache may be reset by the user calling CloseableFlowableWithReset.reset().

Type Parameters:
- T - generic type of source observable

Parameters:
- source - the source observable
- duration - duration till next reset
- unit - units corresponding to the duration
- scheduler - scheduler to use for scheduling reset

Returns:
- a CloseableFlowableWithReset that should be closed once finished to prevent a worker memory leak

public static <T> io.reactivex.Flowable<T> mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers, int maxConcurrency, int batchSize, boolean delayErrors)
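mergeInterleaved carries no description here, so the following is only a sketch of how the four-argument overload could be called based on its signature; the comments on maxConcurrency and batchSize are assumptions about their meaning rather than documented behaviour:

```java
Flowable<Integer> a = Flowable.range(1, 1000);
Flowable<Integer> b = Flowable.range(2001, 1000);

Flowable<Integer> merged = Flowables.mergeInterleaved(
        Flowable.just(a, b), // a Publisher of Publishers
        2,                   // maxConcurrency: sources subscribed to at once (assumption)
        8,                   // batchSize: items taken per source per round (assumption)
        false);              // delayErrors
```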
public static <T> io.reactivex.Flowable<T> mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers, int maxConcurrency)
public static <T> Flowables.MergeInterleaveBuilder<T> mergeInterleaved(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> publishers)