1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.ArrayDeque;
4 import java.util.Comparator;
5 import java.util.Deque;
6 import java.util.HashMap;
7 import java.util.Map;
8
9 import com.github.davidmoten.util.Preconditions;
10
11 import rx.Observable.Operator;
12 import rx.Producer;
13 import rx.Subscriber;
14
15
16
17
18
19
20
21
22
23 public final class OperatorWindowMinMax<T> implements Operator<T, T> {
24
25 private final int windowSize;
26 private final Comparator<? super T> comparator;
27 private final Metric metric;
28
29 public OperatorWindowMinMax(int windowSize, Comparator<? super T> comparator, Metric metric) {
30 Preconditions.checkArgument(windowSize > 0, "windowSize must be greater than zero");
31 Preconditions.checkNotNull(comparator, "comparator cannot be null");
32 Preconditions.checkNotNull(metric, "metric cannot be null");
33 this.windowSize = windowSize;
34 this.comparator = comparator;
35 this.metric = metric;
36 }
37
38 public enum Metric {
39 MIN, MAX;
40 }
41
42 @Override
43 public Subscriber<? super T> call(final Subscriber<? super T> child) {
44 return new Subscriber<T>(child) {
45
46 long count = 0;
47
48
49 final Deque<Long> q = new ArrayDeque<Long>();
50
51
52 final Map<Long, T> values = new HashMap<Long, T>();
53
54 @Override
55 public void onCompleted() {
56 child.onCompleted();
57 }
58
59 @Override
60 public void onError(Throwable e) {
61 child.onError(e);
62 }
63
64 @Override
65 public void onNext(T t) {
66 count++;
67
68 addToQueue(t);
69 if (count >= windowSize) {
70
71
72
73 Long head = q.peekFirst();
74 final T value;
75 if (head == count - windowSize) {
76
77 values.remove(q.pollFirst());
78 value = values.get(q.peekFirst());
79 } else {
80 value = values.get(head);
81 }
82 child.onNext(value);
83 }
84 }
85
86 private void addToQueue(T t) {
87 Long v;
88 while ((v = q.peekLast()) != null && compare(t, values.get(v)) <= 0) {
89 values.remove(q.pollLast());
90 }
91 values.put(count, t);
92 q.offerLast(count);
93 }
94
95 @Override
96 public void setProducer(final Producer producer) {
97 child.setProducer(producer);
98 producer.request(windowSize - 1);
99 }
100
101 };
102 }
103
104 private int compare(T a, T b) {
105 if (metric == Metric.MIN) {
106 return comparator.compare(a, b);
107 } else {
108 return comparator.compare(b, a);
109 }
110 }
111
112 }