View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.ArrayDeque;
4   import java.util.Comparator;
5   import java.util.Deque;
6   import java.util.HashMap;
7   import java.util.Map;
8   
9   import com.github.davidmoten.util.Preconditions;
10  
11  import rx.Observable.Operator;
12  import rx.Producer;
13  import rx.Subscriber;
14  
15  /**
16   * Uses a double-ended queue and collapses entries when they are redundant
17   * (whenever a value is added to the queue all values at the end of the queue
18   * that are greater or equal to that value are removed).
19   * 
20   * @param <T>
21   *            generic type of stream emissions
22   */
23  public final class OperatorWindowMinMax<T> implements Operator<T, T> {
24  
25      private final int windowSize;
26      private final Comparator<? super T> comparator;
27      private final Metric metric;
28  
29      public OperatorWindowMinMax(int windowSize, Comparator<? super T> comparator, Metric metric) {
30          Preconditions.checkArgument(windowSize > 0, "windowSize must be greater than zero");
31          Preconditions.checkNotNull(comparator, "comparator cannot be null");
32          Preconditions.checkNotNull(metric, "metric cannot be null");
33          this.windowSize = windowSize;
34          this.comparator = comparator;
35          this.metric = metric;
36      }
37  
38      public enum Metric {
39          MIN, MAX;
40      }
41  
42      @Override
43      public Subscriber<? super T> call(final Subscriber<? super T> child) {
44          return new Subscriber<T>(child) {
45  
46              long count = 0;
47  
48              // queue of indices
49              final Deque<Long> q = new ArrayDeque<Long>();
50  
51              // map index to value
52              final Map<Long, T> values = new HashMap<Long, T>();
53  
54              @Override
55              public void onCompleted() {
56                  child.onCompleted();
57              }
58  
59              @Override
60              public void onError(Throwable e) {
61                  child.onError(e);
62              }
63  
64              @Override
65              public void onNext(T t) {
66                  count++;
67                  // add to queue
68                  addToQueue(t);
69                  if (count >= windowSize) {
70                      // emit max
71  
72                      // head of queue is max
73                      Long head = q.peekFirst();
74                      final T value;
75                      if (head == count - windowSize) {
76                          // if window past that index then remove from map
77                          values.remove(q.pollFirst());
78                          value = values.get(q.peekFirst());
79                      } else {
80                          value = values.get(head);
81                      }
82                      child.onNext(value);
83                  }
84              }
85  
86              private void addToQueue(T t) {
87                  Long v;
88                  while ((v = q.peekLast()) != null && compare(t, values.get(v)) <= 0) {
89                      values.remove(q.pollLast());
90                  }
91                  values.put(count, t);
92                  q.offerLast(count);
93              }
94  
95              @Override
96              public void setProducer(final Producer producer) {
97                  child.setProducer(producer);
98                  producer.request(windowSize - 1);
99              }
100 
101         };
102     }
103 
104     private int compare(T a, T b) {
105         if (metric == Metric.MIN) {
106             return comparator.compare(a, b);
107         } else {
108             return comparator.compare(b, a);
109         }
110     }
111 
112 }