1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.DataInputStream;
4 import java.io.DataOutput;
5 import java.io.DataOutputStream;
6 import java.io.File;
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.io.OutputStream;
10 import java.io.RandomAccessFile;
11 import java.nio.MappedByteBuffer;
12 import java.nio.channels.FileChannel;
13 import java.nio.channels.FileChannel.MapMode;
14 import java.util.concurrent.atomic.AtomicInteger;
15
16 import com.github.davidmoten.rx.buffertofile.DataSerializer;
17 import com.github.davidmoten.util.ByteArrayOutputStreamNoCopyUnsynchronized;
18 import com.github.davidmoten.util.Preconditions;
19
20 public class FileBasedSPSCQueueMemoryMappedReaderWriter<T> {
21
22 private volatile RandomAccessFile f;
23 private volatile FileChannel channel;
24 private DataInputStream input;
25 private DataOutputStream output;
26 private MappedByteBuffer read;
27 private MappedByteBuffer write;
28 private final DataSerializer<T> serializer;
29 private final File file;
30 private final int fileSize;
31 private final DataOutput buffer;
32
33 private final ByteArrayOutputStreamNoCopyUnsynchronized bytes;
34 private final AtomicInteger status = new AtomicInteger(WRITTEN_READ);
35 private final Object markerLock = new Object();
36
37 static final int WRITTEN_READ = 0;
38 static final int WRITTEN_READ_NOT_STARTED = 1;
39 static final int WRITTEN_READING = 2;
40 static final int WRITING_NOT_READING = 3;
41 static final int WRITING_READING = 4;
42
43 public FileBasedSPSCQueueMemoryMappedReaderWriter(File file, int fileSize, DataSerializer<T> serializer) {
44 Preconditions.checkArgument(serializer.size() == 0 || serializer.size() <= fileSize - 2 * MARKER_HEADER_SIZE,
45 "serializer.size() must be less than or equal to file based queue size - 2");
46 this.file = file;
47 this.serializer = serializer;
48 this.fileSize = fileSize;
49 this.bytes = new ByteArrayOutputStreamNoCopyUnsynchronized();
50 this.buffer = new DataOutputStream(bytes);
51 }
52
53 public FileBasedSPSCQueueMemoryMappedReaderWriter<T> openForRead() {
54
55
56 if (!status.compareAndSet(WRITTEN_READ_NOT_STARTED, WRITTEN_READING))
57 status.compareAndSet(WRITING_NOT_READING, WRITING_READING);
58 while (true) {
59 int st = status.get();
60 int newStatus;
61 if (st == WRITTEN_READ_NOT_STARTED)
62 newStatus = WRITTEN_READING;
63 else if (st == WRITING_NOT_READING)
64 newStatus = WRITING_READING;
65 else
66 newStatus = st;
67 if (status.compareAndSet(st, newStatus)) {
68 checkClose(newStatus);
69 break;
70 }
71 }
72 try {
73 if (f == null) {
74 f = new RandomAccessFile(file, "r");
75 }
76 if (channel == null) {
77 channel = f.getChannel();
78 }
79 read = channel.map(MapMode.READ_ONLY, 0, channel.size());
80 input = new DataInputStream(new MappedByteBufferInputStream(read));
81
82 } catch (IOException e) {
83 throw new RuntimeException(e);
84 }
85 return this;
86 }
87
88 public void closeForRead() {
89
90 while (true) {
91 int st = status.get();
92 int newStatus;
93 if (st == WRITTEN_READING)
94 newStatus = WRITTEN_READ;
95 else
96 newStatus = st;
97 if (status.compareAndSet(st, newStatus)) {
98 checkClose(newStatus);
99 break;
100 }
101 }
102 }
103
104 public FileBasedSPSCQueueMemoryMappedReaderWriter<T> openForWrite() {
105
106 while (true) {
107 int st = status.get();
108 int newStatus;
109 if (st == WRITTEN_READ)
110 newStatus = WRITING_NOT_READING;
111 else
112 newStatus = st;
113 if (status.compareAndSet(st, newStatus)) {
114 checkClose(newStatus);
115 break;
116 }
117 }
118 try {
119 if (f == null) {
120 f = new RandomAccessFile(file, "rw");
121 }
122 if (channel == null) {
123 channel = f.getChannel();
124 }
125 write = channel.map(MapMode.READ_WRITE, 0, fileSize);
126 output = new DataOutputStream(new MappedByteBufferOutputStream(write));
127 synchronized (markerLock) {
128 output.write(MARKER_END_OF_QUEUE);
129 }
130
131 return this;
132 } catch (IOException e) {
133 throw new RuntimeException(e);
134 }
135 }
136
137 public void closeForWrite() {
138
139 while (true) {
140 int st = status.get();
141 int newStatus;
142 if (st == WRITING_READING)
143 newStatus = WRITTEN_READING;
144 else if (st == WRITING_NOT_READING)
145 newStatus = WRITTEN_READ_NOT_STARTED;
146 newStatus = st;
147 if (status.compareAndSet(st, newStatus)) {
148 checkClose(newStatus);
149 break;
150 }
151 }
152
153 }
154
155 private void checkClose(int newStatus) {
156
157
158 if (newStatus == WRITTEN_READ) {
159 try {
160 channel.close();
161 channel = null;
162 read = null;
163 input = null;
164 f.close();
165 f = null;
166 } catch (IOException e) {
167 throw new RuntimeException(e);
168 }
169 }
170 }
171
172 private static class MappedByteBufferOutputStream extends OutputStream {
173
174 private final MappedByteBuffer write;
175
176 MappedByteBufferOutputStream(MappedByteBuffer write) {
177 this.write = write;
178 }
179
180 @Override
181 public void write(int b) throws IOException {
182 write.put((byte) b);
183 }
184 }
185
186 private static class MappedByteBufferInputStream extends InputStream {
187
188 private final MappedByteBuffer read;
189
190 MappedByteBufferInputStream(MappedByteBuffer read) {
191 this.read = read;
192 }
193
194 @Override
195 public int read() throws IOException {
196 return toUnsignedInteger(read.get());
197 }
198
199 }
200
201 private static int toUnsignedInteger(byte b) {
202 return b & 0x000000FF;
203 }
204
205 private static final EOFRuntimeException EOF = new EOFRuntimeException();
206
207 static final class EOFRuntimeException extends RuntimeException {
208
209 private static final long serialVersionUID = -6943467453336359472L;
210
211 }
212
213
214
215 static final byte MARKER_END_OF_QUEUE = 0;
216 static final byte MARKER_END_OF_FILE = 1;
217 static final byte MARKER_ITEM_PRESENT = 2;
218 static final int MARKER_HEADER_SIZE = 1;
219 static final int UNKNOWN_LENGTH = 0;
220
221 public T poll() {
222 int position = read.position();
223 byte marker;
224 synchronized (markerLock) {
225 marker = read.get();
226 }
227 if (marker == MARKER_END_OF_QUEUE) {
228 read.position(position);
229 return null;
230 } else if (marker == MARKER_END_OF_FILE) {
231 throw EOF;
232 } else if (marker == MARKER_ITEM_PRESENT) {
233 try {
234 T t = serializer.deserialize(input);
235 if (t == null) {
236
237
238
239 return NullSentinel.instance();
240 } else {
241 return t;
242 }
243 } catch (IOException e) {
244 throw new RuntimeException(e);
245 }
246 } else {
247 throw new RuntimeException("unexpected");
248 }
249 }
250
251
252
253
254
255
256
257
258
259 public boolean offer(T t) {
260
261
262 int serializedLength = serializer.size();
263 if (serializedLength == UNKNOWN_LENGTH) {
264 return offerUnknownLength(t);
265 } else {
266 return offerKnownLength(t, serializedLength);
267 }
268 }
269
270 private boolean offerKnownLength(T t, int serializedLength) {
271 try {
272 if (notEnoughSpace(serializedLength)) {
273 markFileAsCompletedAndClose();
274 return false;
275 }
276 int position = write.position();
277
278 serializer.serialize(output, t);
279 int length = write.position() - position;
280 checkLength(serializedLength, length);
281 updateMarkers(serializedLength);
282 return true;
283 } catch (IOException e) {
284 throw new RuntimeException(e);
285 }
286 }
287
288 private boolean offerUnknownLength(T t) {
289 try {
290 bytes.reset();
291
292 serializer.serialize(buffer, t);
293 int serializedLength = bytes.size();
294 if (notEnoughSpace(serializedLength)) {
295 markFileAsCompletedAndClose();
296 return false;
297 } else {
298 write.put(bytes.toByteArrayNoCopy(), 0, bytes.size());
299 updateMarkers(serializedLength);
300 return true;
301 }
302 } catch (IOException e) {
303 throw new RuntimeException(e);
304 }
305 }
306
307 private void checkLength(int serializedLength, int length) {
308 if (length > serializedLength) {
309 throw new IllegalArgumentException(
310 "serialized length of value being offered to file queue was greater than serializer.size() value (which was non-zero)");
311 }
312 }
313
314 private void markFileAsCompletedAndClose() {
315 write.position(write.position() - MARKER_HEADER_SIZE);
316 synchronized (markerLock) {
317 write.put(MARKER_END_OF_FILE);
318 }
319 closeForWrite();
320 }
321
322 private boolean notEnoughSpace(int serializedLength) {
323 if (serializedLength > fileSize - 2 * MARKER_HEADER_SIZE)
324 throw new RuntimeException("serialized length is larger than can fit in one file");
325 return serializedLength + MARKER_HEADER_SIZE > write.remaining();
326 }
327
328 private void updateMarkers(int serializedLength) throws IOException {
329
330 write.put(MARKER_END_OF_QUEUE);
331
332 int newWritePosition = write.position();
333
334 write.position(write.position() - serializedLength - 2 * MARKER_HEADER_SIZE);
335
336 synchronized (markerLock) {
337 write.put(MARKER_ITEM_PRESENT);
338 }
339
340
341 write.position(newWritePosition);
342 }
343
344 public void close() {
345 try {
346 f.close();
347 } catch (IOException e) {
348 throw new RuntimeException(e);
349 }
350 }
351
352 }