View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.util.concurrent.CountDownLatch;
4   import java.util.concurrent.TimeUnit;
5   import java.util.concurrent.TimeoutException;
6   
7   import rx.Scheduler;
8   import rx.Scheduler.Worker;
9   import rx.functions.Action0;
10  
11  public final class Schedulers {
12  
13      public static Scheduler computation(String id) {
14          return new SchedulerWithId(rx.schedulers.Schedulers.computation(), id);
15      }
16  
17      public static Scheduler computation() {
18          return withId(rx.schedulers.Schedulers.computation());
19      }
20  
21      private static Scheduler withId(Scheduler scheduler) {
22          return new SchedulerWithId(scheduler, describeCallSite());
23      }
24  
25      private static String describeCallSite() {
26          StackTraceElement[] elements = Thread.currentThread().getStackTrace();
27          StackTraceElement e = elements[3];
28          return e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber();
29      }
30  
31      private static void doIt() {
32          System.out.println(describeCallSite());
33      }
34  
35      public static void main(String[] args) {
36          doIt();
37      }
38  
39      public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout,
40              TimeUnit unit) {
41          final CountDownLatch latch = new CountDownLatch(numThreads);
42          for (int i = 1; i <= numThreads; i++) {
43              final Worker worker = scheduler.createWorker();
44              worker.schedule(new Action0() {
45  
46                  @Override
47                  public void call() {
48                      worker.unsubscribe();
49                      latch.countDown();
50                  }
51              });
52          }
53          try {
54              boolean finished = latch.await(timeout, unit);
55              if (!finished) {
56                  throw new RuntimeException("timeout occured waiting for work to finish");
57              }
58          } catch (InterruptedException e) {
59              throw new RuntimeException(e);
60          }
61      }
62  
63      public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads) {
64          blockUntilWorkFinished(scheduler, numThreads, 1, TimeUnit.MINUTES);
65      }
66  }