View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import com.github.davidmoten.junit.Asserts;
4   import com.github.davidmoten.rx2.observable.CloseableObservableWithReset;
5   import io.reactivex.Flowable;
6   import io.reactivex.Observable;
7   import io.reactivex.annotations.NonNull;
8   import io.reactivex.functions.Consumer;
9   import io.reactivex.schedulers.Schedulers;
10  import org.junit.Test;
11  import org.reactivestreams.Subscription;
12  
13  import java.util.List;
14  import java.util.concurrent.TimeUnit;
15  import java.util.concurrent.atomic.AtomicInteger;
16  
17  import static org.junit.Assert.assertTrue;
18  
19  public class ObservablesTest {
20  
21      @Test
22      public void isUtilityClass() {
23          Asserts.assertIsUtilityClass(Observables.class);
24      }
25  
26      @Test
27      public void testCache() {
28  
29          final AtomicInteger subscriptionCount = new AtomicInteger(0);
30  
31          Flowable<String> source = Flowable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
32  
33          final CloseableObservableWithReset<List<String>> closeable = Observables
34                  .cache(source.doOnSubscribe(new Consumer<Subscription>() {
35                      @Override
36                      public void accept(@NonNull Subscription subscription) throws Exception {
37                          subscriptionCount.incrementAndGet();
38                      }
39                  }).toList().toObservable(), 3, TimeUnit.SECONDS, Schedulers.computation());
40  
41          Observable<List<String>> timed = closeable.observable()
42                  .doOnNext(new Consumer<List<String>>() {
43                      @Override
44                      public void accept(@NonNull List<String> s) throws Exception {
45                          closeable.reset();
46                      }
47                  });
48  
49          timed.subscribe();
50  
51          try {
52              Thread.sleep(1000);
53          } catch (InterruptedException e) {
54              e.printStackTrace();
55          }
56  
57          timed.subscribe();
58          try {
59              Thread.sleep(4000);
60          } catch (InterruptedException e) {
61              e.printStackTrace();
62          }
63  
64          timed.subscribe();
65  
66          assertTrue(subscriptionCount.get() == 2);
67  
68          // TODO assert stuff about closing
69          // closeable.close();
70          // closeable.close();
71      }
72  
73  }