1 package com.github.davidmoten.rx;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.concurrent.TimeUnit;
5 import java.util.concurrent.TimeoutException;
6
7 import rx.Scheduler;
8 import rx.Scheduler.Worker;
9 import rx.functions.Action0;
10
11 public final class Schedulers {
12
13 public static Scheduler computation(String id) {
14 return new SchedulerWithId(rx.schedulers.Schedulers.computation(), id);
15 }
16
17 public static Scheduler computation() {
18 return withId(rx.schedulers.Schedulers.computation());
19 }
20
21 private static Scheduler withId(Scheduler scheduler) {
22 return new SchedulerWithId(scheduler, describeCallSite());
23 }
24
25 private static String describeCallSite() {
26 StackTraceElement[] elements = Thread.currentThread().getStackTrace();
27 StackTraceElement e = elements[3];
28 return e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber();
29 }
30
31 private static void doIt() {
32 System.out.println(describeCallSite());
33 }
34
35 public static void main(String[] args) {
36 doIt();
37 }
38
39 public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout,
40 TimeUnit unit) {
41 final CountDownLatch latch = new CountDownLatch(numThreads);
42 for (int i = 1; i <= numThreads; i++) {
43 final Worker worker = scheduler.createWorker();
44 worker.schedule(new Action0() {
45
46 @Override
47 public void call() {
48 worker.unsubscribe();
49 latch.countDown();
50 }
51 });
52 }
53 try {
54 boolean finished = latch.await(timeout, unit);
55 if (!finished) {
56 throw new RuntimeException("timeout occured waiting for work to finish");
57 }
58 } catch (InterruptedException e) {
59 throw new RuntimeException(e);
60 }
61 }
62
63 public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads) {
64 blockUntilWorkFinished(scheduler, numThreads, 1, TimeUnit.MINUTES);
65 }
66 }