1 package com.github.davidmoten.rx;
2
3 import java.util.concurrent.atomic.AtomicLong;
4 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
5
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import rx.Observable;
10 import rx.Observable.OnSubscribe;
11 import rx.Observable.Operator;
12 import rx.Observer;
13 import rx.Subscriber;
14 import rx.functions.Action1;
15 import rx.functions.Func1;
16
17 /**
18 * Utility methods for RxJava.
19 */
20 public final class RxUtil {
21
22 /**
23 * slf4j logger.
24 */
25 private static final Logger log = LoggerFactory.getLogger(RxUtil.class);
26
27 private RxUtil() {
28 // prevent instantiation
29 }
30
31 /**
32 * Returns the concatenation of two {@link Observable}s but the first
33 * sequence will be emitted in its entirety and ignored before o2 starts
34 * emitting.
35 *
36 * @param <T>
37 * the generic type of the second observable
38 * @param o1
39 * the sequence to ignore
40 * @param o2
41 * the sequence to emit after o1 ignored
42 * @return observable result of concatenating two observables, ignoring the
43 * first
44 */
45 @SuppressWarnings("unchecked")
46 public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1, Observable<T> o2) {
47 return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
48 }
49
50 /**
51 * Logs errors and onNext at info level using slf4j {@link Logger}.
52 *
53 * @param <T>
54 * the return generic type
55 * @return a logging {@link Observer}
56 */
57 public static <T> Observer<? super T> log() {
58 return new Observer<T>() {
59
60 @Override
61 public void onCompleted() {
62 // do nothing
63 }
64
65 @Override
66 public void onError(Throwable e) {
67 log.error(e.getMessage(), e);
68 }
69
70 @Override
71 public void onNext(T t) {
72 log.info(t + "");
73 }
74 };
75 }
76
77 /**
78 * Converts a transformation of an Observable into another Observable into
79 * an {@link Operator} suitable for use with
80 * {@link Observable#lift(Operator)} for instance.
81 *
82 * @param <R>
83 * from generic type
84 * @param <T>
85 * to generic type
86 * @param operation
87 * @return an operator form of the given function
88 */
89 public static <R, T> Operator<R, T> toOperator(Func1<Observable<T>, Observable<R>> operation) {
90 return Transformers.toOperator(operation);
91 }
92
93 /**
94 * Returns an {@link Action1} that increments a counter when the call method
95 * is called.
96 *
97 * @param <T>
98 * generic type of item being counted
99 * @return {@link Action1} to count calls.
100 */
101 public static <T> CountingAction<T> counter() {
102 return new CountingAction<T>();
103 }
104
105 public static class CountingAction<T> implements Action1<T> {
106 private final AtomicLong count = new AtomicLong(0);
107
108 public Observable<Long> count() {
109 return Observable.create(new OnSubscribe<Long>() {
110
111 @Override
112 public void call(Subscriber<? super Long> subscriber) {
113 subscriber.onNext(count.get());
114 subscriber.onCompleted();
115 }
116 });
117 }
118
119 @Override
120 public void call(T t) {
121 count.incrementAndGet();
122 }
123 }
124
125 public static <T extends Number> Func1<T, Boolean> greaterThanZero() {
126 return new Func1<T, Boolean>() {
127
128 @Override
129 public Boolean call(T t) {
130 return t.doubleValue() > 0;
131 }
132 };
133 }
134
135 /**
136 * Returns a {@link Func1} that returns an empty {@link Observable}.
137 *
138 * @return
139 */
140 public static <T> Func1<T, Observable<Object>> toEmpty() {
141 return Functions.constant(Observable.<Object> empty());
142 }
143
144 /**
145 * Returns an {@link Operator} that flattens a sequence of
146 * {@link Observable} into a flat sequence of the items from the
147 * Observables. This operator may interleave the items asynchronously. For
148 * synchronous behaviour use {@link RxUtil#concat()}.
149 *
150 * @return
151 */
152 public static <T> Operator<T, Observable<T>> flatten() {
153 return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
154
155 @Override
156 public Observable<T> call(Observable<Observable<T>> source) {
157 return source.flatMap(Functions.<Observable<T>> identity());
158 }
159 });
160 }
161
162 public static <T> Operator<T, Observable<T>> concat() {
163 return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
164
165 @Override
166 public Observable<T> call(Observable<Observable<T>> source) {
167 return Observable.concat(source);
168 }
169 });
170 }
171
172 /**
173 * Adds {@code n} to {@code requested} field and returns the value prior to
174 * addition once the addition is successful (uses CAS semantics). If
175 * overflows then sets {@code requested} field to {@code Long.MAX_VALUE}.
176 *
177 * @param requested
178 * atomic field updater for a request count
179 * @param object
180 * contains the field updated by the updater
181 * @param n
182 * the number of requests to add to the requested count
183 * @return requested value just prior to successful addition
184 */
185 public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
186 // add n to field but check for overflow
187 while (true) {
188 long current = requested.get(object);
189 long next = current + n;
190 // check for overflow
191 if (next < 0) {
192 next = Long.MAX_VALUE;
193 }
194 if (requested.compareAndSet(object, current, next)) {
195 return current;
196 }
197 }
198 }
199
200 /**
201 * Adds {@code n} to {@code requested} and returns the value prior to addition once the
202 * addition is successful (uses CAS semantics). If overflows then sets
203 * {@code requested} field to {@code Long.MAX_VALUE}.
204 *
205 * @param requested
206 * atomic field updater for a request count
207 * @param object
208 * contains the field updated by the updater
209 * @param n
210 * the number of requests to add to the requested count
211 * @return requested value just prior to successful addition
212 */
213 public static long getAndAddRequest(AtomicLong requested, long n) {
214 // add n to field but check for overflow
215 while (true) {
216 long current = requested.get();
217 long next = current + n;
218 // check for overflow
219 if (next < 0) {
220 next = Long.MAX_VALUE;
221 }
222 if (requested.compareAndSet(current, next)) {
223 return current;
224 }
225 }
226 }
227 }