1 package com.github.davidmoten.rx; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.regex.Matcher; 5 import java.util.regex.Pattern; 6 7 import rx.Scheduler; 8 import rx.Subscription; 9 import rx.functions.Action0; 10 11 public class SchedulerWithId extends Scheduler { 12 13 private final Scheduler scheduler; 14 private final String id; 15 private final static Pattern pattern = Pattern.compile("\\bschedId=\\[[^\\]]+\\]+\\b"); 16 17 public SchedulerWithId(Scheduler scheduler, String id) { 18 this.scheduler = scheduler; 19 this.id = "[" + id + "]"; 20 } 21 22 @Override 23 public Worker createWorker() { 24 25 final Worker worker = scheduler.createWorker(); 26 Worker w = new Worker() { 27 28 @Override 29 public void unsubscribe() { 30 worker.unsubscribe(); 31 } 32 33 @Override 34 public boolean isUnsubscribed() { 35 return worker.isUnsubscribed(); 36 } 37 38 @Override 39 public Subscription schedule(final Action0 action) { 40 Action0 a = new Action0() { 41 @Override 42 public void call() { 43 setThreadName(); 44 action.call(); 45 } 46 }; 47 return worker.schedule(a); 48 } 49 50 @Override 51 public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { 52 Action0 a = new Action0() { 53 @Override 54 public void call() { 55 setThreadName(); 56 action.call(); 57 } 58 }; 59 return worker.schedule(a, delayTime, unit); 60 } 61 62 }; 63 return w; 64 65 } 66 67 private void setThreadName() { 68 String name = Thread.currentThread().getName(); 69 String newName = updateNameWithId(name, id); 70 Thread.currentThread().setName(newName); 71 } 72 73 private static String updateNameWithId(String name, String id) { 74 final String newName; 75 if (name == null) { 76 newName = id; 77 } else { 78 Matcher matcher = pattern.matcher(name); 79 if (matcher.find()) { 80 newName = name.replace(matcher.group(), "schedId=" + id); 81 } else { 82 newName = name + "|schedId=" + id; 83 } 84 } 85 return newName; 86 } 87 88 }