package com.github.davidmoten.rx;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;

/**
 * Static helpers for obtaining RxJava schedulers wrapped in a
 * {@code SchedulerWithId} and for waiting on work queued to a scheduler.
 */
public final class Schedulers {

    /**
     * Returns the RxJava computation scheduler wrapped in a
     * {@code SchedulerWithId} carrying the supplied id.
     *
     * @param id
     *            identifier passed to the {@code SchedulerWithId} wrapper
     * @return the wrapped computation scheduler
     */
    public static Scheduler computation(String id) {
        return new SchedulerWithId(rx.schedulers.Schedulers.computation(), id);
    }

    /**
     * Returns the RxJava computation scheduler wrapped in a
     * {@code SchedulerWithId} whose id describes the code location that called
     * this method (class, method and line number).
     *
     * @return the wrapped computation scheduler
     */
    public static Scheduler computation() {
        return withId(rx.schedulers.Schedulers.computation());
    }

    /**
     * Wraps the given scheduler in a {@code SchedulerWithId} using the
     * description of the external call site as the id.
     */
    private static Scheduler withId(Scheduler scheduler) {
        return new SchedulerWithId(scheduler, describeCallSite());
    }

    /**
     * Describes the code location that called into this class, formatted as
     * {@code className:methodName:lineNumber}.
     *
     * <p>
     * The stack trace of the current thread is walked from the innermost frame
     * outwards. Frames preceding this class (for example
     * {@code Thread.getStackTrace}) are skipped, then the consecutive frames
     * belonging to this class are skipped, and the next frame is reported. A
     * fixed index is deliberately not used because the number of frames inside
     * this class differs between entry points. If every remaining frame belongs
     * to this class (as when {@link #main(String[])} is the program entry
     * point) the outermost such frame is reported instead.
     *
     * @return the call site description, or {@code "unknown"} if the virtual
     *         machine supplied no usable stack frames
     */
    private static String describeCallSite() {
        final String ownClassName = Schedulers.class.getName();
        final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
        StackTraceElement site = null;
        boolean enteredOwnFrames = false;
        for (StackTraceElement frame : frames) {
            if (ownClassName.equals(frame.getClassName())) {
                // still inside this class; remember the outermost frame seen
                // so far as the fallback
                enteredOwnFrames = true;
                site = frame;
            } else if (enteredOwnFrames) {
                // first frame outside this class: the real caller
                site = frame;
                break;
            }
        }
        if (site == null) {
            // a virtual machine is permitted to omit stack frames
            return "unknown";
        }
        return site.getClassName() + ":" + site.getMethodName() + ":" + site.getLineNumber();
    }

    /**
     * Prints the call site description to standard output. Used only by
     * {@link #main(String[])}.
     */
    private static void doIt() {
        System.out.println(describeCallSite());
    }

    /**
     * Manual check that prints the call site description computed from within
     * this class.
     *
     * @param args
     *            ignored
     */
    public static void main(String[] args) {
        doIt();
    }

    /**
     * Schedules one marker action on each of {@code numThreads} newly created
     * workers of the scheduler and blocks the calling thread until all marker
     * actions have run or the timeout elapses. Each marker action unsubscribes
     * its worker and counts down a shared latch.
     *
     * <p>
     * NOTE(review): whether the completion of the marker actions implies that
     * all previously queued work has finished depends on how the scheduler
     * assigns workers to threads; confirm for the scheduler in use.
     *
     * @param scheduler
     *            scheduler to create the workers from
     * @param numThreads
     *            number of workers to create and marker actions to wait for
     * @param timeout
     *            maximum time to wait
     * @param unit
     *            unit of {@code timeout}
     * @throws RuntimeException
     *             if the timeout elapses before all marker actions have run,
     *             or if the calling thread is interrupted while waiting (the
     *             interrupt status is restored and the
     *             {@link InterruptedException} is the cause)
     */
    public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout,
            TimeUnit unit) {
        final CountDownLatch latch = new CountDownLatch(numThreads);
        for (int i = 1; i <= numThreads; i++) {
            final Worker worker = scheduler.createWorker();
            worker.schedule(new Action0() {

                @Override
                public void call() {
                    worker.unsubscribe();
                    latch.countDown();
                }
            });
        }
        try {
            boolean finished = latch.await(timeout, unit);
            if (!finished) {
                throw new RuntimeException("timeout occured waiting for work to finish");
            }
        } catch (InterruptedException e) {
            // restore the interrupt status so code further up the stack can
            // still observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Same as
     * {@link #blockUntilWorkFinished(Scheduler, int, long, TimeUnit)} with a
     * timeout of one minute.
     *
     * @param scheduler
     *            scheduler to create the workers from
     * @param numThreads
     *            number of workers to create and marker actions to wait for
     */
    public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads) {
        blockUntilWorkFinished(scheduler, numThreads, 1, TimeUnit.MINUTES);
    }
}