View Javadoc
1   package com.github.davidmoten.internal.operators;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertTrue;
6   
7   import java.util.Arrays;
8   import java.util.Comparator;
9   import java.util.List;
10  import java.util.concurrent.atomic.AtomicBoolean;
11  
12  import org.junit.Test;
13  
14  import com.github.davidmoten.rtree.RTree;
15  import com.github.davidmoten.rtree.geometry.Geometries;
16  import com.github.davidmoten.rtree.geometry.Line;
17  import com.github.davidmoten.rtree.internal.operators.OperatorBoundedPriorityQueue;
18  
19  import rx.Observable;
20  import rx.Observable.OnSubscribe;
21  import rx.Subscriber;
22  
23  public class OperatorBoundedPriorityQueueTest {
24  
25      private static Comparator<Integer> integerComparator = (i1, i2) -> i1.compareTo(i2);
26  
27      @Test
28      public void testPriority() {
29          List<Integer> list = Observable.range(1, 5)
30                  .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator)).toSortedList()
31                  .toBlocking().single();
32          assertEquals(Arrays.asList(1, 2), list);
33      }
34  
35      @Test
36      public void testUnsubscribeAfterFirst() {
37          final AtomicBoolean completed = new AtomicBoolean(false);
38          Observable.range(1, 5)
39                  // go through priority queue
40                  .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
41                  // subscribe
42                  .subscribe(new Subscriber<Integer>() {
43  
44                      @Override
45                      public void onCompleted() {
46                          completed.set(true);
47                      }
48  
49                      @Override
50                      public void onError(Throwable e) {
51                      }
52  
53                      @Override
54                      public void onNext(Integer t) {
55                          unsubscribe();
56                      }
57                  });
58          assertFalse(completed.get());
59      }
60  
61      @Test
62      public void testUnsubscribeAfterLastButBeforeCompletedCalled() {
63          final AtomicBoolean completed = new AtomicBoolean(false);
64          Observable.range(1, 5)
65                  // go through priority queue
66                  .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
67                  // subscribe
68                  .subscribe(new Subscriber<Integer>() {
69  
70                      int i = 0;
71  
72                      @Override
73                      public void onCompleted() {
74                          completed.set(true);
75                      }
76  
77                      @Override
78                      public void onError(Throwable e) {
79                      }
80  
81                      @Override
82                      public void onNext(Integer t) {
83                          i++;
84                          if (i == 2)
85                              unsubscribe();
86                      }
87                  });
88          assertFalse(completed.get());
89      }
90  
91      @Test
92      public void testError() {
93          final AtomicBoolean completed = new AtomicBoolean(false);
94          final AtomicBoolean error = new AtomicBoolean(false);
95          Observable.<Integer> error(new RuntimeException())
96                  // go through priority queue
97                  .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
98                  // subscribe
99                  .subscribe(new Subscriber<Integer>() {
100 
101                     @Override
102                     public void onCompleted() {
103                         completed.set(true);
104                     }
105 
106                     @Override
107                     public void onError(Throwable e) {
108                         error.set(true);
109                     }
110 
111                     @Override
112                     public void onNext(Integer t) {
113 
114                     }
115                 });
116         assertFalse(completed.get());
117         assertTrue(error.get());
118     }
119 
120     @Test
121     public void testErrorCalledJustAfterUnsubscribe() {
122         final AtomicBoolean error = new AtomicBoolean(false);
123         final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
124 
125             @Override
126             public void onCompleted() {
127             }
128 
129             @Override
130             public void onError(Throwable e) {
131                 error.set(true);
132             }
133 
134             @Override
135             public void onNext(Integer t) {
136             }
137         };
138         Observable.create(new OnSubscribe<Integer>() {
139 
140             @Override
141             public void call(Subscriber<? super Integer> sub) {
142                 sub.onNext(1);
143                 subscriber.unsubscribe();
144                 sub.onError(new RuntimeException());
145             }
146         })
147                 // go through priority queue
148                 .lift(new OperatorBoundedPriorityQueue<Integer>(1, integerComparator))
149                 // subscribe
150                 .subscribe(subscriber);
151         assertFalse(error.get());
152     }
153 
154     @Test
155     public void testUnsubscribeCalledAfterFirst() {
156         final AtomicBoolean completed = new AtomicBoolean(false);
157         final AtomicBoolean next = new AtomicBoolean(false);
158         final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
159 
160             @Override
161             public void onCompleted() {
162                 completed.set(true);
163             }
164 
165             @Override
166             public void onError(Throwable e) {
167             }
168 
169             @Override
170             public void onNext(Integer t) {
171                 next.set(true);
172             }
173         };
174         Observable.create(new OnSubscribe<Integer>() {
175 
176             @Override
177             public void call(Subscriber<? super Integer> sub) {
178                 sub.onNext(1);
179                 subscriber.unsubscribe();
180                 sub.onNext(2);
181                 sub.onCompleted();
182             }
183         })
184                 // go through priority queue
185                 .lift(new OperatorBoundedPriorityQueue<Integer>(1, integerComparator))
186                 // subscribe
187                 .subscribe(subscriber);
188         assertFalse(completed.get());
189         assertFalse(next.get());
190     }
191 
192     @Test(timeout = 3000)
193     public void testOperatorShouldRequestMaxFromUpstream() {
194         RTree<String, Line> tree = RTree.star().create();
195         for (int i = 0; i < 5; ++i) {
196             tree = tree.add(String.format("Hello %d", i), Geometries.line(-i, -i, 5 + i, i));
197         }
198         tree.nearest(Geometries.point(2, 0.4), Double.MAX_VALUE, 1).toBlocking().single();
199     }
200 
201 }