1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.Collection;
4 import java.util.Deque;
5 import java.util.Iterator;
6 import java.util.LinkedList;
7
8 import com.github.davidmoten.util.Preconditions;
9
10 import rx.functions.Func0;
11 import rx.plugins.RxJavaPlugins;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 class RollingSPSCQueue<T> implements QueueWithResources<T> {
34
35 private final Func0<QueueWithResources<T>> queueFactory;
36 private final long maxSizeBytesPerQueue;
37 private final long maxItemsPerQueue;
38 private final Deque<QueueWithResources<T>> queues = new LinkedList<QueueWithResources<T>>();
39
40
41
42
43 private long count;
44
45
46 private boolean unsubscribed;
47
48 RollingSPSCQueue(Func0<QueueWithResources<T>> queueFactory, long maxSizeBytesPerQueue, long maxItemsPerQueue) {
49 Preconditions.checkNotNull(queueFactory);
50 Preconditions.checkArgument(maxSizeBytesPerQueue > 0, "maxSizeBytesPerQueue must be greater than zero");
51 Preconditions.checkArgument(maxItemsPerQueue > 1, "maxSizeBytesPerQueue must be greater than one");
52 this.count = 0;
53 this.maxSizeBytesPerQueue = maxSizeBytesPerQueue;
54 this.unsubscribed = false;
55 this.queueFactory = queueFactory;
56 this.maxItemsPerQueue = maxItemsPerQueue;
57 }
58
59 @Override
60 public void unsubscribe() {
61 if (unsubscribed) {
62 return;
63 }
64 synchronized (queues) {
65 if (!unsubscribed) {
66 unsubscribed = true;
67 try {
68 for (QueueWithResources<T> q : queues) {
69 q.unsubscribe();
70 }
71 queues.clear();
72 } catch (RuntimeException e) {
73 RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
74 throw e;
75 } catch (Error e) {
76 RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
77 throw e;
78 }
79 }
80 }
81 }
82
83 @Override
84 public boolean isUnsubscribed() {
85 if (unsubscribed)
86 return true;
87 synchronized (queues) {
88 return unsubscribed;
89 }
90 }
91
92 @Override
93 public boolean offer(T t) {
94
95
96 if (unsubscribed) {
97 return true;
98 }
99 count++;
100 if (createAnotherQueue()) {
101 count = 1;
102 QueueWithResources<T> q = queueFactory.call();
103 synchronized (queues) {
104 if (!unsubscribed) {
105 QueueWithResources<T> last = queues.peekLast();
106 if (last != null) {
107 last.freeResources();
108 }
109 queues.offerLast(q);
110 return q.offer(t);
111 } else {
112 return true;
113 }
114 }
115 } else {
116 synchronized (queues) {
117 if (unsubscribed) {
118 return true;
119 }
120 return queues.peekLast().offer(t);
121 }
122 }
123 }
124
125 private boolean createAnotherQueue() {
126 if (count == 1) {
127
128 return true;
129 } else if (count == maxItemsPerQueue) {
130 return true;
131 } else if (maxSizeBytesPerQueue != Long.MAX_VALUE) {
132 synchronized (queues) {
133 if (unsubscribed) {
134 return true;
135 }
136 return queues.peekLast().resourcesSize() >= maxSizeBytesPerQueue;
137 }
138 } else {
139 return false;
140 }
141 }
142
143 @Override
144 public T poll() {
145
146
147 if (unsubscribed) {
148 return null;
149 }
150 while (true) {
151 synchronized (queues) {
152 if (unsubscribed) {
153 return null;
154 }
155 QueueWithResources<T> first = queues.peekFirst();
156 if (first == null) {
157 return null;
158 }
159 T value = first.poll();
160 if (value == null) {
161 if (first == queues.peekLast()) {
162 return null;
163 } else {
164 QueueWithResources<T> removed = queues.pollFirst();
165 if (removed != null)
166 removed.unsubscribe();
167 }
168 } else {
169 return value;
170 }
171 }
172 }
173 }
174
175 @Override
176 public boolean isEmpty() {
177
178 if (unsubscribed) {
179 return true;
180 }
181 synchronized (queues) {
182 if (unsubscribed) {
183 return true;
184 }
185 QueueWithResources<T> first = queues.peekFirst();
186 if (first == null) {
187 return true;
188 } else {
189 return queues.peekLast() == first && first.isEmpty();
190 }
191 }
192 }
193
194 @Override
195 public void clear() {
196 throw new UnsupportedOperationException();
197 }
198
199 @Override
200 public int size() {
201 throw new UnsupportedOperationException();
202 }
203
204 @Override
205 public T peek() {
206 throw new UnsupportedOperationException();
207 }
208
209 @Override
210 public boolean contains(Object o) {
211 throw new UnsupportedOperationException();
212 }
213
214 @Override
215 public Iterator<T> iterator() {
216 throw new UnsupportedOperationException();
217 }
218
219 @Override
220 public Object[] toArray() {
221 throw new UnsupportedOperationException();
222 }
223
224 @SuppressWarnings("hiding")
225 @Override
226 public <T> T[] toArray(T[] a) {
227 throw new UnsupportedOperationException();
228 }
229
230 @Override
231 public boolean remove(Object o) {
232 throw new UnsupportedOperationException();
233 }
234
235 @Override
236 public boolean containsAll(Collection<?> c) {
237 throw new UnsupportedOperationException();
238 }
239
240 @Override
241 public boolean addAll(Collection<? extends T> c) {
242 throw new UnsupportedOperationException();
243 }
244
245 @Override
246 public boolean removeAll(Collection<?> c) {
247 throw new UnsupportedOperationException();
248 }
249
250 @Override
251 public boolean retainAll(Collection<?> c) {
252 throw new UnsupportedOperationException();
253 }
254
255 @Override
256 public boolean add(T e) {
257 throw new UnsupportedOperationException();
258 }
259
260 @Override
261 public T remove() {
262 throw new UnsupportedOperationException();
263 }
264
265 @Override
266 public T element() {
267 throw new UnsupportedOperationException();
268 }
269
270 @Override
271 public void freeResources() {
272
273 }
274
275 @Override
276 public long resourcesSize() {
277 throw new UnsupportedOperationException();
278 }
279
280 }