View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.TimeUnit;
4   import java.util.concurrent.atomic.AtomicInteger;
5   import java.util.concurrent.atomic.AtomicReference;
6   
7   import rx.Observable;
8   import rx.Observable.Operator;
9   import rx.Observable.Transformer;
10  import rx.Scheduler;
11  import rx.Scheduler.Worker;
12  import rx.Subscriber;
13  import rx.functions.Action0;
14  import rx.observers.Subscribers;
15  import rx.subscriptions.Subscriptions;
16  
17  public final class TransformerDelayFinalUnsubscribe<T> implements Transformer<T, T> {
18  
19  	private final long delayMs;
20  	private final Scheduler scheduler;
21  
22  	public TransformerDelayFinalUnsubscribe(long delayMs, Scheduler scheduler) {
23  		this.delayMs = delayMs;
24  		this.scheduler = scheduler;
25  	}
26  
27  	@Override
28  	public Observable<T> call(final Observable<T> o) {
29  		final AtomicInteger count = new AtomicInteger();
30  		final AtomicReference<Subscriber<T>> extra = new AtomicReference<Subscriber<T>>();
31  		final AtomicReference<Worker> worker = new AtomicReference<Worker>();
32  		final Object lock = new Object();
33  		return o //
34  				.doOnSubscribe(new Action0() {
35  					@Override
36  					public void call() {
37  						if (count.incrementAndGet() == 1) {
38  							final Worker w;
39  							final Subscriber<T> sub;
40  							synchronized (lock) {
41  								if (extra.get() == null) {
42  									sub = doNothing();
43  									extra.set(sub);
44  								} else {
45  									sub = null;
46  								}
47  								w = worker.get();
48  								worker.set(null);
49  							}
50  							if (w != null) {
51  								w.unsubscribe();
52  							}
53  							if (sub != null) {
54  								o.subscribe(sub);
55  							}
56  						} else {
57  							final Worker w;
58  							synchronized (lock) {
59  								w = worker.get();
60  								worker.set(null);
61  							}
62  							if (w != null) {
63  								w.unsubscribe();
64  							}
65  						}
66  					}
67  				}) //
68  				.lift(new OperatorAddToSubscription<T>(new Action0() {
69  
70  					@Override
71  					public void call() {
72  						if (count.decrementAndGet() == 0) {
73  							final Worker newW;
74  							final Worker w;
75  							synchronized (lock) {
76  								w = worker.get();
77  								newW = scheduler.createWorker();
78  								worker.set(newW);
79  							}
80  							if (w != null) {
81  								w.unsubscribe();
82  							}
83  							// scheduler unsubscribe
84  							newW.schedule(new Action0() {
85  								@Override
86  								public void call() {
87  									Subscriber<T> sub;
88  									synchronized (lock) {
89  										sub = extra.get();
90  										extra.set(null);
91  									}
92  									sub.unsubscribe();
93  									newW.unsubscribe();
94  									worker.compareAndSet(newW, null);
95  								}
96  							}, delayMs, TimeUnit.MILLISECONDS);
97  						}
98  					}
99  				}));
100 	}
101 
102 	private static <T> Subscriber<T> doNothing() {
103 		return new Subscriber<T>() {
104 
105 			@Override
106 			public void onCompleted() {
107 			}
108 
109 			@Override
110 			public void onError(Throwable e) {
111 			}
112 
113 			@Override
114 			public void onNext(T t) {
115 			}
116 		};
117 	}
118 
119 	private static final class OperatorAddToSubscription<T> implements Operator<T, T> {
120 
121 		private final Action0 action;
122 
123 		OperatorAddToSubscription(Action0 action) {
124 			this.action = action;
125 		}
126 
127 		@Override
128 		public Subscriber<? super T> call(Subscriber<? super T> child) {
129 			Subscriber<T> parent = Subscribers.wrap(child);
130 			child.add(Subscriptions.create(action));
131 			return parent;
132 		}
133 
134 	}
135 
136 }