1 package com.github.davidmoten.rx;
2
3 import java.util.concurrent.atomic.AtomicLong;
4 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
5
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import rx.Observable;
10 import rx.Observable.OnSubscribe;
11 import rx.Observable.Operator;
12 import rx.Observer;
13 import rx.Subscriber;
14 import rx.functions.Action1;
15 import rx.functions.Func1;
16
17
18
19
20 public final class RxUtil {
21
22
23
24
25 private static final Logger log = LoggerFactory.getLogger(RxUtil.class);
26
27 private RxUtil() {
28
29 }
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 @SuppressWarnings("unchecked")
46 public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1, Observable<T> o2) {
47 return Observable.concat((Observable<T>) o1.ignoreElements(), o2);
48 }
49
50
51
52
53
54
55
56
57 public static <T> Observer<? super T> log() {
58 return new Observer<T>() {
59
60 @Override
61 public void onCompleted() {
62
63 }
64
65 @Override
66 public void onError(Throwable e) {
67 log.error(e.getMessage(), e);
68 }
69
70 @Override
71 public void onNext(T t) {
72 log.info(t + "");
73 }
74 };
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public static <R, T> Operator<R, T> toOperator(Func1<Observable<T>, Observable<R>> operation) {
90 return Transformers.toOperator(operation);
91 }
92
93
94
95
96
97
98
99
100
101 public static <T> CountingAction<T> counter() {
102 return new CountingAction<T>();
103 }
104
105 public static class CountingAction<T> implements Action1<T> {
106 private final AtomicLong count = new AtomicLong(0);
107
108 public Observable<Long> count() {
109 return Observable.create(new OnSubscribe<Long>() {
110
111 @Override
112 public void call(Subscriber<? super Long> subscriber) {
113 subscriber.onNext(count.get());
114 subscriber.onCompleted();
115 }
116 });
117 }
118
119 @Override
120 public void call(T t) {
121 count.incrementAndGet();
122 }
123 }
124
125 public static <T extends Number> Func1<T, Boolean> greaterThanZero() {
126 return new Func1<T, Boolean>() {
127
128 @Override
129 public Boolean call(T t) {
130 return t.doubleValue() > 0;
131 }
132 };
133 }
134
135
136
137
138
139
140 public static <T> Func1<T, Observable<Object>> toEmpty() {
141 return Functions.constant(Observable.<Object> empty());
142 }
143
144
145
146
147
148
149
150
151
152 public static <T> Operator<T, Observable<T>> flatten() {
153 return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
154
155 @Override
156 public Observable<T> call(Observable<Observable<T>> source) {
157 return source.flatMap(Functions.<Observable<T>> identity());
158 }
159 });
160 }
161
162 public static <T> Operator<T, Observable<T>> concat() {
163 return toOperator(new Func1<Observable<Observable<T>>, Observable<T>>() {
164
165 @Override
166 public Observable<T> call(Observable<Observable<T>> source) {
167 return Observable.concat(source);
168 }
169 });
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
186
187 while (true) {
188 long current = requested.get(object);
189 long next = current + n;
190
191 if (next < 0) {
192 next = Long.MAX_VALUE;
193 }
194 if (requested.compareAndSet(object, current, next)) {
195 return current;
196 }
197 }
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213 public static long getAndAddRequest(AtomicLong requested, long n) {
214
215 while (true) {
216 long current = requested.get();
217 long next = current + n;
218
219 if (next < 0) {
220 next = Long.MAX_VALUE;
221 }
222 if (requested.compareAndSet(current, next)) {
223 return current;
224 }
225 }
226 }
227 }