1 package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2
3 import java.io.File;
4 import java.nio.ByteOrder;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.atomic.AtomicInteger;
7
8 import com.github.davidmoten.guavamini.Preconditions;
9
10 @SuppressWarnings("serial")
11
12 public class PagedQueue extends AtomicInteger {
13
14 private static final boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
15
16 private static final int EXTRA_PADDING_LIMIT = 64;
17 private static final int SIZE_MESSAGE_SIZE_FIELD = 4;
18 private static final int SIZE_PADDING_SIZE_FIELD = 1;
19 private static final int SIZE_MESSAGE_TYPE_FIELD = 1;
20 private static final int ALIGN_BYTES = 4;
21 private static final int MAX_PADDING_PER_FULL_MESSAGE = 32;
22 private static final int SIZE_HEADER_PRIMARY_PART = SIZE_MESSAGE_SIZE_FIELD + SIZE_MESSAGE_TYPE_FIELD
23 + SIZE_PADDING_SIZE_FIELD;
24
25 private final Pages pages;
26
27 private boolean readingFragments;
28 private byte[] messageBytesAccumulated;
29 private int indexBytesAccumulated;
30
31 public PagedQueue(Callable<File> fileFactory, int pageSize) {
32 this.pages = new Pages(fileFactory, pageSize);
33 }
34
35 public void offer(byte[] bytes) {
36 if (getAndIncrement() != 0) {
37 return;
38 }
39 try {
40 int padding = padding(bytes.length);
41 int avail = pages.avail();
42 int fullMessageSize = fullMessageSize(bytes.length, padding);
43
44 int availAfter = avail - fullMessageSize;
45
46 if (availAfter >= 0) {
47 if (availAfter <= MAX_PADDING_PER_FULL_MESSAGE) {
48 padding += availAfter;
49 }
50 writeFullMessage(bytes, padding);
51 } else {
52 writeFragments(bytes, avail);
53 }
54 } finally {
55 decrementAndGet();
56 }
57 }
58
59 private void writeFullMessage(byte[] bytes, int padding) {
60 write(bytes, 0, bytes.length, padding, MessageType.FULL_MESSAGE, bytes.length);
61 }
62
63 private void writeFragments(byte[] bytes, int avail) {
64 int start = 0;
65 int length = bytes.length;
66 do {
67 int extraHeaderBytes = start == 0 ? 4 : 0;
68 int count = Math.min(avail - 8 - extraHeaderBytes, length);
69 int padding = padding(count);
70 int remaining = Math.max(0, avail - count - 6 - padding - extraHeaderBytes);
71 if (remaining <= EXTRA_PADDING_LIMIT)
72 padding += remaining;
73
74
75
76 write(bytes, start, count, padding, MessageType.FRAGMENT, bytes.length);
77 start += count;
78 length -= count;
79 if (length > 0) {
80 avail = pages.avail();
81 }
82 } while (length > 0);
83 }
84
85 private int fullMessageSize(int payloadLength, int padding) {
86 return SIZE_HEADER_PRIMARY_PART + padding + payloadLength;
87 }
88
89 private static int padding(int payloadLength) {
90 int rem = (payloadLength + SIZE_PADDING_SIZE_FIELD + SIZE_MESSAGE_TYPE_FIELD) % ALIGN_BYTES;
91 int padding;
92 if (rem == 0) {
93 padding = 0;
94 } else {
95 padding = ALIGN_BYTES - rem;
96 }
97 return padding;
98 }
99
100 private void write(byte[] bytes, int offset, int length, int padding, final MessageType messageType,
101 int totalLength) {
102 Preconditions.checkArgument(length != 0);
103 pages.markForRewriteAndAdvance4Bytes();
104
105
106
107 if (padding == 2 && isLittleEndian) {
108 pages.putInt(((messageType.value() & 0xFF) << 0) | (((byte) padding)) << 8);
109 } else {
110 pages.putByte(messageType.value());
111 pages.putByte((byte) padding);
112 if (padding > 0) {
113 pages.moveWritePosition(padding);
114 }
115 }
116 if (messageType == MessageType.FRAGMENT && offset == 0) {
117
118 pages.putInt(totalLength);
119 }
120 pages.put(bytes, offset, length);
121
122
123 pages.putIntOrderedAtRewriteMark(length);
124 }
125
126 public byte[] poll() {
127
128 while (true) {
129 int length = pages.getIntVolatile();
130 if (length == 0) {
131
132 pages.moveReadPosition(-4);
133 return null;
134 } else if (length == -1) {
135
136
137 return null;
138 } else {
139 MessageType messageType;
140 byte padding;
141 if (length % 4 == 0 && isLittleEndian) {
142
143 int i = pages.getInt();
144 messageType = MessageType.from((byte) i);
145 padding = (byte) ((i >> 8) & 0xFF);
146 if (padding > 2) {
147 pages.moveReadPosition(padding - 2);
148 }
149 } else {
150
151 messageType = MessageType.from(pages.getByte());
152 padding = pages.getByte();
153 if (padding > 0) {
154 pages.moveReadPosition(padding);
155 }
156 }
157 if (!readingFragments && messageType == MessageType.FRAGMENT) {
158
159 int lengthRemaining = pages.getInt();
160 if (messageBytesAccumulated == null) {
161 messageBytesAccumulated = new byte[lengthRemaining];
162 indexBytesAccumulated = 0;
163 }
164 readingFragments = true;
165 }
166 byte[] result = pages.get(length);
167 if (result.length == 0) {
168 return null;
169 } else {
170 if (readingFragments) {
171 System.arraycopy(result, 0, messageBytesAccumulated, indexBytesAccumulated, result.length);
172 indexBytesAccumulated += result.length;
173 if (indexBytesAccumulated == messageBytesAccumulated.length) {
174 readingFragments = false;
175 byte[] b = messageBytesAccumulated;
176 messageBytesAccumulated = null;
177 return b;
178 }
179 } else {
180 return result;
181 }
182 }
183 }
184 }
185 }
186
187 private void closeWrite() {
188 incrementAndGet();
189 while (!compareAndSet(1, 2))
190 ;
191 }
192
193 public void close() {
194
195
196 closeWrite();
197 pages.close();
198 messageBytesAccumulated = null;
199 }
200
201 private static enum MessageType {
202
203 FULL_MESSAGE(0), FRAGMENT(1);
204
205 private final byte value;
206
207 private MessageType(int value) {
208 this.value = (byte) value;
209 }
210
211 byte value() {
212 return value;
213 }
214
215 static MessageType from(byte b) {
216 if (b == 0)
217 return MessageType.FULL_MESSAGE;
218 else if (b == 1)
219 return MessageType.FRAGMENT;
220 else
221 throw new RuntimeException("unexpected");
222 }
223 }
224
225 }