View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.util.ArrayList;
4   import java.util.Collection;
5   import java.util.Comparator;
6   import java.util.List;
7   import java.util.Queue;
8   import java.util.concurrent.TimeUnit;
9   import java.util.concurrent.atomic.AtomicReference;
10  
11  import com.github.davidmoten.rx.internal.operators.OnSubscribeFromQueue;
12  import com.github.davidmoten.rx.internal.operators.OnSubscribeRepeating;
13  import com.github.davidmoten.rx.internal.operators.OrderedMerge;
14  import com.github.davidmoten.rx.internal.operators.Permutations;
15  import com.github.davidmoten.rx.internal.operators.Permutations.Swap;
16  import com.github.davidmoten.rx.observables.CachedObservable;
17  import com.github.davidmoten.util.Optional;
18  
19  import rx.Observable;
20  import rx.Scheduler;
21  import rx.Scheduler.Worker;
22  import rx.functions.Action0;
23  import rx.functions.Func0;
24  import rx.functions.Func1;
25  import rx.functions.Func2;
26  
27  public final class Obs {
28  
29      /**
30       * Returns a cached {@link Observable} like {@link Observable#cache()}
31       * except that the cache can be reset by calling
32       * {@link CachedObservable#reset()}.
33       * 
34       * @param source
35       *            the observable to be cached.
36       * @param <T>
37       *            the generic type of the source
38       * @return a cached observable whose cache can be reset.
39       */
40      public static <T> CachedObservable<T> cache(Observable<T> source) {
41          return new CachedObservable<T>(source);
42      }
43  
44      /**
45       * Returns a cached {@link Observable} like {@link Observable#cache()}
46       * except that the cache can be reset by calling
47       * {@link CachedObservable#reset()} and the cache will be automatically
48       * reset an interval after first subscription (or first subscription after
49       * reset). The interval is defined by {@code duration} and {@code unit} .
50       * 
51       * @param source
52       *            the source observable
53       * @param duration
54       *            duration till next reset
55       * @param unit
56       *            units corresponding to the duration
57       * @param worker
58       *            worker to use for scheduling reset. Don't forget to
59       *            unsubscribe the worker when no longer required.
60       * @param <T>
61       *            the generic type of the source
62       * @return cached observable that resets regularly on a time interval
63       */
64      public static <T> Observable<T> cache(final Observable<T> source, final long duration,
65              final TimeUnit unit, final Worker worker) {
66          final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
67          CachedObservable<T> cache = new CachedObservable<T>(source);
68          cacheRef.set(cache);
69          return cache.doOnSubscribe(new Action0() {
70              @Override
71              public void call() {
72                  Action0 action = new Action0() {
73                      @Override
74                      public void call() {
75                          cacheRef.get().reset();
76                      }
77                  };
78                  worker.schedule(action, duration, unit);
79              }
80          });
81      }
82  
83      /**
84       * Returns a cached {@link Observable} like {@link Observable#cache()}
85       * except that the cache may be reset by the user calling
86       * {@link CloseableObservableWithReset#reset}.
87       * 
88       * @param source
89       *            the source observable
90       * @param duration
91       *            duration till next reset
92       * @param unit
93       *            units corresponding to the duration
94       * @param scheduler
95       *            scheduler to use for scheduling reset.
96       * @param <T>
97       *            generic type of source observable
98       * @return {@link CloseableObservableWithReset} that should be closed once
99       *         finished to prevent worker memory leak.
100      */
101     public static <T> CloseableObservableWithReset<T> cache(final Observable<T> source,
102             final long duration, final TimeUnit unit, final Scheduler scheduler) {
103         final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
104         final AtomicReference<Optional<Worker>> workerRef = new AtomicReference<Optional<Worker>>(
105                 Optional.<Worker> absent());
106         CachedObservable<T> cache = new CachedObservable<T>(source);
107         cacheRef.set(cache);
108         Action0 closeAction = new Action0() {
109             @Override
110             public void call() {
111                 while (true) {
112                     Optional<Worker> w = workerRef.get();
113                     if (w == null) {
114                         // we are finished
115                         break;
116                     } else {
117                         if (workerRef.compareAndSet(w, null)) {
118                             if (w.isPresent()) {
119                                 w.get().unsubscribe();
120                             }
121                             // we are finished
122                             workerRef.set(null);
123                             break;
124                         }
125                     }
126                     // if not finished then try again
127                 }
128             }
129         };
130         Action0 resetAction = new Action0() {
131 
132             @Override
133             public void call() {
134                 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
135             }
136         };
137         return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
138     }
139 
140     private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
141             final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
142             final AtomicReference<Optional<Worker>> workerRef) {
143 
144         Action0 action = new Action0() {
145             @Override
146             public void call() {
147                 cacheRef.get().reset();
148             }
149         };
150         // CAS loop to cancel the current worker and create a new one
151         while (true) {
152             Optional<Worker> wOld = workerRef.get();
153             if (wOld == null) {
154                 // we are finished
155                 return;
156             }
157             Optional<Worker> w = Optional.of(scheduler.createWorker());
158             if (workerRef.compareAndSet(wOld, w)) {
159                 if (wOld.isPresent())
160                     wOld.get().unsubscribe();
161                 w.get().schedule(action, duration, unit);
162                 break;
163             }
164         }
165     }
166 
167     /**
168      * Returns an Observable that epeats emitting {@code t} without completing.
169      * Supports backpressure.
170      * 
171      * @param t value to repeat
172      * @param <T> type of t
173 
174      * @return an observable that repeats t forever (or until unsubscribed)
175      */
176     public static <T> Observable<T> repeating(final T t) {
177         return Observable.create(new OnSubscribeRepeating<T>(t));
178     }
179 
180     public static <T extends Comparable<? super T>> Observable<T> create(
181             Collection<Observable<T>> sources) {
182         return create(sources, false);
183     }
184 
185     public static <T> Observable<T> create(Collection<Observable<T>> sources,
186             Comparator<? super T> comparator) {
187         return create(sources, comparator, false);
188     }
189 
190     public static <T extends Comparable<? super T>> Observable<T> create(
191             Collection<Observable<T>> sources, boolean delayErrors) {
192         return OrderedMerge.create(sources, delayErrors);
193     }
194 
195     public static <T> Observable<T> create(Collection<Observable<T>> sources,
196             Comparator<? super T> comparator, boolean delayErrors) {
197         return OrderedMerge.create(sources, comparator, delayErrors);
198     }
199 
200     public static <T> Observable<T> fromQueue(Queue<T> queue) {
201         return Observable.create(new OnSubscribeFromQueue<T>(queue));
202     }
203 
204     public static <T> Observable<List<Integer>> permutations(int size) {
205         List<Integer> indexes = new ArrayList<Integer>(size);
206         for (int i = 0; i < size; i++) {
207             indexes.add(i);
208         }
209         return Observable.from(Permutations.iterable(indexes)).scan(indexes,
210                 new Func2<List<Integer>, Swap<Integer>, List<Integer>>() {
211 
212                     @Override
213                     public List<Integer> call(List<Integer> a, Swap<Integer> swap) {
214                         List<Integer> b = new ArrayList<Integer>(a);
215                         b.set(swap.left(), a.get(swap.right()));
216                         b.set(swap.right(), a.get(swap.left()));
217                         return b;
218                     }
219                 });
220     }
221 
222     public static <T> Observable<List<T>> permutations(final List<T> list) {
223         return permutations(list.size()).map(new Func1<List<Integer>, List<T>>() {
224 
225             @Override
226             public List<T> call(List<Integer> a) {
227                 List<T> b = new ArrayList<T>(a.size());
228                 for (int i = 0; i < a.size(); i++) {
229                     b.add(list.get(a.get(i)));
230                 }
231                 return b;
232             }
233         });
234     }
235     
236     
237     public static Observable<Long> intervalLong(final long duration, final TimeUnit unit, final Scheduler scheduler) {
238         return Observable.defer(new Func0<Observable<Long>>() {
239             final long[] count = new long[1];
240             @Override
241             public Observable<Long> call() {
242                 return Observable.interval(duration, unit, scheduler)
243                         .map(new Func1<Long, Long>() {
244 
245                             @Override
246                             public Long call(Long t) {
247                                 return count[0]++;
248                             }});
249             }});
250     }
251     
252     public static Observable<Long> intervalLong(long duration, TimeUnit unit) {
253         return intervalLong(duration, unit, Schedulers.computation());
254     }
255     
256 }