1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.DataInputStream;
4 import java.io.DataOutputStream;
5 import java.io.EOFException;
6 import java.io.File;
7 import java.io.FileNotFoundException;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.io.RandomAccessFile;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.concurrent.atomic.AtomicLong;
15
16 import com.github.davidmoten.rx.buffertofile.DataSerializer;
17 import com.github.davidmoten.util.Preconditions;
18
19 class FileBasedSPSCQueue<T> implements QueueWithResources<T> {
20
21 final File file;
22 final DataSerializer<T> serializer;
23 final AtomicLong size;
24 final byte[] writeBuffer;
25 final byte[] readBuffer;
26 final Object writeLock = new Object();
27 private final Object accessLock = new Object();
28 private final DataOutputStream output;
29 private final DataInputStream input;
30
31
32
33 int readBufferPosition = 0;
34 long readPosition = 0;
35 int readBufferLength = 0;
36 volatile long writePosition;
37 volatile int writeBufferPosition;
38
39 private FileAccessor accessor;
40 private volatile boolean unsubscribed = false;
41
42 FileBasedSPSCQueue(int bufferSizeBytes, File file, DataSerializer<T> serializer) {
43 Preconditions.checkArgument(bufferSizeBytes > 0, "bufferSizeBytes must be greater than zero");
44 Preconditions.checkNotNull(file);
45 Preconditions.checkNotNull(serializer);
46 this.readBuffer = new byte[bufferSizeBytes];
47 this.writeBuffer = new byte[bufferSizeBytes];
48 try {
49 file.getParentFile().mkdirs();
50 file.createNewFile();
51 this.file = file;
52 } catch (IOException e) {
53 throw new RuntimeException(e);
54 }
55 this.accessor = new FileAccessor(file);
56 this.serializer = serializer;
57 this.size = new AtomicLong(0);
58 this.output = new DataOutputStream(new QueueWriter());
59 this.input = new DataInputStream(new QueueReader());
60 }
61
62 private final static class FileAccessor {
63 final RandomAccessFile fWrite;
64 final RandomAccessFile fRead;
65
66 FileAccessor(File file) {
67 try {
68 this.fWrite = new RandomAccessFile(file, "rw");
69 this.fRead = new RandomAccessFile(file, "r");
70 } catch (FileNotFoundException e) {
71 throw new RuntimeException(e);
72 }
73 }
74
75 public void close() {
76 try {
77 fWrite.close();
78 fRead.close();
79 } catch (IOException e) {
80 throw new RuntimeException(e);
81 }
82 }
83 }
84
85 private final class QueueWriter extends OutputStream {
86
87 @Override
88 public void write(int b) throws IOException {
89
90 int wbp = writeBufferPosition;
91 if (wbp < writeBuffer.length) {
92 writeBuffer[wbp] = (byte) b;
93 writeBufferPosition = wbp + 1;
94 } else
95 synchronized (writeLock) {
96
97 long wp = writePosition;
98 accessor.fWrite.seek(wp);
99 accessor.fWrite.write(writeBuffer);
100 writeBuffer[0] = (byte) b;
101 writeBufferPosition = 1;
102 writePosition = wp + writeBuffer.length;
103 }
104 }
105 }
106
107
108 private static final EOFException EOF = new EOFException();
109
110 private final class QueueReader extends InputStream {
111
112 @Override
113 public int read() throws IOException {
114 if (size.get() == 0) {
115 throw EOF;
116 } else {
117 if (readBufferPosition < readBufferLength) {
118 byte b = readBuffer[readBufferPosition];
119 readBufferPosition++;
120 return toUnsignedInteger(b);
121 } else {
122
123
124
125 while (true) {
126 long wp;
127 int wbp;
128 synchronized (writeLock) {
129 wp = writePosition;
130 wbp = writeBufferPosition;
131 }
132 long over = wp - readPosition;
133 if (over > 0) {
134
135 readBufferLength = (int) Math.min(readBuffer.length, over);
136 synchronized (accessLock) {
137 if (accessor == null) {
138 accessor = new FileAccessor(file);
139 }
140 accessor.fRead.seek(readPosition);
141 accessor.fRead.read(readBuffer, 0, readBufferLength);
142 }
143 readPosition += readBufferLength;
144 readBufferPosition = 1;
145 return toUnsignedInteger(readBuffer[0]);
146 } else {
147
148 int index = -(int) over;
149 if (index >= writeBuffer.length) {
150 throw EOF;
151 } else {
152 int b = toUnsignedInteger(writeBuffer[index]);
153 final boolean writeBufferUnchanged;
154 synchronized (writeLock) {
155 writeBufferUnchanged = wp == writePosition && wbp == writeBufferPosition;
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 }
172 if (writeBufferUnchanged) {
173 readPosition++;
174 return b;
175 }
176 }
177 }
178 }
179 }
180 }
181 }
182 }
183
184 private static int toUnsignedInteger(byte b) {
185 return b & 0x000000FF;
186 }
187
188 @Override
189 public void unsubscribe() {
190
191 if (unsubscribed) {
192 return;
193 }
194 unsubscribed = true;
195 synchronized (accessLock) {
196 if (accessor != null) {
197 accessor.close();
198 accessor = null;
199 }
200 size.set(0);
201 }
202 if (!file.delete()) {
203 throw new RuntimeException("could not delete file " + file);
204 }
205 }
206
207 @Override
208 public boolean isUnsubscribed() {
209 return unsubscribed;
210 }
211
212 @Override
213 public boolean offer(T t) {
214
215
216
217 try {
218 serializer.serialize(output, t);
219 size.incrementAndGet();
220 return true;
221 } catch (IOException e) {
222 throw new RuntimeException(e);
223 }
224 }
225
226 @Override
227 public T poll() {
228
229
230
231 try {
232 T t = serializer.deserialize(input);
233 size.decrementAndGet();
234 if (t == null) {
235
236
237
238 return NullSentinel.instance();
239 } else {
240 return t;
241 }
242 } catch (EOFException e) {
243 return null;
244 } catch (IOException e) {
245 throw new RuntimeException(e);
246 }
247 }
248
249 @Override
250 public boolean isEmpty() {
251 return size.get() == 0;
252 }
253
254 @Override
255 public void freeResources() {
256 synchronized (accessLock) {
257 if (accessor != null) {
258 accessor.close();
259 }
260 accessor = null;
261 }
262 }
263
264 @Override
265 public long resourcesSize() {
266 return writePosition;
267 }
268
269 @Override
270 public T element() {
271 throw new UnsupportedOperationException();
272 }
273
274 @Override
275 public T peek() {
276 throw new UnsupportedOperationException();
277 }
278
279 @Override
280 public int size() {
281 throw new UnsupportedOperationException();
282 }
283
284 @Override
285 public boolean add(T e) {
286 throw new UnsupportedOperationException();
287 }
288
289 @Override
290 public T remove() {
291 throw new UnsupportedOperationException();
292 }
293
294 @Override
295 public boolean contains(Object o) {
296 throw new UnsupportedOperationException();
297 }
298
299 @Override
300 public Iterator<T> iterator() {
301 throw new UnsupportedOperationException();
302 }
303
304 @Override
305 public Object[] toArray() {
306 throw new UnsupportedOperationException();
307 }
308
309 @SuppressWarnings("hiding")
310 @Override
311 public <T> T[] toArray(T[] a) {
312 throw new UnsupportedOperationException();
313 }
314
315 @Override
316 public boolean remove(Object o) {
317 throw new UnsupportedOperationException();
318 }
319
320 @Override
321 public boolean containsAll(Collection<?> c) {
322 throw new UnsupportedOperationException();
323 }
324
325 @Override
326 public boolean addAll(Collection<? extends T> c) {
327 throw new UnsupportedOperationException();
328 }
329
330 @Override
331 public boolean removeAll(Collection<?> c) {
332 throw new UnsupportedOperationException();
333 }
334
335 @Override
336 public boolean retainAll(Collection<?> c) {
337 throw new UnsupportedOperationException();
338 }
339
340 @Override
341 public void clear() {
342 throw new UnsupportedOperationException();
343 }
344
345 }