View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.ArrayDeque;
4   import java.util.Comparator;
5   import java.util.Deque;
6   import java.util.HashMap;
7   import java.util.Map;
8   
9   import org.reactivestreams.Subscriber;
10  import org.reactivestreams.Subscription;
11  
12  import com.github.davidmoten.guavamini.Preconditions;
13  
14  import io.reactivex.Flowable;
15  import io.reactivex.FlowableSubscriber;
16  import io.reactivex.internal.subscriptions.SubscriptionHelper;
17  
18  /**
19   * Uses a double-ended queue and collapses entries when they are redundant
20   * (whenever a value is added to the queue all values at the end of the queue
21   * that are greater or equal to that value are removed).
22   * 
23   * @param <T>
24   *            generic type of stream emissions
25   */
26  public final class FlowableWindowMinMax<T> extends Flowable<T> {
27  
28      private final Flowable<T> source;
29      private final int windowSize;
30      private final Comparator<? super T> comparator;
31      private final Metric metric;
32  
33      public FlowableWindowMinMax(Flowable<T> source, int windowSize, Comparator<? super T> comparator, Metric metric) {
34          Preconditions.checkArgument(windowSize > 0, "windowSize must be greater than zero");
35          Preconditions.checkNotNull(comparator, "comparator cannot be null");
36          Preconditions.checkNotNull(metric, "metric cannot be null");
37          this.source = source;
38          this.windowSize = windowSize;
39          this.comparator = comparator;
40          this.metric = metric;
41      }
42  
43      @Override
44      protected void subscribeActual(org.reactivestreams.Subscriber<? super T> child) {
45          source.subscribe(new WindowMinMaxSubscriber<T>(windowSize, comparator, metric, child));
46      }
47  
48      private static final class WindowMinMaxSubscriber<T> implements FlowableSubscriber<T>, Subscription {
49  
50          private final int windowSize;
51          private final Comparator<? super T> comparator;
52          private final Metric metric;
53          private final Subscriber<? super T> child;
54  
55          // map index to value
56          private final Map<Long, T> values;
57  
58          // queue of indices
59          private final Deque<Long> indices;
60  
61          private long count = 0;
62          private Subscription parent;
63  
64          WindowMinMaxSubscriber(int windowSize, Comparator<? super T> comparator, Metric metric,
65                  Subscriber<? super T> child) {
66              this.windowSize = windowSize;
67              this.comparator = comparator;
68              this.metric = metric;
69              this.child = child;
70              this.values = new HashMap<Long, T>(windowSize);
71              this.indices = new ArrayDeque<Long>(windowSize);
72          }
73  
74          @Override
75          public void onSubscribe(Subscription parent) {
76              if (SubscriptionHelper.validate(this.parent, parent)) {
77                  this.parent = parent;
78                  child.onSubscribe(this);
79                  parent.request(windowSize - 1);
80              }
81          }
82  
83          @Override
84          public void request(long n) {
85              if (SubscriptionHelper.validate(n)) {
86                  parent.request(n);
87              }
88          }
89  
90          @Override
91          public void cancel() {
92              parent.cancel();
93              // would be nice to clear the window here but would have performance
94              // impact because would need to worry about allowing concurrent
95              // changes to `indices` and `map`
96          }
97  
98          @Override
99          public void onComplete() {
100             child.onComplete();
101         }
102 
103         @Override
104         public void onError(Throwable e) {
105             child.onError(e);
106         }
107 
108         @Override
109         public void onNext(T t) {
110             count++;
111             // add to queue
112             addToQueue(t);
113             if (count >= windowSize) {
114                 // emit max
115 
116                 // head of queue is max
117                 Long head = indices.peekFirst();
118                 final T value;
119                 if (head == count - windowSize) {
120                     // if window past that index then remove from map
121                     values.remove(indices.pollFirst());
122                     value = values.get(indices.peekFirst());
123                 } else {
124                     value = values.get(head);
125                 }
126                 child.onNext(value);
127             }
128         }
129 
130         private void addToQueue(T t) {
131             Long v;
132             while ((v = indices.peekLast()) != null && compare(t, values.get(v)) <= 0) {
133                 values.remove(indices.pollLast());
134             }
135             values.put(count, t);
136             indices.offerLast(count);
137         }
138 
139         private int compare(T a, T b) {
140             if (metric == Metric.MIN) {
141                 return comparator.compare(a, b);
142             } else {
143                 return comparator.compare(b, a);
144             }
145         }
146     }
147 
148     public enum Metric {
149         MIN, MAX;
150     }
151 
152 }