View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2   
3   import java.io.File;
4   import java.nio.ByteOrder;
5   import java.util.concurrent.Callable;
6   import java.util.concurrent.atomic.AtomicInteger;
7   
8   import com.github.davidmoten.guavamini.Preconditions;
9   
10  @SuppressWarnings("serial")
11  // non-final class for unit testing
12  public class PagedQueue extends AtomicInteger {
13  
14      private static final boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
15  
16      private static final int EXTRA_PADDING_LIMIT = 64;
17      private static final int SIZE_MESSAGE_SIZE_FIELD = 4;
18      private static final int SIZE_PADDING_SIZE_FIELD = 1;
19      private static final int SIZE_MESSAGE_TYPE_FIELD = 1;
20      private static final int ALIGN_BYTES = 4;
21      private static final int MAX_PADDING_PER_FULL_MESSAGE = 32;
22      private static final int SIZE_HEADER_PRIMARY_PART = SIZE_MESSAGE_SIZE_FIELD + SIZE_MESSAGE_TYPE_FIELD
23              + SIZE_PADDING_SIZE_FIELD;
24  
25      private final Pages pages;
26  
27      private boolean readingFragments;
28      private byte[] messageBytesAccumulated;
29      private int indexBytesAccumulated;
30  
31      public PagedQueue(Callable<File> fileFactory, int pageSize) {
32          this.pages = new Pages(fileFactory, pageSize);
33      }
34  
35      public void offer(byte[] bytes) {
36          if (getAndIncrement() != 0) {
37              return;
38          }
39          try {
40              int padding = padding(bytes.length);
41              int avail = pages.avail();
42              int fullMessageSize = fullMessageSize(bytes.length, padding);
43              // header plus 4 bytes
44              int availAfter = avail - fullMessageSize;
45  
46              if (availAfter >= 0) {
47                  if (availAfter <= MAX_PADDING_PER_FULL_MESSAGE) {
48                      padding += availAfter;
49                  }
50                  writeFullMessage(bytes, padding);
51              } else {
52                  writeFragments(bytes, avail);
53              }
54          } finally {
55              decrementAndGet();
56          }
57      }
58  
59      private void writeFullMessage(byte[] bytes, int padding) {
60          write(bytes, 0, bytes.length, padding, MessageType.FULL_MESSAGE, bytes.length);
61      }
62  
63      private void writeFragments(byte[] bytes, int avail) {
64          int start = 0;
65          int length = bytes.length;
66          do {
67              int extraHeaderBytes = start == 0 ? 4 : 0;
68              int count = Math.min(avail - 8 - extraHeaderBytes, length);
69              int padding = padding(count);
70              int remaining = Math.max(0, avail - count - 6 - padding - extraHeaderBytes);
71              if (remaining <= EXTRA_PADDING_LIMIT)
72                  padding += remaining;
73              // System.out.println(String.format(
74              // "length=%s,start=%s,count=%s,padding=%s,remaining=%s,extraHeaderBytes=%s",
75              // length, start, count, padding, remaining, extraHeaderBytes));
76              write(bytes, start, count, padding, MessageType.FRAGMENT, bytes.length);
77              start += count;
78              length -= count;
79              if (length > 0) {
80                  avail = pages.avail();
81              }
82          } while (length > 0);
83      }
84  
85      private int fullMessageSize(int payloadLength, int padding) {
86          return SIZE_HEADER_PRIMARY_PART + padding + payloadLength;
87      }
88  
89      private static int padding(int payloadLength) {
90          int rem = (payloadLength + SIZE_PADDING_SIZE_FIELD + SIZE_MESSAGE_TYPE_FIELD) % ALIGN_BYTES;
91          int padding;
92          if (rem == 0) {
93              padding = 0;
94          } else {
95              padding = ALIGN_BYTES - rem;
96          }
97          return padding;
98      }
99  
100     private void write(byte[] bytes, int offset, int length, int padding, final MessageType messageType,
101             int totalLength) {
102         Preconditions.checkArgument(length != 0);
103         pages.markForRewriteAndAdvance4Bytes();// messageSize left as 0
104         // storeFence not required at this point like Aeron uses.
105         // UnsafeAccess.unsafe().storeFence();
106         // TODO optimize for BigEndian as well
107         if (padding == 2 && isLittleEndian) {
108             pages.putInt(((messageType.value() & 0xFF) << 0) | (((byte) padding)) << 8);
109         } else {
110             pages.putByte(messageType.value()); // message type
111             pages.putByte((byte) padding);
112             if (padding > 0) {
113                 pages.moveWritePosition(padding);
114             }
115         }
116         if (messageType == MessageType.FRAGMENT && offset == 0) {
117             // first fragment only of a sequence of fragments
118             pages.putInt(totalLength);
119         }
120         pages.put(bytes, offset, length);
121         // now that the message bytes are written we can set the length field in
122         // the header to indicate that the message is ready to be read
123         pages.putIntOrderedAtRewriteMark(length);
124     }
125 
126     public byte[] poll() {
127         // loop here accumulating fragments if necessary
128         while (true) {
129             int length = pages.getIntVolatile();
130             if (length == 0) {
131                 // not ready for read
132                 pages.moveReadPosition(-4);
133                 return null;
134             } else if (length == -1) {
135                 // at end of read queue
136                 // System.out.println("at end of read queue");
137                 return null;
138             } else {
139                 MessageType messageType;
140                 byte padding;
141                 if (length % 4 == 0 && isLittleEndian) {
142                     // read message type and padding in one int read
143                     int i = pages.getInt();
144                     messageType = MessageType.from((byte) i);
145                     padding = (byte) ((i >> 8) & 0xFF);
146                     if (padding > 2) {
147                         pages.moveReadPosition(padding - 2);
148                     }
149                 } else {
150                     // read message type and padding separately
151                     messageType = MessageType.from(pages.getByte());
152                     padding = pages.getByte();
153                     if (padding > 0) {
154                         pages.moveReadPosition(padding);
155                     }
156                 }
157                 if (!readingFragments && messageType == MessageType.FRAGMENT) {
158                     // is first fragment
159                     int lengthRemaining = pages.getInt();
160                     if (messageBytesAccumulated == null) {
161                         messageBytesAccumulated = new byte[lengthRemaining];
162                         indexBytesAccumulated = 0;
163                     }
164                     readingFragments = true;
165                 }
166                 byte[] result = pages.get(length);
167                 if (result.length == 0) {
168                     return null;
169                 } else {
170                     if (readingFragments) {
171                         System.arraycopy(result, 0, messageBytesAccumulated, indexBytesAccumulated, result.length);
172                         indexBytesAccumulated += result.length;
173                         if (indexBytesAccumulated == messageBytesAccumulated.length) {
174                             readingFragments = false;
175                             byte[] b = messageBytesAccumulated;
176                             messageBytesAccumulated = null;
177                             return b;
178                         }
179                     } else {
180                         return result;
181                     }
182                 }
183             }
184         }
185     }
186 
187     private void closeWrite() {
188         incrementAndGet();
189         while (!compareAndSet(1, 2))
190             ;
191     }
192 
193     public void close() {
194         // to get to here no more reads will happen because close is called from
195         // the drain loop
196         closeWrite();
197         pages.close();
198         messageBytesAccumulated = null;
199     }
200 
201     private static enum MessageType {
202 
203         FULL_MESSAGE(0), FRAGMENT(1);
204 
205         private final byte value;
206 
207         private MessageType(int value) {
208             this.value = (byte) value;
209         }
210 
211         byte value() {
212             return value;
213         }
214 
215         static MessageType from(byte b) {
216             if (b == 0)
217                 return MessageType.FULL_MESSAGE;
218             else if (b == 1)
219                 return MessageType.FRAGMENT;
220             else
221                 throw new RuntimeException("unexpected");
222         }
223     }
224 
225 }