1 package com.github.davidmoten.rx2;
2
3 import com.github.davidmoten.junit.Asserts;
4 import com.github.davidmoten.rx2.observable.CloseableObservableWithReset;
5 import io.reactivex.Flowable;
6 import io.reactivex.Observable;
7 import io.reactivex.annotations.NonNull;
8 import io.reactivex.functions.Consumer;
9 import io.reactivex.schedulers.Schedulers;
10 import org.junit.Test;
11 import org.reactivestreams.Subscription;
12
13 import java.util.List;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicInteger;
16
17 import static org.junit.Assert.assertTrue;
18
19 public class ObservablesTest {
20
21 @Test
22 public void isUtilityClass() {
23 Asserts.assertIsUtilityClass(Observables.class);
24 }
25
26 @Test
27 public void testCache() {
28
29 final AtomicInteger subscriptionCount = new AtomicInteger(0);
30
31 Flowable<String> source = Flowable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
32
33 final CloseableObservableWithReset<List<String>> closeable = Observables
34 .cache(source.doOnSubscribe(new Consumer<Subscription>() {
35 @Override
36 public void accept(@NonNull Subscription subscription) throws Exception {
37 subscriptionCount.incrementAndGet();
38 }
39 }).toList().toObservable(), 3, TimeUnit.SECONDS, Schedulers.computation());
40
41 Observable<List<String>> timed = closeable.observable()
42 .doOnNext(new Consumer<List<String>>() {
43 @Override
44 public void accept(@NonNull List<String> s) throws Exception {
45 closeable.reset();
46 }
47 });
48
49 timed.subscribe();
50
51 try {
52 Thread.sleep(1000);
53 } catch (InterruptedException e) {
54 e.printStackTrace();
55 }
56
57 timed.subscribe();
58 try {
59 Thread.sleep(4000);
60 } catch (InterruptedException e) {
61 e.printStackTrace();
62 }
63
64 timed.subscribe();
65
66 assertTrue(subscriptionCount.get() == 2);
67
68
69
70
71 }
72
73 }