View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.DataInputStream;
4   import java.io.DataOutput;
5   import java.io.DataOutputStream;
6   import java.io.File;
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.io.OutputStream;
10  import java.io.RandomAccessFile;
11  import java.nio.MappedByteBuffer;
12  import java.nio.channels.FileChannel;
13  import java.nio.channels.FileChannel.MapMode;
14  import java.util.concurrent.atomic.AtomicInteger;
15  
16  import com.github.davidmoten.rx.buffertofile.DataSerializer;
17  import com.github.davidmoten.util.ByteArrayOutputStreamNoCopyUnsynchronized;
18  import com.github.davidmoten.util.Preconditions;
19  
20  public class FileBasedSPSCQueueMemoryMappedReaderWriter<T> {
21  
22  	private volatile RandomAccessFile f;
23  	private volatile FileChannel channel;
24  	private DataInputStream input;
25  	private DataOutputStream output;
26  	private MappedByteBuffer read;
27  	private MappedByteBuffer write;
28  	private final DataSerializer<T> serializer;
29  	private final File file;
30  	private final int fileSize;
31  	private final DataOutput buffer;
32  	// TODO can be passed in to constructor for reuse
33  	private final ByteArrayOutputStreamNoCopyUnsynchronized bytes;
34  	private final AtomicInteger status = new AtomicInteger(WRITTEN_READ);
35  	private final Object markerLock = new Object();
36  
37  	static final int WRITTEN_READ = 0;
38  	static final int WRITTEN_READ_NOT_STARTED = 1;
39  	static final int WRITTEN_READING = 2;
40  	static final int WRITING_NOT_READING = 3;
41  	static final int WRITING_READING = 4;
42  
43  	public FileBasedSPSCQueueMemoryMappedReaderWriter(File file, int fileSize, DataSerializer<T> serializer) {
44  		Preconditions.checkArgument(serializer.size() == 0 || serializer.size() <= fileSize - 2 * MARKER_HEADER_SIZE,
45  				"serializer.size() must be less than or equal to file based queue size - 2");
46  		this.file = file;
47  		this.serializer = serializer;
48  		this.fileSize = fileSize;
49  		this.bytes = new ByteArrayOutputStreamNoCopyUnsynchronized();
50  		this.buffer = new DataOutputStream(bytes);
51  	}
52  
53  	public FileBasedSPSCQueueMemoryMappedReaderWriter<T> openForRead() {
54  		// System.out.println("openForRead " + file);
55  
56  		if (!status.compareAndSet(WRITTEN_READ_NOT_STARTED, WRITTEN_READING))
57  			status.compareAndSet(WRITING_NOT_READING, WRITING_READING);
58  		while (true) {
59  			int st = status.get();
60  			int newStatus;
61  			if (st == WRITTEN_READ_NOT_STARTED)
62  				newStatus = WRITTEN_READING;
63  			else if (st == WRITING_NOT_READING)
64  				newStatus = WRITING_READING;
65  			else
66  				newStatus = st;
67  			if (status.compareAndSet(st, newStatus)) {
68  				checkClose(newStatus);
69  				break;
70  			}
71  		}
72  		try {
73  			if (f == null) {
74  				f = new RandomAccessFile(file, "r");
75  			}
76  			if (channel == null) {
77  				channel = f.getChannel();
78  			}
79  			read = channel.map(MapMode.READ_ONLY, 0, channel.size());
80  			input = new DataInputStream(new MappedByteBufferInputStream(read));
81  			// System.out.println("opened for read " + file.getName());
82  		} catch (IOException e) {
83  			throw new RuntimeException(e);
84  		}
85  		return this;
86  	}
87  
88  	public void closeForRead() {
89  		// System.out.println("closeForRead " + file);
90  		while (true) {
91  			int st = status.get();
92  			int newStatus;
93  			if (st == WRITTEN_READING)
94  				newStatus = WRITTEN_READ;
95  			else
96  				newStatus = st;
97  			if (status.compareAndSet(st, newStatus)) {
98  				checkClose(newStatus);
99  				break;
100 			}
101 		}
102 	}
103 
104 	public FileBasedSPSCQueueMemoryMappedReaderWriter<T> openForWrite() {
105 		// System.out.println("openForWrite " + file);
106 		while (true) {
107 			int st = status.get();
108 			int newStatus;
109 			if (st == WRITTEN_READ)
110 				newStatus = WRITING_NOT_READING;
111 			else
112 				newStatus = st;
113 			if (status.compareAndSet(st, newStatus)) {
114 				checkClose(newStatus);
115 				break;
116 			}
117 		}
118 		try {
119 			if (f == null) {
120 				f = new RandomAccessFile(file, "rw");
121 			}
122 			if (channel == null) {
123 				channel = f.getChannel();
124 			}
125 			write = channel.map(MapMode.READ_WRITE, 0, fileSize);
126 			output = new DataOutputStream(new MappedByteBufferOutputStream(write));
127 			synchronized (markerLock) {
128 				output.write(MARKER_END_OF_QUEUE);
129 			}
130 			// System.out.println("opened for write " + file.getName());
131 			return this;
132 		} catch (IOException e) {
133 			throw new RuntimeException(e);
134 		}
135 	}
136 
137 	public void closeForWrite() {
138 		// System.out.println("closeForWrite " + file);
139 		while (true) {
140 			int st = status.get();
141 			int newStatus;
142 			if (st == WRITING_READING)
143 				newStatus = WRITTEN_READING;
144 			else if (st == WRITING_NOT_READING)
145 				newStatus = WRITTEN_READ_NOT_STARTED;
146 			newStatus = st;
147 			if (status.compareAndSet(st, newStatus)) {
148 				checkClose(newStatus);
149 				break;
150 			}
151 		}
152 
153 	}
154 
155 	private void checkClose(int newStatus) {
156 		// System.out.println("close status = " + newStatus + " for " +
157 		// file.getName());
158 		if (newStatus == WRITTEN_READ) {
159 			try {
160 				channel.close();
161 				channel = null;
162 				read = null;
163 				input = null;
164 				f.close();
165 				f = null;
166 			} catch (IOException e) {
167 				throw new RuntimeException(e);
168 			}
169 		}
170 	}
171 
172 	private static class MappedByteBufferOutputStream extends OutputStream {
173 
174 		private final MappedByteBuffer write;
175 
176 		MappedByteBufferOutputStream(MappedByteBuffer write) {
177 			this.write = write;
178 		}
179 
180 		@Override
181 		public void write(int b) throws IOException {
182 			write.put((byte) b);
183 		}
184 	}
185 
186 	private static class MappedByteBufferInputStream extends InputStream {
187 
188 		private final MappedByteBuffer read;
189 
190 		MappedByteBufferInputStream(MappedByteBuffer read) {
191 			this.read = read;
192 		}
193 
194 		@Override
195 		public int read() throws IOException {
196 			return toUnsignedInteger(read.get());
197 		}
198 
199 	}
200 
201 	private static int toUnsignedInteger(byte b) {
202 		return b & 0x000000FF;
203 	}
204 
205 	private static final EOFRuntimeException EOF = new EOFRuntimeException();
206 
207 	static final class EOFRuntimeException extends RuntimeException {
208 
209 		private static final long serialVersionUID = -6943467453336359472L;
210 
211 	}
212 
213 	// markers must be powers of 2 so that we can detect
214 	// a partial write of the byte (which we will treat as END_OF_QUEUE)
215 	static final byte MARKER_END_OF_QUEUE = 0;
216 	static final byte MARKER_END_OF_FILE = 1;
217 	static final byte MARKER_ITEM_PRESENT = 2;
218 	static final int MARKER_HEADER_SIZE = 1;
219 	static final int UNKNOWN_LENGTH = 0;
220 
221 	public T poll() {
222 		int position = read.position();
223 		byte marker;
224 		synchronized (markerLock) {
225 			marker = read.get();
226 		}
227 		if (marker == MARKER_END_OF_QUEUE) {
228 			read.position(position);
229 			return null;
230 		} else if (marker == MARKER_END_OF_FILE) {
231 			throw EOF;
232 		} else if (marker == MARKER_ITEM_PRESENT) {
233 			try {
234 				T t = serializer.deserialize(input);
235 				if (t == null) {
236 					// this is a trick that we can get away with due to type
237 					// erasure in java as long as the return value of poll() is
238 					// checked using NullSentinel.isNullSentinel(t) (?)
239 					return NullSentinel.instance();
240 				} else {
241 					return t;
242 				}
243 			} catch (IOException e) {
244 				throw new RuntimeException(e);
245 			}
246 		} else {
247 			throw new RuntimeException("unexpected");
248 		}
249 	}
250 
251 	/**
252 	 * Returns true if value written to file or false if not enough space
253 	 * (writes and end-of-file marker in the fixed-length memory mapped file).
254 	 * 
255 	 * @param t
256 	 *            value to write to the serialized queue
257 	 * @return true if written, false if not enough space
258 	 */
259 	public boolean offer(T t) {
260 		// the current position will be just past the length bytes for this
261 		// item (length bytes will be 0 at the moment)
262 		int serializedLength = serializer.size();
263 		if (serializedLength == UNKNOWN_LENGTH) {
264 			return offerUnknownLength(t);
265 		} else {
266 			return offerKnownLength(t, serializedLength);
267 		}
268 	}
269 
270 	private boolean offerKnownLength(T t, int serializedLength) {
271 		try {
272 			if (notEnoughSpace(serializedLength)) {
273 				markFileAsCompletedAndClose();
274 				return false;
275 			}
276 			int position = write.position();
277 			// serialize the object t to the file
278 			serializer.serialize(output, t);
279 			int length = write.position() - position;
280 			checkLength(serializedLength, length);
281 			updateMarkers(serializedLength);
282 			return true;
283 		} catch (IOException e) {
284 			throw new RuntimeException(e);
285 		}
286 	}
287 
288 	private boolean offerUnknownLength(T t) {
289 		try {
290 			bytes.reset();
291 			// serialize to an in-memory buffer to calculate length
292 			serializer.serialize(buffer, t);
293 			int serializedLength = bytes.size();
294 			if (notEnoughSpace(serializedLength)) {
295 				markFileAsCompletedAndClose();
296 				return false;
297 			} else {
298 				write.put(bytes.toByteArrayNoCopy(), 0, bytes.size());
299 				updateMarkers(serializedLength);
300 				return true;
301 			}
302 		} catch (IOException e) {
303 			throw new RuntimeException(e);
304 		}
305 	}
306 
307 	private void checkLength(int serializedLength, int length) {
308 		if (length > serializedLength) {
309 			throw new IllegalArgumentException(
310 					"serialized length of value being offered to file queue was greater than serializer.size() value (which was non-zero)");
311 		}
312 	}
313 
314 	private void markFileAsCompletedAndClose() {
315 		write.position(write.position() - MARKER_HEADER_SIZE);
316 		synchronized (markerLock) {
317 			write.put(MARKER_END_OF_FILE);
318 		}
319 		closeForWrite();
320 	}
321 
322 	private boolean notEnoughSpace(int serializedLength) {
323 		if (serializedLength > fileSize - 2 * MARKER_HEADER_SIZE)
324 			throw new RuntimeException("serialized length is larger than can fit in one file");
325 		return serializedLength + MARKER_HEADER_SIZE > write.remaining();
326 	}
327 
328 	private void updateMarkers(int serializedLength) throws IOException {
329 		// write the marker for the next item
330 		write.put(MARKER_END_OF_QUEUE);
331 		// remember the position where the next write starts
332 		int newWritePosition = write.position();
333 		// rewind and update the length for the current item
334 		write.position(write.position() - serializedLength - 2 * MARKER_HEADER_SIZE);
335 		// now indicate to the reader that it can read this item
336 		synchronized (markerLock) {
337 			write.put(MARKER_ITEM_PRESENT);
338 		}
339 		// and update the position to the write position for the
340 		// next item
341 		write.position(newWritePosition);
342 	}
343 
344 	public void close() {
345 		try {
346 			f.close();
347 		} catch (IOException e) {
348 			throw new RuntimeException(e);
349 		}
350 	}
351 
352 }