View Javadoc
1   package com.github.davidmoten.rtree;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
6   import com.github.davidmoten.rtree.geometry.Geometry;
7   import com.github.davidmoten.rtree.internal.util.ImmutableStack;
8   
9   import rx.Observable.OnSubscribe;
10  import rx.Producer;
11  import rx.Subscriber;
12  import rx.functions.Func1;
13  
14  final class OnSubscribeSearch<T, S extends Geometry> implements OnSubscribe<Entry<T, S>> {
15  
16      private final Node<T, S> node;
17      private final Func1<? super Geometry, Boolean> condition;
18  
19      OnSubscribeSearch(Node<T, S> node, Func1<? super Geometry, Boolean> condition) {
20          this.node = node;
21          this.condition = condition;
22      }
23  
24      @Override
25      public void call(Subscriber<? super Entry<T, S>> subscriber) {
26          subscriber.setProducer(new SearchProducer<T, S>(node, condition, subscriber));
27      }
28  
29      @VisibleForTesting
30      static class SearchProducer<T, S extends Geometry> implements Producer {
31  
32          private final Subscriber<? super Entry<T, S>> subscriber;
33          private final Node<T, S> node;
34          private final Func1<? super Geometry, Boolean> condition;
35          private volatile ImmutableStack<NodePosition<T, S>> stack;
36          private final AtomicLong requested = new AtomicLong(0);
37  
38          SearchProducer(Node<T, S> node, Func1<? super Geometry, Boolean> condition,
39                  Subscriber<? super Entry<T, S>> subscriber) {
40              this.node = node;
41              this.condition = condition;
42              this.subscriber = subscriber;
43              stack = ImmutableStack.create(new NodePosition<T, S>(node, 0));
44          }
45  
46          @Override
47          public void request(long n) {
48              try {
49                  if (n <= 0 || requested.get() == Long.MAX_VALUE)
50                      // none requested or already started with fast path
51                      return;
52                  else if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
53                      // fast path
54                      requestAll();
55                  } else
56                      requestSome(n);
57              } catch (RuntimeException e) {
58                  subscriber.onError(e);
59              }
60          }
61  
62          private void requestAll() {
63              node.searchWithoutBackpressure(condition, subscriber);
64              if (!subscriber.isUnsubscribed())
65                  subscriber.onCompleted();
66          }
67  
68          private void requestSome(long n) {
69              // back pressure path
70              // this algorithm copied roughly from
71              // rxjava-core/OnSubscribeFromIterable.java
72  
73              // rxjava used AtomicLongFieldUpdater instead of AtomicLong
74              // but benchmarks showed no benefit here so reverted to AtomicLong
75              long previousCount = getAndAddRequest(requested, n);
76              if (previousCount == 0) {
77                  // don't touch stack every time during the loop because
78                  // is a volatile and every write forces a thread memory
79                  // cache flush
80                  ImmutableStack<NodePosition<T, S>> st = stack;
81                  while (true) {
82                      // minimize atomic reads by assigning to a variable here
83                      long r = requested.get();
84                      st = Backpressure.search(condition, subscriber, st, r);
85                      if (st.isEmpty()) {
86                          // release some state for gc (although empty stack so not very significant)
87                          stack = null;
88                          if (!subscriber.isUnsubscribed()) {
89                              subscriber.onCompleted();
90                          }
91                          return;
92                      } else {
93                          stack = st;
94                          if (requested.addAndGet(-r) == 0)
95                              return;
96                      }
97                  }
98  
99              }
100         }
101     }
102     
103     /**
104      * Adds {@code n} to {@code requested} and returns the value prior to
105      * addition once the addition is successful (uses CAS semantics). If
106      * overflows then sets {@code requested} field to {@code Long.MAX_VALUE}.
107      * 
108      * @param requested
109      *            atomic field updater for a request count
110      * @param n
111      *            the number of requests to add to the requested count
112      * @return requested value just prior to successful addition
113      */
114     private static long getAndAddRequest(AtomicLong requested, long n) {
115         // add n to field but check for overflow
116         while (true) {
117             long current = requested.get();
118             long next = current + n;
119             // check for overflow
120             if (next < 0) {
121                 next = Long.MAX_VALUE;
122             }
123             if (requested.compareAndSet(current, next)) {
124                 return current;
125             }
126         }
127     }
128 
129 }