1 package com.github.davidmoten.rx;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Comparator;
6 import java.util.List;
7 import java.util.Queue;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicReference;
10
11 import com.github.davidmoten.rx.internal.operators.OnSubscribeFromQueue;
12 import com.github.davidmoten.rx.internal.operators.OnSubscribeRepeating;
13 import com.github.davidmoten.rx.internal.operators.OrderedMerge;
14 import com.github.davidmoten.rx.internal.operators.Permutations;
15 import com.github.davidmoten.rx.internal.operators.Permutations.Swap;
16 import com.github.davidmoten.rx.observables.CachedObservable;
17 import com.github.davidmoten.util.Optional;
18
19 import rx.Observable;
20 import rx.Scheduler;
21 import rx.Scheduler.Worker;
22 import rx.functions.Action0;
23 import rx.functions.Func0;
24 import rx.functions.Func1;
25 import rx.functions.Func2;
26
27 public final class Obs {
28
29
30
31
32
33
34
35
36
37
38
39
40 public static <T> CachedObservable<T> cache(Observable<T> source) {
41 return new CachedObservable<T>(source);
42 }
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 public static <T> Observable<T> cache(final Observable<T> source, final long duration,
65 final TimeUnit unit, final Worker worker) {
66 final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
67 CachedObservable<T> cache = new CachedObservable<T>(source);
68 cacheRef.set(cache);
69 return cache.doOnSubscribe(new Action0() {
70 @Override
71 public void call() {
72 Action0 action = new Action0() {
73 @Override
74 public void call() {
75 cacheRef.get().reset();
76 }
77 };
78 worker.schedule(action, duration, unit);
79 }
80 });
81 }
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public static <T> CloseableObservableWithReset<T> cache(final Observable<T> source,
102 final long duration, final TimeUnit unit, final Scheduler scheduler) {
103 final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
104 final AtomicReference<Optional<Worker>> workerRef = new AtomicReference<Optional<Worker>>(
105 Optional.<Worker> absent());
106 CachedObservable<T> cache = new CachedObservable<T>(source);
107 cacheRef.set(cache);
108 Action0 closeAction = new Action0() {
109 @Override
110 public void call() {
111 while (true) {
112 Optional<Worker> w = workerRef.get();
113 if (w == null) {
114
115 break;
116 } else {
117 if (workerRef.compareAndSet(w, null)) {
118 if (w.isPresent()) {
119 w.get().unsubscribe();
120 }
121
122 workerRef.set(null);
123 break;
124 }
125 }
126
127 }
128 }
129 };
130 Action0 resetAction = new Action0() {
131
132 @Override
133 public void call() {
134 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
135 }
136 };
137 return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
138 }
139
140 private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
141 final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
142 final AtomicReference<Optional<Worker>> workerRef) {
143
144 Action0 action = new Action0() {
145 @Override
146 public void call() {
147 cacheRef.get().reset();
148 }
149 };
150
151 while (true) {
152 Optional<Worker> wOld = workerRef.get();
153 if (wOld == null) {
154
155 return;
156 }
157 Optional<Worker> w = Optional.of(scheduler.createWorker());
158 if (workerRef.compareAndSet(wOld, w)) {
159 if (wOld.isPresent())
160 wOld.get().unsubscribe();
161 w.get().schedule(action, duration, unit);
162 break;
163 }
164 }
165 }
166
167
168
169
170
171
172
173
174
175
176 public static <T> Observable<T> repeating(final T t) {
177 return Observable.create(new OnSubscribeRepeating<T>(t));
178 }
179
180 public static <T extends Comparable<? super T>> Observable<T> create(
181 Collection<Observable<T>> sources) {
182 return create(sources, false);
183 }
184
185 public static <T> Observable<T> create(Collection<Observable<T>> sources,
186 Comparator<? super T> comparator) {
187 return create(sources, comparator, false);
188 }
189
190 public static <T extends Comparable<? super T>> Observable<T> create(
191 Collection<Observable<T>> sources, boolean delayErrors) {
192 return OrderedMerge.create(sources, delayErrors);
193 }
194
195 public static <T> Observable<T> create(Collection<Observable<T>> sources,
196 Comparator<? super T> comparator, boolean delayErrors) {
197 return OrderedMerge.create(sources, comparator, delayErrors);
198 }
199
200 public static <T> Observable<T> fromQueue(Queue<T> queue) {
201 return Observable.create(new OnSubscribeFromQueue<T>(queue));
202 }
203
204 public static <T> Observable<List<Integer>> permutations(int size) {
205 List<Integer> indexes = new ArrayList<Integer>(size);
206 for (int i = 0; i < size; i++) {
207 indexes.add(i);
208 }
209 return Observable.from(Permutations.iterable(indexes)).scan(indexes,
210 new Func2<List<Integer>, Swap<Integer>, List<Integer>>() {
211
212 @Override
213 public List<Integer> call(List<Integer> a, Swap<Integer> swap) {
214 List<Integer> b = new ArrayList<Integer>(a);
215 b.set(swap.left(), a.get(swap.right()));
216 b.set(swap.right(), a.get(swap.left()));
217 return b;
218 }
219 });
220 }
221
222 public static <T> Observable<List<T>> permutations(final List<T> list) {
223 return permutations(list.size()).map(new Func1<List<Integer>, List<T>>() {
224
225 @Override
226 public List<T> call(List<Integer> a) {
227 List<T> b = new ArrayList<T>(a.size());
228 for (int i = 0; i < a.size(); i++) {
229 b.add(list.get(a.get(i)));
230 }
231 return b;
232 }
233 });
234 }
235
236
237 public static Observable<Long> intervalLong(final long duration, final TimeUnit unit, final Scheduler scheduler) {
238 return Observable.defer(new Func0<Observable<Long>>() {
239 final long[] count = new long[1];
240 @Override
241 public Observable<Long> call() {
242 return Observable.interval(duration, unit, scheduler)
243 .map(new Func1<Long, Long>() {
244
245 @Override
246 public Long call(Long t) {
247 return count[0]++;
248 }});
249 }});
250 }
251
252 public static Observable<Long> intervalLong(long duration, TimeUnit unit) {
253 return intervalLong(duration, unit, Schedulers.computation());
254 }
255
256 }