1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.concurrent.atomic.AtomicInteger;
5 import java.util.concurrent.atomic.AtomicReference;
6
7 import rx.Observable;
8 import rx.Observable.Operator;
9 import rx.Observable.Transformer;
10 import rx.Scheduler;
11 import rx.Scheduler.Worker;
12 import rx.Subscriber;
13 import rx.functions.Action0;
14 import rx.observers.Subscribers;
15 import rx.subscriptions.Subscriptions;
16
17 public final class TransformerDelayFinalUnsubscribe<T> implements Transformer<T, T> {
18
19 private final long delayMs;
20 private final Scheduler scheduler;
21
22 public TransformerDelayFinalUnsubscribe(long delayMs, Scheduler scheduler) {
23 this.delayMs = delayMs;
24 this.scheduler = scheduler;
25 }
26
27 @Override
28 public Observable<T> call(final Observable<T> o) {
29 final AtomicInteger count = new AtomicInteger();
30 final AtomicReference<Subscriber<T>> extra = new AtomicReference<Subscriber<T>>();
31 final AtomicReference<Worker> worker = new AtomicReference<Worker>();
32 final Object lock = new Object();
33 return o
34 .doOnSubscribe(new Action0() {
35 @Override
36 public void call() {
37 if (count.incrementAndGet() == 1) {
38 final Worker w;
39 final Subscriber<T> sub;
40 synchronized (lock) {
41 if (extra.get() == null) {
42 sub = doNothing();
43 extra.set(sub);
44 } else {
45 sub = null;
46 }
47 w = worker.get();
48 worker.set(null);
49 }
50 if (w != null) {
51 w.unsubscribe();
52 }
53 if (sub != null) {
54 o.subscribe(sub);
55 }
56 } else {
57 final Worker w;
58 synchronized (lock) {
59 w = worker.get();
60 worker.set(null);
61 }
62 if (w != null) {
63 w.unsubscribe();
64 }
65 }
66 }
67 })
68 .lift(new OperatorAddToSubscription<T>(new Action0() {
69
70 @Override
71 public void call() {
72 if (count.decrementAndGet() == 0) {
73 final Worker newW;
74 final Worker w;
75 synchronized (lock) {
76 w = worker.get();
77 newW = scheduler.createWorker();
78 worker.set(newW);
79 }
80 if (w != null) {
81 w.unsubscribe();
82 }
83
84 newW.schedule(new Action0() {
85 @Override
86 public void call() {
87 Subscriber<T> sub;
88 synchronized (lock) {
89 sub = extra.get();
90 extra.set(null);
91 }
92 sub.unsubscribe();
93 newW.unsubscribe();
94 worker.compareAndSet(newW, null);
95 }
96 }, delayMs, TimeUnit.MILLISECONDS);
97 }
98 }
99 }));
100 }
101
102 private static <T> Subscriber<T> doNothing() {
103 return new Subscriber<T>() {
104
105 @Override
106 public void onCompleted() {
107 }
108
109 @Override
110 public void onError(Throwable e) {
111 }
112
113 @Override
114 public void onNext(T t) {
115 }
116 };
117 }
118
119 private static final class OperatorAddToSubscription<T> implements Operator<T, T> {
120
121 private final Action0 action;
122
123 OperatorAddToSubscription(Action0 action) {
124 this.action = action;
125 }
126
127 @Override
128 public Subscriber<? super T> call(Subscriber<? super T> child) {
129 Subscriber<T> parent = Subscribers.wrap(child);
130 child.add(Subscriptions.create(action));
131 return parent;
132 }
133
134 }
135
136 }