1 package com.github.davidmoten.internal.operators;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6
7 import java.util.Arrays;
8 import java.util.Comparator;
9 import java.util.List;
10 import java.util.concurrent.atomic.AtomicBoolean;
11
12 import org.junit.Test;
13
14 import com.github.davidmoten.rtree.RTree;
15 import com.github.davidmoten.rtree.geometry.Geometries;
16 import com.github.davidmoten.rtree.geometry.Line;
17 import com.github.davidmoten.rtree.internal.operators.OperatorBoundedPriorityQueue;
18
19 import rx.Observable;
20 import rx.Observable.OnSubscribe;
21 import rx.Subscriber;
22
23 public class OperatorBoundedPriorityQueueTest {
24
25 private static Comparator<Integer> integerComparator = (i1, i2) -> i1.compareTo(i2);
26
27 @Test
28 public void testPriority() {
29 List<Integer> list = Observable.range(1, 5)
30 .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator)).toSortedList()
31 .toBlocking().single();
32 assertEquals(Arrays.asList(1, 2), list);
33 }
34
35 @Test
36 public void testUnsubscribeAfterFirst() {
37 final AtomicBoolean completed = new AtomicBoolean(false);
38 Observable.range(1, 5)
39
40 .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
41
42 .subscribe(new Subscriber<Integer>() {
43
44 @Override
45 public void onCompleted() {
46 completed.set(true);
47 }
48
49 @Override
50 public void onError(Throwable e) {
51 }
52
53 @Override
54 public void onNext(Integer t) {
55 unsubscribe();
56 }
57 });
58 assertFalse(completed.get());
59 }
60
61 @Test
62 public void testUnsubscribeAfterLastButBeforeCompletedCalled() {
63 final AtomicBoolean completed = new AtomicBoolean(false);
64 Observable.range(1, 5)
65
66 .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
67
68 .subscribe(new Subscriber<Integer>() {
69
70 int i = 0;
71
72 @Override
73 public void onCompleted() {
74 completed.set(true);
75 }
76
77 @Override
78 public void onError(Throwable e) {
79 }
80
81 @Override
82 public void onNext(Integer t) {
83 i++;
84 if (i == 2)
85 unsubscribe();
86 }
87 });
88 assertFalse(completed.get());
89 }
90
91 @Test
92 public void testError() {
93 final AtomicBoolean completed = new AtomicBoolean(false);
94 final AtomicBoolean error = new AtomicBoolean(false);
95 Observable.<Integer> error(new RuntimeException())
96
97 .lift(new OperatorBoundedPriorityQueue<Integer>(2, integerComparator))
98
99 .subscribe(new Subscriber<Integer>() {
100
101 @Override
102 public void onCompleted() {
103 completed.set(true);
104 }
105
106 @Override
107 public void onError(Throwable e) {
108 error.set(true);
109 }
110
111 @Override
112 public void onNext(Integer t) {
113
114 }
115 });
116 assertFalse(completed.get());
117 assertTrue(error.get());
118 }
119
120 @Test
121 public void testErrorCalledJustAfterUnsubscribe() {
122 final AtomicBoolean error = new AtomicBoolean(false);
123 final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
124
125 @Override
126 public void onCompleted() {
127 }
128
129 @Override
130 public void onError(Throwable e) {
131 error.set(true);
132 }
133
134 @Override
135 public void onNext(Integer t) {
136 }
137 };
138 Observable.create(new OnSubscribe<Integer>() {
139
140 @Override
141 public void call(Subscriber<? super Integer> sub) {
142 sub.onNext(1);
143 subscriber.unsubscribe();
144 sub.onError(new RuntimeException());
145 }
146 })
147
148 .lift(new OperatorBoundedPriorityQueue<Integer>(1, integerComparator))
149
150 .subscribe(subscriber);
151 assertFalse(error.get());
152 }
153
154 @Test
155 public void testUnsubscribeCalledAfterFirst() {
156 final AtomicBoolean completed = new AtomicBoolean(false);
157 final AtomicBoolean next = new AtomicBoolean(false);
158 final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
159
160 @Override
161 public void onCompleted() {
162 completed.set(true);
163 }
164
165 @Override
166 public void onError(Throwable e) {
167 }
168
169 @Override
170 public void onNext(Integer t) {
171 next.set(true);
172 }
173 };
174 Observable.create(new OnSubscribe<Integer>() {
175
176 @Override
177 public void call(Subscriber<? super Integer> sub) {
178 sub.onNext(1);
179 subscriber.unsubscribe();
180 sub.onNext(2);
181 sub.onCompleted();
182 }
183 })
184
185 .lift(new OperatorBoundedPriorityQueue<Integer>(1, integerComparator))
186
187 .subscribe(subscriber);
188 assertFalse(completed.get());
189 assertFalse(next.get());
190 }
191
192 @Test(timeout = 3000)
193 public void testOperatorShouldRequestMaxFromUpstream() {
194 RTree<String, Line> tree = RTree.star().create();
195 for (int i = 0; i < 5; ++i) {
196 tree = tree.add(String.format("Hello %d", i), Geometries.line(-i, -i, 5 + i, i));
197 }
198 tree.nearest(Geometries.point(2, 0.4), Double.MAX_VALUE, 1).toBlocking().single();
199 }
200
201 }