View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import com.github.davidmoten.guavamini.Optional;
4   import com.github.davidmoten.rx2.flowable.CachedFlowable;
5   import com.github.davidmoten.rx2.observable.CachedObservable;
6   import com.github.davidmoten.rx2.observable.CloseableObservableWithReset;
7   import io.reactivex.Flowable;
8   import io.reactivex.Observable;
9   import io.reactivex.Scheduler;
10  import io.reactivex.disposables.Disposable;
11  import io.reactivex.functions.Consumer;
12  
13  import java.util.concurrent.TimeUnit;
14  import java.util.concurrent.atomic.AtomicReference;
15  
16  public final class Observables {
17  
18  
19      private Observables() {
20          // prevent instantiation
21      }
22  
23      /**
24       * Returns a cached {@link Flowable} like {@link Flowable#cache()}
25       * except that the cache can be reset by calling
26       * {@link CachedFlowable#reset()}.
27       *
28       * @param source
29       *            the observable to be cached.
30       * @param <T>
31       *            the generic type of the source
32       * @return a cached observable whose cache can be reset.
33       */
34      public static <T> CachedObservable<T> cache(Observable<T> source) {
35          return new CachedObservable<T>(source);
36      }
37  
38      /**
39       * Returns a cached {@link Observable} like {@link Observable#cache()}
40       * except that the cache can be reset by calling
41       * {@link CachedObservable#reset()} and the cache will be automatically
42       * reset an interval after first subscription (or first subscription after
43       * reset). The interval is defined by {@code duration} and {@code unit} .
44       *
45       * @param source
46       *            the source observable
47       * @param duration
48       *            duration till next reset
49       * @param unit
50       *            units corresponding to the duration
51       * @param worker
52       *            worker to use for scheduling reset. Don't forget to
53       *            unsubscribe the worker when no longer required.
54       * @param <T>
55       *            the generic type of the source
56       * @return cached observable that resets regularly on a time interval
57       */
58      public static <T> Observable<T> cache(final Observable<T> source, final long duration,
59                                          final TimeUnit unit, final Scheduler.Worker worker) {
60          final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
61          CachedObservable<T> cache = new CachedObservable<T>(source);
62          cacheRef.set(cache);
63          return cache.doOnSubscribe(new Consumer<Disposable>() {
64              @Override
65              public void accept(Disposable d) {
66                  Runnable action = new Runnable() {
67                      @Override
68                      public void run() {
69                          cacheRef.get().reset();
70                      }
71                  };
72                  worker.schedule(action, duration, unit);
73              }
74          });
75      }
76  
77      /**
78       * Returns a cached {@link Observable} like {@link Observable#cache()}
79       * except that the cache may be reset by the user calling
80       * {@link CloseableObservableWithReset#reset}.
81       *
82       * @param source
83       *            the source observable
84       * @param duration
85       *            duration till next reset
86       * @param unit
87       *            units corresponding to the duration
88       * @param scheduler
89       *            scheduler to use for scheduling reset.
90       * @param <T>
91       *            generic type of source observable
92       * @return {@link CloseableObservableWithReset} that should be closed once
93       *         finished to prevent worker memory leak.
94       */
95      public static <T> CloseableObservableWithReset<T> cache(final Observable<T> source,
96                                                              final long duration, final TimeUnit unit, final Scheduler scheduler) {
97          final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
98          final AtomicReference<Optional<Scheduler.Worker>> workerRef = new AtomicReference<Optional<Scheduler.Worker>>(
99                  Optional.<Scheduler.Worker> absent());
100         CachedObservable<T> cache = new CachedObservable<T>(source);
101         cacheRef.set(cache);
102         Runnable closeAction = new Runnable() {
103             @Override
104             public void run() {
105                 while (true) {
106                     Optional<Scheduler.Worker> w = workerRef.get();
107                     if (w == null) {
108                         // we are finished
109                         break;
110                     } else {
111                         if (workerRef.compareAndSet(w, null)) {
112                             if (w.isPresent()) {
113                                 w.get().dispose();
114                             }
115                             // we are finished
116                             workerRef.set(null);
117                             break;
118                         }
119                     }
120                     // if not finished then try again
121                 }
122             }
123         };
124         Runnable resetAction = new Runnable() {
125 
126             @Override
127             public void run() {
128                 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
129             }
130         };
131         return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
132     }
133 
134 
135     private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
136                                                      final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
137                                                      final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
138 
139         Runnable action = new Runnable() {
140             @Override
141             public void run() {
142                 cacheRef.get().reset();
143             }
144         };
145         // CAS loop to cancel the current worker and create a new one
146         while (true) {
147             Optional<Scheduler.Worker> wOld = workerRef.get();
148             if (wOld == null) {
149                 // we are finished
150                 return;
151             }
152             Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
153             if (workerRef.compareAndSet(wOld, w)) {
154                 if (wOld.isPresent())
155                     wOld.get().dispose();
156                 w.get().schedule(action, duration, unit);
157                 break;
158             }
159         }
160     }
161 }