1 package com.github.davidmoten.rx2;
2
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 import java.util.List;
6 import java.util.concurrent.TimeUnit;
7
8 import com.github.davidmoten.guavamini.Optional;
9 import com.github.davidmoten.guavamini.Preconditions;
10
11 import io.reactivex.Flowable;
12 import io.reactivex.Scheduler;
13 import io.reactivex.functions.BiFunction;
14 import io.reactivex.functions.Consumer;
15 import io.reactivex.functions.Function;
16 import io.reactivex.functions.Predicate;
17 import io.reactivex.internal.functions.Functions;
18 import io.reactivex.schedulers.Schedulers;
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 public final class RetryWhen {
37
38 private RetryWhen() {
39
40 }
41
42 private static final long NO_MORE_DELAYS = -1;
43
44 private static Function<Flowable<? extends Throwable>, Flowable<Object>> notificationHandler(
45 final Flowable<Long> delays, final Scheduler scheduler, final Consumer<? super ErrorAndDuration> action,
46 final List<Class<? extends Throwable>> retryExceptions,
47 final List<Class<? extends Throwable>> failExceptions,
48 final Predicate<? super Throwable> exceptionPredicate) {
49
50 final Function<ErrorAndDuration, Flowable<ErrorAndDuration>> checkExceptions = createExceptionChecker(
51 retryExceptions, failExceptions, exceptionPredicate);
52
53 return createNotificationHandler(delays, scheduler, action, checkExceptions);
54 }
55
56 private static Function<Flowable<? extends Throwable>, Flowable<Object>> createNotificationHandler(
57 final Flowable<Long> delays, final Scheduler scheduler, final Consumer<? super ErrorAndDuration> action,
58 final Function<ErrorAndDuration, Flowable<ErrorAndDuration>> checkExceptions) {
59 return new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
60
61 @SuppressWarnings("unchecked")
62 @Override
63 public Flowable<Object> apply(Flowable<? extends Throwable> errors) {
64
65
66
67 return (Flowable<Object>) (Flowable<?>) errors
68
69 .zipWith(delays.concatWith(Flowable.just(NO_MORE_DELAYS)), TO_ERROR_AND_DURATION)
70
71 .flatMap(checkExceptions)
72
73
74 .doOnNext(callActionExceptForLast(action))
75
76 .flatMap(delay(scheduler));
77 }
78 };
79 }
80
81 private static Consumer<ErrorAndDuration> callActionExceptForLast(final Consumer<? super ErrorAndDuration> action) {
82 return new Consumer<ErrorAndDuration>() {
83 @Override
84 public void accept(ErrorAndDuration e) throws Exception {
85 if (e.durationMs() != NO_MORE_DELAYS) {
86 action.accept(e);
87 }
88 }
89 };
90 }
91
92
93 private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> createExceptionChecker(
94 final List<Class<? extends Throwable>> retryExceptions,
95 final List<Class<? extends Throwable>> failExceptions,
96 final Predicate<? super Throwable> exceptionPredicate) {
97 return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
98
99 @Override
100 public Flowable<ErrorAndDuration> apply(ErrorAndDuration e) throws Exception {
101 if (!exceptionPredicate.test(e.throwable()))
102 return Flowable.error(e.throwable());
103 for (Class<? extends Throwable> cls : failExceptions) {
104 if (cls.isAssignableFrom(e.throwable().getClass()))
105 return Flowable.error(e.throwable());
106 }
107 if (retryExceptions.size() > 0) {
108 for (Class<? extends Throwable> cls : retryExceptions) {
109 if (cls.isAssignableFrom(e.throwable().getClass()))
110 return Flowable.just(e);
111 }
112 return Flowable.error(e.throwable());
113 } else {
114 return Flowable.just(e);
115 }
116 }
117 };
118 }
119
120 private static BiFunction<Throwable, Long, ErrorAndDuration> TO_ERROR_AND_DURATION = new BiFunction<Throwable, Long, ErrorAndDuration>() {
121 @Override
122 public ErrorAndDuration apply(Throwable throwable, Long durationMs) {
123 return new ErrorAndDuration(throwable, durationMs);
124 }
125 };
126
127 private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> delay(final Scheduler scheduler) {
128 return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
129 @Override
130 public Flowable<ErrorAndDuration> apply(ErrorAndDuration e) {
131 if (e.durationMs() == NO_MORE_DELAYS)
132 return Flowable.error(e.throwable());
133 else
134 return Flowable.timer(e.durationMs(), TimeUnit.MILLISECONDS, scheduler)
135 .map(com.github.davidmoten.rx2.Functions.constant(e));
136 }
137 };
138 }
139
140
141
142 public static Builder retryWhenInstanceOf(Class<? extends Throwable>... classes) {
143 return new Builder().retryWhenInstanceOf(classes);
144 }
145
146 public static Builder failWhenInstanceOf(Class<? extends Throwable>... classes) {
147 return new Builder().failWhenInstanceOf(classes);
148 }
149
150 public static Builder retryIf(Predicate<Throwable> predicate) {
151 return new Builder().retryIf(predicate);
152 }
153
154 public static Builder delays(Flowable<Long> delays, TimeUnit unit) {
155 return new Builder().delays(delays, unit);
156 }
157
158 public static Builder delaysInt(Flowable<Integer> delays, TimeUnit unit) {
159 return new Builder().delaysInt(delays, unit);
160 }
161
162 public static Builder delay(long delay, final TimeUnit unit) {
163 return new Builder().delay(delay, unit);
164 }
165
166 public static Builder maxRetries(int maxRetries) {
167 return new Builder().maxRetries(maxRetries);
168 }
169
170 public static Builder scheduler(Scheduler scheduler) {
171 return new Builder().scheduler(scheduler);
172 }
173
174 public static Builder action(Consumer<? super ErrorAndDuration> action) {
175 return new Builder().action(action);
176 }
177
178 public static Builder exponentialBackoff(final long firstDelay, final TimeUnit unit, final double factor) {
179 return new Builder().exponentialBackoff(firstDelay, unit, factor);
180 }
181
182 public static Builder exponentialBackoff(long firstDelay, TimeUnit unit) {
183 return new Builder().exponentialBackoff(firstDelay, unit);
184 }
185
186 public static final class Builder {
187
188 private final List<Class<? extends Throwable>> retryExceptions = new ArrayList<Class<? extends Throwable>>();
189 private final List<Class<? extends Throwable>> failExceptions = new ArrayList<Class<? extends Throwable>>();
190 private Predicate<? super Throwable> exceptionPredicate = Functions.alwaysTrue();
191
192 private Flowable<Long> delays = Flowable.just(0L).repeat();
193 private Optional<Integer> maxRetries = Optional.absent();
194 private Optional<Scheduler> scheduler = Optional.of(Schedulers.computation());
195 private Consumer<? super ErrorAndDuration> action = Consumers.doNothing();
196
197 private Builder() {
198
199 }
200
201 public Builder retryWhenInstanceOf(Class<? extends Throwable>... classes) {
202 retryExceptions.addAll(Arrays.asList(classes));
203 return this;
204 }
205
206 public Builder failWhenInstanceOf(Class<? extends Throwable>... classes) {
207 failExceptions.addAll(Arrays.asList(classes));
208 return this;
209 }
210
211 public Builder retryIf(Predicate<Throwable> predicate) {
212 this.exceptionPredicate = predicate;
213 return this;
214 }
215
216 public Builder delays(Flowable<Long> delays, TimeUnit unit) {
217 this.delays = delays.map(toMillis(unit));
218 return this;
219 }
220
221 private static class ToLongHolder {
222 static final Function<Integer, Long> INSTANCE = new Function<Integer, Long>() {
223 @Override
224 public Long apply(Integer n) {
225 if (n == null) {
226 return null;
227 } else {
228 return n.longValue();
229 }
230 }
231 };
232 }
233
234 public Builder delaysInt(Flowable<Integer> delays, TimeUnit unit) {
235 return delays(delays.map(ToLongHolder.INSTANCE), unit);
236 }
237
238 public Builder delay(Long delay, final TimeUnit unit) {
239 this.delays = Flowable.just(delay).map(toMillis(unit)).repeat();
240 return this;
241 }
242
243 private static Function<Long, Long> toMillis(final TimeUnit unit) {
244 return new Function<Long, Long>() {
245
246 @Override
247 public Long apply(Long t) {
248 return unit.toMillis(t);
249 }
250 };
251 }
252
253 public Builder maxRetries(int maxRetries) {
254 this.maxRetries = Optional.of(maxRetries);
255 return this;
256 }
257
258 public Builder scheduler(Scheduler scheduler) {
259 this.scheduler = Optional.of(scheduler);
260 return this;
261 }
262
263 public Builder action(Consumer<? super ErrorAndDuration> action) {
264 this.action = action;
265 return this;
266 }
267
268 public Builder exponentialBackoff(final long firstDelay, final long maxDelay, final TimeUnit unit,
269 final double factor) {
270
271 delays = Flowable.range(1, Integer.MAX_VALUE)
272
273 .map(new Function<Integer, Long>() {
274 @Override
275 public Long apply(Integer n) {
276 long delayMs = Math.round(Math.pow(factor, n - 1) * unit.toMillis(firstDelay));
277 if (maxDelay == -1) {
278 return delayMs;
279 } else {
280 long maxDelayMs = unit.toMillis(maxDelay);
281 return Math.min(maxDelayMs, delayMs);
282 }
283 }
284 });
285 return this;
286 }
287
288 public Builder exponentialBackoff(final long firstDelay, final TimeUnit unit, final double factor) {
289 return exponentialBackoff(firstDelay, -1, unit, factor);
290 }
291
292 public Builder exponentialBackoff(long firstDelay, TimeUnit unit) {
293 return exponentialBackoff(firstDelay, unit, 2);
294 }
295
296 public Function<Flowable<? extends Throwable>, Flowable<Object>> build() {
297 Preconditions.checkNotNull(delays);
298 if (maxRetries.isPresent()) {
299 delays = delays.take(maxRetries.get());
300 }
301 return notificationHandler(delays, scheduler.get(), action, retryExceptions, failExceptions,
302 exceptionPredicate);
303 }
304
305 }
306
307 public static final class ErrorAndDuration {
308
309 private final Throwable throwable;
310 private final long durationMs;
311
312 public ErrorAndDuration(Throwable throwable, long durationMs) {
313 this.throwable = throwable;
314 this.durationMs = durationMs;
315 }
316
317 public Throwable throwable() {
318 return throwable;
319 }
320
321 public long durationMs() {
322 return durationMs;
323 }
324
325 }
326 }