View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.util.concurrent.CountDownLatch;
4   import java.util.concurrent.TimeUnit;
5   
6   import com.github.davidmoten.rx2.internal.SchedulerWithId;
7   
8   import io.reactivex.Scheduler;
9   import io.reactivex.Scheduler.Worker;
10  
11  public final class SchedulerHelper {
12  
13      private SchedulerHelper() {
14          // prevent instantiation
15      }
16  
17      public static Scheduler withThreadIdFromCallSite(Scheduler scheduler) {
18          return new SchedulerWithId(scheduler, describeCallSite());
19      }
20  
21      public static Scheduler withThreadId(Scheduler scheduler, String id) {
22          return new SchedulerWithId(scheduler, id);
23      }
24  
25      private static String describeCallSite() {
26          StackTraceElement[] elements = Thread.currentThread().getStackTrace();
27          StackTraceElement e = elements[3];
28          return e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber();
29      }
30  
31      public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
32          final CountDownLatch latch = new CountDownLatch(numThreads);
33          for (int i = 1; i <= numThreads; i++) {
34              final Worker worker = scheduler.createWorker();
35              worker.schedule(new Runnable() {
36                  @Override
37                  public void run() {
38                      worker.dispose();
39                      latch.countDown();
40                  }
41              });
42          }
43          try {
44              boolean finished = latch.await(timeout, unit);
45              if (!finished) {
46                  throw new RuntimeException("timeout occured waiting for work to finish");
47              }
48          } catch (InterruptedException e) {
49              throw new RuntimeException(e);
50          }
51      }
52  
53      public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads) {
54          blockUntilWorkFinished(scheduler, numThreads, 1, TimeUnit.MINUTES);
55      }
56  }