1 package com.github.davidmoten.rx2.internal.flowable;
2
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.github.davidmoten.guavamini.Preconditions;

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
17
18
19
20
21
22
23
24
25
26 public final class FlowableWindowMinMax<T> extends Flowable<T> {
27
28 private final Flowable<T> source;
29 private final int windowSize;
30 private final Comparator<? super T> comparator;
31 private final Metric metric;
32
33 public FlowableWindowMinMax(Flowable<T> source, int windowSize, Comparator<? super T> comparator, Metric metric) {
34 Preconditions.checkArgument(windowSize > 0, "windowSize must be greater than zero");
35 Preconditions.checkNotNull(comparator, "comparator cannot be null");
36 Preconditions.checkNotNull(metric, "metric cannot be null");
37 this.source = source;
38 this.windowSize = windowSize;
39 this.comparator = comparator;
40 this.metric = metric;
41 }
42
43 @Override
44 protected void subscribeActual(org.reactivestreams.Subscriber<? super T> child) {
45 source.subscribe(new WindowMinMaxSubscriber<T>(windowSize, comparator, metric, child));
46 }
47
48 private static final class WindowMinMaxSubscriber<T> implements FlowableSubscriber<T>, Subscription {
49
50 private final int windowSize;
51 private final Comparator<? super T> comparator;
52 private final Metric metric;
53 private final Subscriber<? super T> child;
54
55
56 private final Map<Long, T> values;
57
58
59 private final Deque<Long> indices;
60
61 private long count = 0;
62 private Subscription parent;
63
64 WindowMinMaxSubscriber(int windowSize, Comparator<? super T> comparator, Metric metric,
65 Subscriber<? super T> child) {
66 this.windowSize = windowSize;
67 this.comparator = comparator;
68 this.metric = metric;
69 this.child = child;
70 this.values = new HashMap<Long, T>(windowSize);
71 this.indices = new ArrayDeque<Long>(windowSize);
72 }
73
74 @Override
75 public void onSubscribe(Subscription parent) {
76 if (SubscriptionHelper.validate(this.parent, parent)) {
77 this.parent = parent;
78 child.onSubscribe(this);
79 parent.request(windowSize - 1);
80 }
81 }
82
83 @Override
84 public void request(long n) {
85 if (SubscriptionHelper.validate(n)) {
86 parent.request(n);
87 }
88 }
89
90 @Override
91 public void cancel() {
92 parent.cancel();
93
94
95
96 }
97
98 @Override
99 public void onComplete() {
100 child.onComplete();
101 }
102
103 @Override
104 public void onError(Throwable e) {
105 child.onError(e);
106 }
107
108 @Override
109 public void onNext(T t) {
110 count++;
111
112 addToQueue(t);
113 if (count >= windowSize) {
114
115
116
117 Long head = indices.peekFirst();
118 final T value;
119 if (head == count - windowSize) {
120
121 values.remove(indices.pollFirst());
122 value = values.get(indices.peekFirst());
123 } else {
124 value = values.get(head);
125 }
126 child.onNext(value);
127 }
128 }
129
130 private void addToQueue(T t) {
131 Long v;
132 while ((v = indices.peekLast()) != null && compare(t, values.get(v)) <= 0) {
133 values.remove(indices.pollLast());
134 }
135 values.put(count, t);
136 indices.offerLast(count);
137 }
138
139 private int compare(T a, T b) {
140 if (metric == Metric.MIN) {
141 return comparator.compare(a, b);
142 } else {
143 return comparator.compare(b, a);
144 }
145 }
146 }
147
148 public enum Metric {
149 MIN, MAX;
150 }
151
152 }