View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2   
3   import java.io.File;
4   import java.util.concurrent.Callable;
5   
6   import com.github.davidmoten.guavamini.Preconditions;
7   
8   import io.reactivex.internal.fuseable.SimplePlainQueue;
9   import io.reactivex.internal.queue.SpscLinkedArrayQueue;
10  
11  public final class Pages {
12  
13      private static final boolean CHECK = false;
14  
15      private static final int QUEUE_INITIAL_CAPACITY = 16;
16      private static final byte[] EMPTY = new byte[0];
17  
18      private final Callable<File> fileFactory;
19      private final int pageSize;
20  
21      // read queue must be SPSC because is added to from the write thread
22      private final SimplePlainQueue<Page> queue = new SpscLinkedArrayQueue<Page>(QUEUE_INITIAL_CAPACITY);
23  
24      Page writePage;
25      int writePosition;
26  
27      Page readPage;
28      int readPosition;
29  
30      Page markPage;
31      int markPosition;
32  
33      public Pages(Callable<File> fileFactory, int pageSize) {
34          Preconditions.checkArgument(pageSize >= 4);
35          Preconditions.checkArgument(pageSize % 4 == 0);
36          this.fileFactory = fileFactory;
37          this.pageSize = pageSize;
38      }
39  
40      public int avail() {
41          return writePage().avail(writePosition);
42      }
43  
44      public void markForRewriteAndAdvance4Bytes() {
45          markPage = writePage();
46          markPosition = writePosition;
47          writePosition += 4;
48          // putInt(markPage, 0);
49      }
50  
51      public void putInt(int value) {
52          putInt(writePage(), value);
53      }
54  
55      private void putInt(Page page, int value) {
56          if (CHECK) {
57              int avail = page.length() - writePosition;
58              if (avail < 0)
59                  throw new RuntimeException("unexpected");
60          }
61          page.putInt(writePosition, value);
62          writePosition += 4;
63      }
64  
65      public void put(byte[] bytes, int offset, int length) {
66          Page page = writePage();
67          if (CHECK) {
68              if (length == 0)
69                  throw new IllegalArgumentException();
70              int avail = page.length() - writePosition;
71              if (avail < 0)
72                  throw new RuntimeException("unexpected");
73          }
74          page.put(writePosition, bytes, offset, length);
75          writePosition += length;
76      }
77  
78      public void putIntOrderedAtRewriteMark(int value) {
79          // if there is any space at all in current page then it will be enough
80          // for 4 bytes because we pad all offerings to the queue
81          markPage.putIntOrdered(markPosition, value);
82          markPage = null;
83      }
84  
85      private Page writePage() {
86          if (writePage == null || writePosition == pageSize) {
87              createNewPage();
88          }
89          return writePage;
90      }
91  
92      private void createNewPage() {
93          File file;
94          try {
95              file = fileFactory.call();
96          } catch (Exception e) {
97              throw new RuntimeException(e);
98          }
99          writePage = new Page(file, pageSize);
100         writePosition = 0;
101         queue.offer(writePage);
102         // System.out.println(Thread.currentThread().getName() + ": created
103         // page "
104         // + currentWritePage.hashCode());
105     }
106 
107     public int getInt() {
108         if (readPage() == null) {
109             return -1;
110         }
111         int rp = readPosition;
112         if (CHECK) {
113             int avail = readPage.length() - rp;
114             if (avail < 4)
115                 throw new RuntimeException("unexpected");
116         }
117         readPosition = rp + 4;
118         return readPage.getInt(rp);
119     }
120 
121     public byte[] get(int length) {
122         byte[] result = new byte[length];
123         if (readPage() == null) {
124             return EMPTY;
125         }
126         if (CHECK) {
127             int avail = readPage.length() - readPosition;
128             if (avail < length)
129                 throw new RuntimeException("unexpected");
130         }
131         readPage.get(result, 0, readPosition, length);
132         readPosition += length;
133         return result;
134     }
135 
136     private Page readPage() {
137         if (readPage == null || readPosition >= pageSize) {
138             if (readPage != null) {
139                 readPage.close();
140             }
141             readPage = queue.poll();
142             readPosition = readPosition % pageSize;
143         }
144         return readPage;
145     }
146 
147     public void putByte(byte b) {
148         Page page = writePage();
149         if (CHECK) {
150             int avail = page.length() - writePosition;
151             if (avail < 0)
152                 throw new RuntimeException("unexpected");
153         }
154         page.putByte(writePosition, b);
155         writePosition += 1;
156     }
157 
158     public byte getByte() {
159         Page page = readPage();
160         if (CHECK) {
161             int avail = page.length() - readPosition;
162             if (avail < 1)
163                 throw new RuntimeException("unexpected");
164         }
165         byte result = page.getByte(readPosition);
166         readPosition += 1;
167         return result;
168     }
169 
170     public void moveReadPosition(int forward) {
171         readPosition += forward;
172     }
173 
174     public int getIntVolatile() {
175         if (readPage() == null) {
176             return -1;
177         } else {
178             int result = readPage.getIntVolatile(readPosition);
179             readPosition += 4;
180             return result;
181         }
182     }
183 
184     public void moveWritePosition(int forward) {
185         writePosition += forward;
186     }
187 
188     public void close() {
189         // called from read thread
190         if (readPage != null) {
191             readPage.close();
192             readPage = null;
193         }
194         Page page;
195         while ((page = queue.poll()) != null) {
196             page.close();
197         }
198     }
199 
200 }