1 package com.github.davidmoten.rx2;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.concurrent.TimeUnit;
5
6 import com.github.davidmoten.rx2.internal.SchedulerWithId;
7
8 import io.reactivex.Scheduler;
9 import io.reactivex.Scheduler.Worker;
10
11 public final class SchedulerHelper {
12
13 private SchedulerHelper() {
14
15 }
16
17 public static Scheduler withThreadIdFromCallSite(Scheduler scheduler) {
18 return new SchedulerWithId(scheduler, describeCallSite());
19 }
20
21 public static Scheduler withThreadId(Scheduler scheduler, String id) {
22 return new SchedulerWithId(scheduler, id);
23 }
24
25 private static String describeCallSite() {
26 StackTraceElement[] elements = Thread.currentThread().getStackTrace();
27 StackTraceElement e = elements[3];
28 return e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber();
29 }
30
31 public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
32 final CountDownLatch latch = new CountDownLatch(numThreads);
33 for (int i = 1; i <= numThreads; i++) {
34 final Worker worker = scheduler.createWorker();
35 worker.schedule(new Runnable() {
36 @Override
37 public void run() {
38 worker.dispose();
39 latch.countDown();
40 }
41 });
42 }
43 try {
44 boolean finished = latch.await(timeout, unit);
45 if (!finished) {
46 throw new RuntimeException("timeout occured waiting for work to finish");
47 }
48 } catch (InterruptedException e) {
49 throw new RuntimeException(e);
50 }
51 }
52
53 public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads) {
54 blockUntilWorkFinished(scheduler, numThreads, 1, TimeUnit.MINUTES);
55 }
56 }