1 package com.github.davidmoten.rtree;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
6 import com.github.davidmoten.rtree.geometry.Geometry;
7 import com.github.davidmoten.rtree.internal.util.ImmutableStack;
8
9 import rx.Observable.OnSubscribe;
10 import rx.Producer;
11 import rx.Subscriber;
12 import rx.functions.Func1;
13
14 final class OnSubscribeSearch<T, S extends Geometry> implements OnSubscribe<Entry<T, S>> {
15
16 private final Node<T, S> node;
17 private final Func1<? super Geometry, Boolean> condition;
18
19 OnSubscribeSearch(Node<T, S> node, Func1<? super Geometry, Boolean> condition) {
20 this.node = node;
21 this.condition = condition;
22 }
23
24 @Override
25 public void call(Subscriber<? super Entry<T, S>> subscriber) {
26 subscriber.setProducer(new SearchProducer<T, S>(node, condition, subscriber));
27 }
28
29 @VisibleForTesting
30 static class SearchProducer<T, S extends Geometry> implements Producer {
31
32 private final Subscriber<? super Entry<T, S>> subscriber;
33 private final Node<T, S> node;
34 private final Func1<? super Geometry, Boolean> condition;
35 private volatile ImmutableStack<NodePosition<T, S>> stack;
36 private final AtomicLong requested = new AtomicLong(0);
37
38 SearchProducer(Node<T, S> node, Func1<? super Geometry, Boolean> condition,
39 Subscriber<? super Entry<T, S>> subscriber) {
40 this.node = node;
41 this.condition = condition;
42 this.subscriber = subscriber;
43 stack = ImmutableStack.create(new NodePosition<T, S>(node, 0));
44 }
45
46 @Override
47 public void request(long n) {
48 try {
49 if (n <= 0 || requested.get() == Long.MAX_VALUE)
50
51 return;
52 else if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
53
54 requestAll();
55 } else
56 requestSome(n);
57 } catch (RuntimeException e) {
58 subscriber.onError(e);
59 }
60 }
61
62 private void requestAll() {
63 node.searchWithoutBackpressure(condition, subscriber);
64 if (!subscriber.isUnsubscribed())
65 subscriber.onCompleted();
66 }
67
68 private void requestSome(long n) {
69
70
71
72
73
74
75 long previousCount = getAndAddRequest(requested, n);
76 if (previousCount == 0) {
77
78
79
80 ImmutableStack<NodePosition<T, S>> st = stack;
81 while (true) {
82
83 long r = requested.get();
84 st = Backpressure.search(condition, subscriber, st, r);
85 if (st.isEmpty()) {
86
87 stack = null;
88 if (!subscriber.isUnsubscribed()) {
89 subscriber.onCompleted();
90 }
91 return;
92 } else {
93 stack = st;
94 if (requested.addAndGet(-r) == 0)
95 return;
96 }
97 }
98
99 }
100 }
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114 private static long getAndAddRequest(AtomicLong requested, long n) {
115
116 while (true) {
117 long current = requested.get();
118 long next = current + n;
119
120 if (next < 0) {
121 next = Long.MAX_VALUE;
122 }
123 if (requested.compareAndSet(current, next)) {
124 return current;
125 }
126 }
127 }
128
129 }