1 package com.github.davidmoten.rx2;
2
3 import com.github.davidmoten.guavamini.Optional;
4 import com.github.davidmoten.rx2.flowable.CachedFlowable;
5 import com.github.davidmoten.rx2.observable.CachedObservable;
6 import com.github.davidmoten.rx2.observable.CloseableObservableWithReset;
7 import io.reactivex.Flowable;
8 import io.reactivex.Observable;
9 import io.reactivex.Scheduler;
10 import io.reactivex.disposables.Disposable;
11 import io.reactivex.functions.Consumer;
12
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.atomic.AtomicReference;
15
16 public final class Observables {
17
18
19 private Observables() {
20
21 }
22
23
24
25
26
27
28
29
30
31
32
33
34 public static <T> CachedObservable<T> cache(Observable<T> source) {
35 return new CachedObservable<T>(source);
36 }
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public static <T> Observable<T> cache(final Observable<T> source, final long duration,
59 final TimeUnit unit, final Scheduler.Worker worker) {
60 final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
61 CachedObservable<T> cache = new CachedObservable<T>(source);
62 cacheRef.set(cache);
63 return cache.doOnSubscribe(new Consumer<Disposable>() {
64 @Override
65 public void accept(Disposable d) {
66 Runnable action = new Runnable() {
67 @Override
68 public void run() {
69 cacheRef.get().reset();
70 }
71 };
72 worker.schedule(action, duration, unit);
73 }
74 });
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public static <T> CloseableObservableWithReset<T> cache(final Observable<T> source,
96 final long duration, final TimeUnit unit, final Scheduler scheduler) {
97 final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
98 final AtomicReference<Optional<Scheduler.Worker>> workerRef = new AtomicReference<Optional<Scheduler.Worker>>(
99 Optional.<Scheduler.Worker> absent());
100 CachedObservable<T> cache = new CachedObservable<T>(source);
101 cacheRef.set(cache);
102 Runnable closeAction = new Runnable() {
103 @Override
104 public void run() {
105 while (true) {
106 Optional<Scheduler.Worker> w = workerRef.get();
107 if (w == null) {
108
109 break;
110 } else {
111 if (workerRef.compareAndSet(w, null)) {
112 if (w.isPresent()) {
113 w.get().dispose();
114 }
115
116 workerRef.set(null);
117 break;
118 }
119 }
120
121 }
122 }
123 };
124 Runnable resetAction = new Runnable() {
125
126 @Override
127 public void run() {
128 startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
129 }
130 };
131 return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
132 }
133
134
135 private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
136 final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
137 final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
138
139 Runnable action = new Runnable() {
140 @Override
141 public void run() {
142 cacheRef.get().reset();
143 }
144 };
145
146 while (true) {
147 Optional<Scheduler.Worker> wOld = workerRef.get();
148 if (wOld == null) {
149
150 return;
151 }
152 Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
153 if (workerRef.compareAndSet(wOld, w)) {
154 if (wOld.isPresent())
155 wOld.get().dispose();
156 w.get().schedule(action, duration, unit);
157 break;
158 }
159 }
160 }
161 }