View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.util.List;
4   import java.util.concurrent.TimeUnit;
5   
6   import org.junit.Test;
7   import org.mockito.InOrder;
8   import org.mockito.Mockito;
9   
10  import rx.Subscription;
11  import rx.functions.Action1;
12  import rx.subjects.PublishSubject;
13  
14  //TODO remove once happy that rxjava-core has coverage
15  public class SampleTest {
16  
17      @Test
18      public void testSampleDoesNotRepeatEmissions() throws InterruptedException {
19          PublishSubject<Integer> subject = PublishSubject.create();
20          @SuppressWarnings("unchecked")
21          final List<Integer> list = Mockito.mock(List.class);
22          Subscription sub = subject
23          // sample
24                  .sample(100, TimeUnit.MILLISECONDS)
25                  // subscribe and record emissions
26                  .subscribe(new Action1<Integer>() {
27                      @Override
28                      public void call(Integer n) {
29                          list.add(n);
30                      }
31                  });
32          // these items should be emitted before the first sample window closes
33          subject.onNext(1);
34          subject.onNext(2);
35          subject.onNext(3);
36  
37          // wait for at least one more sample window to have passed
38          Thread.sleep(250);
39  
40          // check that sample only reported the last item (3) once
41          InOrder inOrder = Mockito.inOrder(list);
42          inOrder.verify(list).add(3);
43          inOrder.verifyNoMoreInteractions();
44  
45          sub.unsubscribe();
46      }
47  
48  }