1 package com.github.davidmoten.rx;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.regex.Matcher;
5 import java.util.regex.Pattern;
6
7 import rx.Scheduler;
8 import rx.Subscription;
9 import rx.functions.Action0;
10
11 public class SchedulerWithId extends Scheduler {
12
13 private final Scheduler scheduler;
14 private final String id;
15 private final static Pattern pattern = Pattern.compile("\\bschedId=\\[[^\\]]+\\]+\\b");
16
17 public SchedulerWithId(Scheduler scheduler, String id) {
18 this.scheduler = scheduler;
19 this.id = "[" + id + "]";
20 }
21
22 @Override
23 public Worker createWorker() {
24
25 final Worker worker = scheduler.createWorker();
26 Worker w = new Worker() {
27
28 @Override
29 public void unsubscribe() {
30 worker.unsubscribe();
31 }
32
33 @Override
34 public boolean isUnsubscribed() {
35 return worker.isUnsubscribed();
36 }
37
38 @Override
39 public Subscription schedule(final Action0 action) {
40 Action0 a = new Action0() {
41 @Override
42 public void call() {
43 setThreadName();
44 action.call();
45 }
46 };
47 return worker.schedule(a);
48 }
49
50 @Override
51 public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
52 Action0 a = new Action0() {
53 @Override
54 public void call() {
55 setThreadName();
56 action.call();
57 }
58 };
59 return worker.schedule(a, delayTime, unit);
60 }
61
62 };
63 return w;
64
65 }
66
67 private void setThreadName() {
68 String name = Thread.currentThread().getName();
69 String newName = updateNameWithId(name, id);
70 Thread.currentThread().setName(newName);
71 }
72
73 private static String updateNameWithId(String name, String id) {
74 final String newName;
75 if (name == null) {
76 newName = id;
77 } else {
78 Matcher matcher = pattern.matcher(name);
79 if (matcher.find()) {
80 newName = name.replace(matcher.group(), "schedId=" + id);
81 } else {
82 newName = name + "|schedId=" + id;
83 }
84 }
85 return newName;
86 }
87
88 }