View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.util.ArrayList;
4   import java.util.Arrays;
5   import java.util.List;
6   import java.util.concurrent.TimeUnit;
7   
8   import com.github.davidmoten.guavamini.Optional;
9   import com.github.davidmoten.guavamini.Preconditions;
10  
11  import io.reactivex.Flowable;
12  import io.reactivex.Scheduler;
13  import io.reactivex.functions.BiFunction;
14  import io.reactivex.functions.Consumer;
15  import io.reactivex.functions.Function;
16  import io.reactivex.functions.Predicate;
17  import io.reactivex.internal.functions.Functions;
18  import io.reactivex.schedulers.Schedulers;
19  
20  /**
21   * Provides builder for the {@link Function} parameter of
22   * {@link Flowable#retryWhen(Function)}. For example:
23   * 
24   * <pre>
25   * o.retryWhen(RetryWhen.maxRetries(4).delay(10, TimeUnit.SECONDS).action(log).build());
26   * </pre>
27   * 
28   * <p>
29   * or
30   * </p>
31   * 
32   * <pre>
33   * o.retryWhen(RetryWhen.exponentialBackoff(100, TimeUnit.MILLISECONDS).maxRetries(10).build());
34   * </pre>
35   */
36  public final class RetryWhen {
37  
38      private RetryWhen() {
39          // prevent instantiation
40      }
41  
42      private static final long NO_MORE_DELAYS = -1;
43  
44      private static Function<Flowable<? extends Throwable>, Flowable<Object>> notificationHandler(
45              final Flowable<Long> delays, final Scheduler scheduler, final Consumer<? super ErrorAndDuration> action,
46              final List<Class<? extends Throwable>> retryExceptions,
47              final List<Class<? extends Throwable>> failExceptions,
48              final Predicate<? super Throwable> exceptionPredicate) {
49  
50          final Function<ErrorAndDuration, Flowable<ErrorAndDuration>> checkExceptions = createExceptionChecker(
51                  retryExceptions, failExceptions, exceptionPredicate);
52  
53          return createNotificationHandler(delays, scheduler, action, checkExceptions);
54      }
55  
56      private static Function<Flowable<? extends Throwable>, Flowable<Object>> createNotificationHandler(
57              final Flowable<Long> delays, final Scheduler scheduler, final Consumer<? super ErrorAndDuration> action,
58              final Function<ErrorAndDuration, Flowable<ErrorAndDuration>> checkExceptions) {
59          return new Function<Flowable<? extends Throwable>, Flowable<Object>>() {
60  
61              @SuppressWarnings("unchecked")
62              @Override
63              public Flowable<Object> apply(Flowable<? extends Throwable> errors) {
64                  // TODO remove this cast when rxjava 2.0.3 released because
65                  // signature of retryWhen
66                  // will be fixed
67                  return (Flowable<Object>) (Flowable<?>) errors
68                          // zip with delays, use -1 to signal completion
69                          .zipWith(delays.concatWith(Flowable.just(NO_MORE_DELAYS)), TO_ERROR_AND_DURATION)
70                          // check retry and non-retry exceptions
71                          .flatMap(checkExceptions)
72                          // perform user action (for example log that a
73                          // delay is happening)
74                          .doOnNext(callActionExceptForLast(action))
75                          // delay the time in ErrorAndDuration
76                          .flatMap(delay(scheduler));
77              }
78          };
79      }
80  
81      private static Consumer<ErrorAndDuration> callActionExceptForLast(final Consumer<? super ErrorAndDuration> action) {
82          return new Consumer<ErrorAndDuration>() {
83              @Override
84              public void accept(ErrorAndDuration e) throws Exception {
85                  if (e.durationMs() != NO_MORE_DELAYS) {
86                      action.accept(e);
87                  }
88              }
89          };
90      }
91  
92      // TODO unit test
93      private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> createExceptionChecker(
94              final List<Class<? extends Throwable>> retryExceptions,
95              final List<Class<? extends Throwable>> failExceptions,
96              final Predicate<? super Throwable> exceptionPredicate) {
97          return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
98  
99              @Override
100             public Flowable<ErrorAndDuration> apply(ErrorAndDuration e) throws Exception {
101                 if (!exceptionPredicate.test(e.throwable()))
102                     return Flowable.error(e.throwable());
103                 for (Class<? extends Throwable> cls : failExceptions) {
104                     if (cls.isAssignableFrom(e.throwable().getClass()))
105                         return Flowable.error(e.throwable());
106                 }
107                 if (retryExceptions.size() > 0) {
108                     for (Class<? extends Throwable> cls : retryExceptions) {
109                         if (cls.isAssignableFrom(e.throwable().getClass()))
110                             return Flowable.just(e);
111                     }
112                     return Flowable.error(e.throwable());
113                 } else {
114                     return Flowable.just(e);
115                 }
116             }
117         };
118     }
119 
120     private static BiFunction<Throwable, Long, ErrorAndDuration> TO_ERROR_AND_DURATION = new BiFunction<Throwable, Long, ErrorAndDuration>() {
121         @Override
122         public ErrorAndDuration apply(Throwable throwable, Long durationMs) {
123             return new ErrorAndDuration(throwable, durationMs);
124         }
125     };
126 
127     private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> delay(final Scheduler scheduler) {
128         return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
129             @Override
130             public Flowable<ErrorAndDuration> apply(ErrorAndDuration e) {
131                 if (e.durationMs() == NO_MORE_DELAYS)
132                     return Flowable.error(e.throwable());
133                 else
134                     return Flowable.timer(e.durationMs(), TimeUnit.MILLISECONDS, scheduler)
135                             .map(com.github.davidmoten.rx2.Functions.constant(e));
136             }
137         };
138     }
139 
140     // Builder factory methods
141 
142     public static Builder retryWhenInstanceOf(Class<? extends Throwable>... classes) {
143         return new Builder().retryWhenInstanceOf(classes);
144     }
145 
146     public static Builder failWhenInstanceOf(Class<? extends Throwable>... classes) {
147         return new Builder().failWhenInstanceOf(classes);
148     }
149 
150     public static Builder retryIf(Predicate<Throwable> predicate) {
151         return new Builder().retryIf(predicate);
152     }
153 
154     public static Builder delays(Flowable<Long> delays, TimeUnit unit) {
155         return new Builder().delays(delays, unit);
156     }
157 
158     public static Builder delaysInt(Flowable<Integer> delays, TimeUnit unit) {
159         return new Builder().delaysInt(delays, unit);
160     }
161 
162     public static Builder delay(long delay, final TimeUnit unit) {
163         return new Builder().delay(delay, unit);
164     }
165 
166     public static Builder maxRetries(int maxRetries) {
167         return new Builder().maxRetries(maxRetries);
168     }
169 
170     public static Builder scheduler(Scheduler scheduler) {
171         return new Builder().scheduler(scheduler);
172     }
173 
174     public static Builder action(Consumer<? super ErrorAndDuration> action) {
175         return new Builder().action(action);
176     }
177 
178     public static Builder exponentialBackoff(final long firstDelay, final TimeUnit unit, final double factor) {
179         return new Builder().exponentialBackoff(firstDelay, unit, factor);
180     }
181 
182     public static Builder exponentialBackoff(long firstDelay, TimeUnit unit) {
183         return new Builder().exponentialBackoff(firstDelay, unit);
184     }
185 
186     public static final class Builder {
187 
188         private final List<Class<? extends Throwable>> retryExceptions = new ArrayList<Class<? extends Throwable>>();
189         private final List<Class<? extends Throwable>> failExceptions = new ArrayList<Class<? extends Throwable>>();
190         private Predicate<? super Throwable> exceptionPredicate = Functions.alwaysTrue();
191 
192         private Flowable<Long> delays = Flowable.just(0L).repeat();
193         private Optional<Integer> maxRetries = Optional.absent();
194         private Optional<Scheduler> scheduler = Optional.of(Schedulers.computation());
195         private Consumer<? super ErrorAndDuration> action = Consumers.doNothing();
196 
197         private Builder() {
198             // must use static factory method to instantiate
199         }
200 
201         public Builder retryWhenInstanceOf(Class<? extends Throwable>... classes) {
202             retryExceptions.addAll(Arrays.asList(classes));
203             return this;
204         }
205 
206         public Builder failWhenInstanceOf(Class<? extends Throwable>... classes) {
207             failExceptions.addAll(Arrays.asList(classes));
208             return this;
209         }
210 
211         public Builder retryIf(Predicate<Throwable> predicate) {
212             this.exceptionPredicate = predicate;
213             return this;
214         }
215 
216         public Builder delays(Flowable<Long> delays, TimeUnit unit) {
217             this.delays = delays.map(toMillis(unit));
218             return this;
219         }
220 
221         private static class ToLongHolder {
222             static final Function<Integer, Long> INSTANCE = new Function<Integer, Long>() {
223                 @Override
224                 public Long apply(Integer n) {
225                     if (n == null) {
226                         return null;
227                     } else {
228                         return n.longValue();
229                     }
230                 }
231             };
232         }
233 
234         public Builder delaysInt(Flowable<Integer> delays, TimeUnit unit) {
235             return delays(delays.map(ToLongHolder.INSTANCE), unit);
236         }
237 
238         public Builder delay(Long delay, final TimeUnit unit) {
239             this.delays = Flowable.just(delay).map(toMillis(unit)).repeat();
240             return this;
241         }
242 
243         private static Function<Long, Long> toMillis(final TimeUnit unit) {
244             return new Function<Long, Long>() {
245 
246                 @Override
247                 public Long apply(Long t) {
248                     return unit.toMillis(t);
249                 }
250             };
251         }
252 
253         public Builder maxRetries(int maxRetries) {
254             this.maxRetries = Optional.of(maxRetries);
255             return this;
256         }
257 
258         public Builder scheduler(Scheduler scheduler) {
259             this.scheduler = Optional.of(scheduler);
260             return this;
261         }
262 
263         public Builder action(Consumer<? super ErrorAndDuration> action) {
264             this.action = action;
265             return this;
266         }
267 
268         public Builder exponentialBackoff(final long firstDelay, final long maxDelay, final TimeUnit unit,
269                 final double factor) {
270 
271             delays = Flowable.range(1, Integer.MAX_VALUE)
272                     // make exponential
273                     .map(new Function<Integer, Long>() {
274                         @Override
275                         public Long apply(Integer n) {
276                             long delayMs = Math.round(Math.pow(factor, n - 1) * unit.toMillis(firstDelay));
277                             if (maxDelay == -1) {
278                                 return delayMs;
279                             } else {
280                                 long maxDelayMs = unit.toMillis(maxDelay);
281                                 return Math.min(maxDelayMs, delayMs);
282                             }
283                         }
284                     });
285             return this;
286         }
287 
288         public Builder exponentialBackoff(final long firstDelay, final TimeUnit unit, final double factor) {
289             return exponentialBackoff(firstDelay, -1, unit, factor);
290         }
291 
292         public Builder exponentialBackoff(long firstDelay, TimeUnit unit) {
293             return exponentialBackoff(firstDelay, unit, 2);
294         }
295 
296         public Function<Flowable<? extends Throwable>, Flowable<Object>> build() {
297             Preconditions.checkNotNull(delays);
298             if (maxRetries.isPresent()) {
299                 delays = delays.take(maxRetries.get());
300             }
301             return notificationHandler(delays, scheduler.get(), action, retryExceptions, failExceptions,
302                     exceptionPredicate);
303         }
304 
305     }
306 
307     public static final class ErrorAndDuration {
308 
309         private final Throwable throwable;
310         private final long durationMs;
311 
312         public ErrorAndDuration(Throwable throwable, long durationMs) {
313             this.throwable = throwable;
314             this.durationMs = durationMs;
315         }
316 
317         public Throwable throwable() {
318             return throwable;
319         }
320 
321         public long durationMs() {
322             return durationMs;
323         }
324 
325     }
326 }