View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.util.concurrent.TimeUnit;
4   import java.util.regex.Matcher;
5   import java.util.regex.Pattern;
6   
7   import rx.Scheduler;
8   import rx.Subscription;
9   import rx.functions.Action0;
10  
11  public class SchedulerWithId extends Scheduler {
12  
13      private final Scheduler scheduler;
14      private final String id;
15      private final static Pattern pattern = Pattern.compile("\\bschedId=\\[[^\\]]+\\]+\\b");
16  
17      public SchedulerWithId(Scheduler scheduler, String id) {
18          this.scheduler = scheduler;
19          this.id = "[" + id + "]";
20      }
21  
22      @Override
23      public Worker createWorker() {
24  
25          final Worker worker = scheduler.createWorker();
26          Worker w = new Worker() {
27  
28              @Override
29              public void unsubscribe() {
30                  worker.unsubscribe();
31              }
32  
33              @Override
34              public boolean isUnsubscribed() {
35                  return worker.isUnsubscribed();
36              }
37  
38              @Override
39              public Subscription schedule(final Action0 action) {
40                  Action0 a = new Action0() {
41                      @Override
42                      public void call() {
43                          setThreadName();
44                          action.call();
45                      }
46                  };
47                  return worker.schedule(a);
48              }
49  
50              @Override
51              public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
52                  Action0 a = new Action0() {
53                      @Override
54                      public void call() {
55                          setThreadName();
56                          action.call();
57                      }
58                  };
59                  return worker.schedule(a, delayTime, unit);
60              }
61  
62          };
63          return w;
64  
65      }
66  
67      private void setThreadName() {
68          String name = Thread.currentThread().getName();
69          String newName = updateNameWithId(name, id);
70          Thread.currentThread().setName(newName);
71      }
72  
73      private static String updateNameWithId(String name, String id) {
74          final String newName;
75          if (name == null) {
76              newName = id;
77          } else {
78              Matcher matcher = pattern.matcher(name);
79              if (matcher.find()) {
80                  newName = name.replace(matcher.group(), "schedId=" + id);
81              } else {
82                  newName = name + "|schedId=" + id;
83              }
84          }
85          return newName;
86      }
87  
88  }