View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   import java.util.concurrent.atomic.AtomicLongFieldUpdater;
5   
6   import org.slf4j.Logger;
7   import org.slf4j.LoggerFactory;
8   
9   import rx.Observable;
10  import rx.Observable.OnSubscribe;
11  import rx.Observable.Operator;
12  import rx.Observer;
13  import rx.Subscriber;
14  import rx.functions.Action1;
15  import rx.functions.Func1;
16  
17  /**
18   * Utility methods for RxJava.
19   */
20  public final class RxUtil {
21  
22      /**
23       * slf4j logger.
24       */
25      private static final Logger log = LoggerFactory.getLogger(RxUtil.class);
26  
27      private RxUtil() {
28          // prevent instantiation
29      }
30  
31      /**
32       * Returns the concatenation of two {@link Observable}s but the first
33       * sequence will be emitted in its entirety and ignored before o2 starts
34       * emitting.
35       * 
36       * @param <T>
37       *            the generic type of the second observable
38       * @param o1
39       *            the sequence to ignore
40       * @param o2
41       *            the sequence to emit after o1 ignored
42       * @return observable result of concatenating two observables, ignoring the
43       *         first
44       */
45      @SuppressWarnings("unchecked")
46      public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1, Observable<T> o2) {
47          return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
48      }
49  
50      /**
51       * Logs errors and onNext at info level using slf4j {@link Logger}.
52       * 
53       * @param <T>
54       *            the return generic type
55       * @return a logging {@link Observer}
56       */
57      public static <T> Observer<? super T> log() {
58          return new Observer<T>() {
59  
60              @Override
61              public void onCompleted() {
62                  // do nothing
63              }
64  
65              @Override
66              public void onError(Throwable e) {
67                  log.error(e.getMessage(), e);
68              }
69  
70              @Override
71              public void onNext(T t) {
72                  log.info(t + "");
73              }
74          };
75      }
76  
77      /**
78       * Converts a transformation of an Observable into another Observable into
79       * an {@link Operator} suitable for use with
80       * {@link Observable#lift(Operator)} for instance.
81       * 
82       * @param <R>
83       *            from generic type
84       * @param <T>
85       *            to generic type
86       * @param operation
87       * @return an operator form of the given function
88       */
89      public static <R, T> Operator<R, T> toOperator(Func1<Observable<T>, Observable<R>> operation) {
90          return Transformers.toOperator(operation);
91      }
92  
93      /**
94       * Returns an {@link Action1} that increments a counter when the call method
95       * is called.
96       * 
97       * @param <T>
98       *            generic type of item being counted
99       * @return {@link Action1} to count calls.
100      */
101     public static <T> CountingAction<T> counter() {
102         return new CountingAction<T>();
103     }
104 
105     public static class CountingAction<T> implements Action1<T> {
106         private final AtomicLong count = new AtomicLong(0);
107 
108         public Observable<Long> count() {
109             return Observable.create(new OnSubscribe<Long>() {
110 
111                 @Override
112                 public void call(Subscriber<? super Long> subscriber) {
113                     subscriber.onNext(count.get());
114                     subscriber.onCompleted();
115                 }
116             });
117         }
118 
119         @Override
120         public void call(T t) {
121             count.incrementAndGet();
122         }
123     }
124 
125     public static <T extends Number> Func1<T, Boolean> greaterThanZero() {
126         return new Func1<T, Boolean>() {
127 
128             @Override
129             public Boolean call(T t) {
130                 return t.doubleValue() > 0;
131             }
132         };
133     }
134 
135     /**
136      * Returns a {@link Func1} that returns an empty {@link Observable}.
137      * 
138      * @return
139      */
140     public static <T> Func1<T, Observable<Object>> toEmpty() {
141         return Functions.constant(Observable.<Object> empty());
142     }
143 
144     /**
145      * Returns an {@link Operator} that flattens a sequence of
146      * {@link Observable} into a flat sequence of the items from the
147      * Observables. This operator may interleave the items asynchronously. For
148      * synchronous behaviour use {@link RxUtil#concat()}.
149      * 
150      * @return
151      */
152     public static <T> Operator<T, Observable<T>> flatten() {
153         return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
154 
155             @Override
156             public Observable<T> call(Observable<Observable<T>> source) {
157                 return source.flatMap(Functions.<Observable<T>> identity());
158             }
159         });
160     }
161 
162     public static <T> Operator<T, Observable<T>> concat() {
163         return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
164 
165             @Override
166             public Observable<T> call(Observable<Observable<T>> source) {
167                 return Observable.concat(source);
168             }
169         });
170     }
171 
172     /**
173      * Adds {@code n} to {@code requested} field and returns the value prior to
174      * addition once the addition is successful (uses CAS semantics). If
175      * overflows then sets {@code requested} field to {@code Long.MAX_VALUE}.
176      * 
177      * @param requested
178      *            atomic field updater for a request count
179      * @param object
180      *            contains the field updated by the updater
181      * @param n
182      *            the number of requests to add to the requested count
183      * @return requested value just prior to successful addition
184      */
185     public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
186         // add n to field but check for overflow
187         while (true) {
188             long current = requested.get(object);
189             long next = current + n;
190             // check for overflow
191             if (next < 0) {
192                 next = Long.MAX_VALUE;
193             }
194             if (requested.compareAndSet(object, current, next)) {
195                 return current;
196             }
197         }
198     }
199 
200     /**
201      * Adds {@code n} to {@code requested} and returns the value prior to addition once the
202      * addition is successful (uses CAS semantics). If overflows then sets
203      * {@code requested} field to {@code Long.MAX_VALUE}.
204      * 
205      * @param requested
206      *            atomic field updater for a request count
207      * @param object
208      *            contains the field updated by the updater
209      * @param n
210      *            the number of requests to add to the requested count
211      * @return requested value just prior to successful addition
212      */
213     public static long getAndAddRequest(AtomicLong requested, long n) {
214         // add n to field but check for overflow
215         while (true) {
216             long current = requested.get();
217             long next = current + n;
218             // check for overflow
219             if (next < 0) {
220                 next = Long.MAX_VALUE;
221             }
222             if (requested.compareAndSet(current, next)) {
223                 return current;
224             }
225         }
226     }
227 }