View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.DataInputStream;
4   import java.io.DataOutputStream;
5   import java.io.EOFException;
6   import java.io.File;
7   import java.io.FileNotFoundException;
8   import java.io.IOException;
9   import java.io.InputStream;
10  import java.io.OutputStream;
11  import java.io.RandomAccessFile;
12  import java.util.Collection;
13  import java.util.Iterator;
14  import java.util.concurrent.atomic.AtomicLong;
15  
16  import com.github.davidmoten.rx.buffertofile.DataSerializer;
17  import com.github.davidmoten.util.Preconditions;
18  
19  class FileBasedSPSCQueue<T> implements QueueWithResources<T> {
20  
21  	final File file;
22  	final DataSerializer<T> serializer;
23  	final AtomicLong size;
24  	final byte[] writeBuffer;
25  	final byte[] readBuffer;
26  	final Object writeLock = new Object();
27  	private final Object accessLock = new Object();
28  	private final DataOutputStream output;
29  	private final DataInputStream input;
30  
31  	// mutable state
32  
33  	int readBufferPosition = 0;
34  	long readPosition = 0;
35  	int readBufferLength = 0;
36  	volatile long writePosition;
37  	volatile int writeBufferPosition;
38  	// guarded by accessLock
39  	private FileAccessor accessor;
40  	private volatile boolean unsubscribed = false;
41  
42  	FileBasedSPSCQueue(int bufferSizeBytes, File file, DataSerializer<T> serializer) {
43  		Preconditions.checkArgument(bufferSizeBytes > 0, "bufferSizeBytes must be greater than zero");
44  		Preconditions.checkNotNull(file);
45  		Preconditions.checkNotNull(serializer);
46  		this.readBuffer = new byte[bufferSizeBytes];
47  		this.writeBuffer = new byte[bufferSizeBytes];
48  		try {
49  			file.getParentFile().mkdirs();
50  			file.createNewFile();
51  			this.file = file;
52  		} catch (IOException e) {
53  			throw new RuntimeException(e);
54  		}
55  		this.accessor = new FileAccessor(file);
56  		this.serializer = serializer;
57  		this.size = new AtomicLong(0);
58  		this.output = new DataOutputStream(new QueueWriter());
59  		this.input = new DataInputStream(new QueueReader());
60  	}
61  
62  	private final static class FileAccessor {
63  		final RandomAccessFile fWrite;
64  		final RandomAccessFile fRead;
65  
66  		FileAccessor(File file) {
67  			try {
68  				this.fWrite = new RandomAccessFile(file, "rw");
69  				this.fRead = new RandomAccessFile(file, "r");
70  			} catch (FileNotFoundException e) {
71  				throw new RuntimeException(e);
72  			}
73  		}
74  
75  		public void close() {
76  			try {
77  				fWrite.close();
78  				fRead.close();
79  			} catch (IOException e) {
80  				throw new RuntimeException(e);
81  			}
82  		}
83  	}
84  
85  	private final class QueueWriter extends OutputStream {
86  
87  		@Override
88  		public void write(int b) throws IOException {
89  			// minimize reads of volatile writeBufferPosition
90  			int wbp = writeBufferPosition;
91  			if (wbp < writeBuffer.length) {
92  				writeBuffer[wbp] = (byte) b;
93  				writeBufferPosition = wbp + 1;
94  			} else
95  				synchronized (writeLock) {
96  					// minimize reads of volatile writePosition
97  					long wp = writePosition;
98  					accessor.fWrite.seek(wp);
99  					accessor.fWrite.write(writeBuffer);
100 					writeBuffer[0] = (byte) b;
101 					writeBufferPosition = 1;
102 					writePosition = wp + writeBuffer.length;
103 				}
104 		}
105 	}
106 
107 	// create the exception once to avoid building many Exception objects
108 	private static final EOFException EOF = new EOFException();
109 
110 	private final class QueueReader extends InputStream {
111 
112 		@Override
113 		public int read() throws IOException {
114 			if (size.get() == 0) {
115 				throw EOF;
116 			} else {
117 				if (readBufferPosition < readBufferLength) {
118 					byte b = readBuffer[readBufferPosition];
119 					readBufferPosition++;
120 					return toUnsignedInteger(b);
121 				} else {
122 					// before reading more from file we see if we can emit
123 					// directly from the writeBuffer by checking if the read
124 					// position is past the write position
125 					while (true) {
126 						long wp;
127 						int wbp;
128 						synchronized (writeLock) {
129 							wp = writePosition;
130 							wbp = writeBufferPosition;
131 						}
132 						long over = wp - readPosition;
133 						if (over > 0) {
134 							// read position is not past the write position
135 							readBufferLength = (int) Math.min(readBuffer.length, over);
136 							synchronized (accessLock) {
137 								if (accessor == null) {
138 									accessor = new FileAccessor(file);
139 								}
140 								accessor.fRead.seek(readPosition);
141 								accessor.fRead.read(readBuffer, 0, readBufferLength);
142 							}
143 							readPosition += readBufferLength;
144 							readBufferPosition = 1;
145 							return toUnsignedInteger(readBuffer[0]);
146 						} else {
147 							// read position is at or past the write position
148 							int index = -(int) over;
149 							if (index >= writeBuffer.length) {
150 								throw EOF;
151 							} else {
152 								int b = toUnsignedInteger(writeBuffer[index]);
153 								final boolean writeBufferUnchanged;
154 								synchronized (writeLock) {
155 									writeBufferUnchanged = wp == writePosition && wbp == writeBufferPosition;
156 									// if (writeBufferUnchanged) {
157 									// // reset write buffer a bit and the
158 									// readPosition so that we avoid writing
159 									// // the full contents of the write buffer
160 									// if (index >= writeBuffer.length / 2 &&
161 									// index < writeBufferPosition) {
162 									// System.arraycopy(writeBuffer, index + 1,
163 									// writeBuffer, 0,
164 									// writeBufferPosition - index - 1);
165 									// writeBufferPosition -= index + 1;
166 									// readPosition = writePosition;
167 									// } else {
168 									// readPosition++;
169 									// }
170 									// }
171 								}
172 								if (writeBufferUnchanged) {
173 									readPosition++;
174 									return b;
175 								}
176 							}
177 						}
178 					}
179 				}
180 			}
181 		}
182 	}
183 
184 	private static int toUnsignedInteger(byte b) {
185 		return b & 0x000000FF;
186 	}
187 
188 	@Override
189 	public void unsubscribe() {
190 		// must not run concurrently with offer/poll
191 		if (unsubscribed) {
192 			return;
193 		}
194 		unsubscribed = true;
195 		synchronized (accessLock) {
196 			if (accessor != null) {
197 				accessor.close();
198 				accessor = null;
199 			}
200 			size.set(0);
201 		}
202 		if (!file.delete()) {
203 			throw new RuntimeException("could not delete file " + file);
204 		}
205 	}
206 
207 	@Override
208 	public boolean isUnsubscribed() {
209 		return unsubscribed;
210 	}
211 
212 	@Override
213 	public boolean offer(T t) {
214 		// limited thread-safety
215 		// offer calls must be sequential but can happen concurrently with other
216 		// methods except unsubscribe
217 		try {
218 			serializer.serialize(output, t);
219 			size.incrementAndGet();
220 			return true;
221 		} catch (IOException e) {
222 			throw new RuntimeException(e);
223 		}
224 	}
225 
226 	@Override
227 	public T poll() {
228 		// limited thread-safety
229 		// poll calls must be sequential but can happen concurrently with other
230 		// methods except unsubscribe
231 		try {
232 			T t = serializer.deserialize(input);
233 			size.decrementAndGet();
234 			if (t == null) {
235 				// this is a trick that we can get away with due to type erasure
236 				// in java as long as the return value of poll() is checked
237 				// using NullSentinel.isNullSentinel(t) (?)
238 				return NullSentinel.instance();
239 			} else {
240 				return t;
241 			}
242 		} catch (EOFException e) {
243 			return null;
244 		} catch (IOException e) {
245 			throw new RuntimeException(e);
246 		}
247 	}
248 
249 	@Override
250 	public boolean isEmpty() {
251 		return size.get() == 0;
252 	}
253 
254 	@Override
255 	public void freeResources() {
256 		synchronized (accessLock) {
257 			if (accessor != null) {
258 				accessor.close();
259 			}
260 			accessor = null;
261 		}
262 	}
263 
264 	@Override
265 	public long resourcesSize() {
266 		return writePosition;
267 	}
268 
269 	@Override
270 	public T element() {
271 		throw new UnsupportedOperationException();
272 	}
273 
274 	@Override
275 	public T peek() {
276 		throw new UnsupportedOperationException();
277 	}
278 
279 	@Override
280 	public int size() {
281 		throw new UnsupportedOperationException();
282 	}
283 
284 	@Override
285 	public boolean add(T e) {
286 		throw new UnsupportedOperationException();
287 	}
288 
289 	@Override
290 	public T remove() {
291 		throw new UnsupportedOperationException();
292 	}
293 
294 	@Override
295 	public boolean contains(Object o) {
296 		throw new UnsupportedOperationException();
297 	}
298 
299 	@Override
300 	public Iterator<T> iterator() {
301 		throw new UnsupportedOperationException();
302 	}
303 
304 	@Override
305 	public Object[] toArray() {
306 		throw new UnsupportedOperationException();
307 	}
308 
309 	@SuppressWarnings("hiding")
310 	@Override
311 	public <T> T[] toArray(T[] a) {
312 		throw new UnsupportedOperationException();
313 	}
314 
315 	@Override
316 	public boolean remove(Object o) {
317 		throw new UnsupportedOperationException();
318 	}
319 
320 	@Override
321 	public boolean containsAll(Collection<?> c) {
322 		throw new UnsupportedOperationException();
323 	}
324 
325 	@Override
326 	public boolean addAll(Collection<? extends T> c) {
327 		throw new UnsupportedOperationException();
328 	}
329 
330 	@Override
331 	public boolean removeAll(Collection<?> c) {
332 		throw new UnsupportedOperationException();
333 	}
334 
335 	@Override
336 	public boolean retainAll(Collection<?> c) {
337 		throw new UnsupportedOperationException();
338 	}
339 
340 	@Override
341 	public void clear() {
342 		throw new UnsupportedOperationException();
343 	}
344 
345 }