View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.Collection;
4   import java.util.Deque;
5   import java.util.Iterator;
6   import java.util.LinkedList;
7   
8   import com.github.davidmoten.util.Preconditions;
9   
10  import rx.functions.Func0;
11  import rx.plugins.RxJavaPlugins;
12  
13  /**
14   * <p>
15   * This abstraction around multiple queues exists as a strategy to reclaim file
16   * system space taken by file based queues. The strategy is to use a double
17   * ended queue of queues (each queue having its own files). As the number of
18   * entries added to a queue (regardless of how many are read) meets a threshold
19   * another queue is created on the end of the deque and new entries then are
20   * added to that. As entries are read from a queue that is not the last queue,
21   * it is deleted when empty and its file resources recovered (deleted).
22   * 
23   * <p>
24   * {@code RollingSPSCQueue} is partially thread-safe. It is designed to support
25   * {@code OperatorBufferToFile} and expects calls to {@code offer()} to be
26   * sequential (a happens-before relationship), and calls to {@code poll()} to be
27   * sequential. Calls to {@code offer()}, {@code poll()}, {@code isEmpty()},
28   * {@code peek()},{@code close()} may happen concurrently.
29   * 
30   * @param <T>
31   *            type of item being queued
32   */
33  class RollingSPSCQueue<T> implements QueueWithResources<T> {
34  
35  	private final Func0<QueueWithResources<T>> queueFactory;
36  	private final long maxSizeBytesPerQueue;
37  	private final long maxItemsPerQueue;
38  	private final Deque<QueueWithResources<T>> queues = new LinkedList<QueueWithResources<T>>();
39  
40  	// counter used to determine when to rollover to another queue
41  	// visibility managed by the fact that calls to offer are happens-before
42  	// sequential
43  	private long count;
44  
45  	// guarded by queues
46  	private boolean unsubscribed;
47  
48  	RollingSPSCQueue(Func0<QueueWithResources<T>> queueFactory, long maxSizeBytesPerQueue, long maxItemsPerQueue) {
49  		Preconditions.checkNotNull(queueFactory);
50  		Preconditions.checkArgument(maxSizeBytesPerQueue > 0, "maxSizeBytesPerQueue must be greater than zero");
51  		Preconditions.checkArgument(maxItemsPerQueue > 1, "maxSizeBytesPerQueue must be greater than one");
52  		this.count = 0;
53  		this.maxSizeBytesPerQueue = maxSizeBytesPerQueue;
54  		this.unsubscribed = false;
55  		this.queueFactory = queueFactory;
56  		this.maxItemsPerQueue = maxItemsPerQueue;
57  	}
58  
59  	@Override
60  	public void unsubscribe() {
61  		if (unsubscribed) {
62  			return;
63  		}
64  		synchronized (queues) {
65  			if (!unsubscribed) {
66  				unsubscribed = true;
67  				try {
68  					for (QueueWithResources<T> q : queues) {
69  						q.unsubscribe();
70  					}
71  					queues.clear();
72  				} catch (RuntimeException e) {
73  					RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
74  					throw e;
75  				} catch (Error e) {
76  					RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
77  					throw e;
78  				}
79  			}
80  		}
81  	}
82  
83  	@Override
84  	public boolean isUnsubscribed() {
85  		if (unsubscribed)
86  			return true;
87  		synchronized (queues) {
88  			return unsubscribed;
89  		}
90  	}
91  
92  	@Override
93  	public boolean offer(T t) {
94  		// limited thread safety (offer/poll/close/peek/isEmpty concurrent but
95  		// not offer and offer)
96  		if (unsubscribed) {
97  			return true;
98  		}
99  		count++;
100 		if (createAnotherQueue()) {
101 			count = 1;
102 			QueueWithResources<T> q = queueFactory.call();
103 			synchronized (queues) {
104 				if (!unsubscribed) {
105 					QueueWithResources<T> last = queues.peekLast();
106 					if (last != null) {
107 						last.freeResources();
108 					}
109 					queues.offerLast(q);
110 					return q.offer(t);
111 				} else {
112 					return true;
113 				}
114 			}
115 		} else {
116 			synchronized (queues) {
117 				if (unsubscribed) {
118 					return true;
119 				}
120 				return queues.peekLast().offer(t);
121 			}
122 		}
123 	}
124 
125 	private boolean createAnotherQueue() {
126 		if (count == 1) {
127 			// first call to offer
128 			return true;
129 		} else if (count == maxItemsPerQueue) {
130 			return true;
131 		} else if (maxSizeBytesPerQueue != Long.MAX_VALUE) {
132 			synchronized (queues) {
133 				if (unsubscribed) {
134 					return true;
135 				}
136 				return queues.peekLast().resourcesSize() >= maxSizeBytesPerQueue;
137 			}
138 		} else {
139 			return false;
140 		}
141 	}
142 
143 	@Override
144 	public T poll() {
145 		// limited thread safety (offer/poll/close/peek/isEmpty concurrent but
146 		// not poll and poll)
147 		if (unsubscribed) {
148 			return null;
149 		}
150 		while (true) {
151 			synchronized (queues) {
152 				if (unsubscribed) {
153 					return null;
154 				}
155 				QueueWithResources<T> first = queues.peekFirst();
156 				if (first == null) {
157 					return null;
158 				}
159 				T value = first.poll();
160 				if (value == null) {
161 					if (first == queues.peekLast()) {
162 						return null;
163 					} else {
164 						QueueWithResources<T> removed = queues.pollFirst();
165 						if (removed != null)
166 							removed.unsubscribe();
167 					}
168 				} else {
169 					return value;
170 				}
171 			}
172 		}
173 	}
174 
175 	@Override
176 	public boolean isEmpty() {
177 		// thread-safe (will just return true if queue has been closed)
178 		if (unsubscribed) {
179 			return true;
180 		}
181 		synchronized (queues) {
182 			if (unsubscribed) {
183 				return true;
184 			}
185 			QueueWithResources<T> first = queues.peekFirst();
186 			if (first == null) {
187 				return true;
188 			} else {
189 				return queues.peekLast() == first && first.isEmpty();
190 			}
191 		}
192 	}
193 
194 	@Override
195 	public void clear() {
196 		throw new UnsupportedOperationException();
197 	}
198 
199 	@Override
200 	public int size() {
201 		throw new UnsupportedOperationException();
202 	}
203 
204 	@Override
205 	public T peek() {
206 		throw new UnsupportedOperationException();
207 	}
208 
209 	@Override
210 	public boolean contains(Object o) {
211 		throw new UnsupportedOperationException();
212 	}
213 
214 	@Override
215 	public Iterator<T> iterator() {
216 		throw new UnsupportedOperationException();
217 	}
218 
219 	@Override
220 	public Object[] toArray() {
221 		throw new UnsupportedOperationException();
222 	}
223 
224 	@SuppressWarnings("hiding")
225 	@Override
226 	public <T> T[] toArray(T[] a) {
227 		throw new UnsupportedOperationException();
228 	}
229 
230 	@Override
231 	public boolean remove(Object o) {
232 		throw new UnsupportedOperationException();
233 	}
234 
235 	@Override
236 	public boolean containsAll(Collection<?> c) {
237 		throw new UnsupportedOperationException();
238 	}
239 
240 	@Override
241 	public boolean addAll(Collection<? extends T> c) {
242 		throw new UnsupportedOperationException();
243 	}
244 
245 	@Override
246 	public boolean removeAll(Collection<?> c) {
247 		throw new UnsupportedOperationException();
248 	}
249 
250 	@Override
251 	public boolean retainAll(Collection<?> c) {
252 		throw new UnsupportedOperationException();
253 	}
254 
255 	@Override
256 	public boolean add(T e) {
257 		throw new UnsupportedOperationException();
258 	}
259 
260 	@Override
261 	public T remove() {
262 		throw new UnsupportedOperationException();
263 	}
264 
265 	@Override
266 	public T element() {
267 		throw new UnsupportedOperationException();
268 	}
269 
270 	@Override
271 	public void freeResources() {
272 		// do nothing
273 	}
274 
275 	@Override
276 	public long resourcesSize() {
277 		throw new UnsupportedOperationException();
278 	}
279 
280 }