1 package com.github.davidmoten.rx2.internal.flowable.buffertofile;
2
3 import java.io.File;
4 import java.util.concurrent.Callable;
5
6 import com.github.davidmoten.guavamini.Preconditions;
7
8 import io.reactivex.internal.fuseable.SimplePlainQueue;
9 import io.reactivex.internal.queue.SpscLinkedArrayQueue;
10
11 public final class Pages {
12
13 private static final boolean CHECK = false;
14
15 private static final int QUEUE_INITIAL_CAPACITY = 16;
16 private static final byte[] EMPTY = new byte[0];
17
18 private final Callable<File> fileFactory;
19 private final int pageSize;
20
21
22 private final SimplePlainQueue<Page> queue = new SpscLinkedArrayQueue<Page>(QUEUE_INITIAL_CAPACITY);
23
24 Page writePage;
25 int writePosition;
26
27 Page readPage;
28 int readPosition;
29
30 Page markPage;
31 int markPosition;
32
33 public Pages(Callable<File> fileFactory, int pageSize) {
34 Preconditions.checkArgument(pageSize >= 4);
35 Preconditions.checkArgument(pageSize % 4 == 0);
36 this.fileFactory = fileFactory;
37 this.pageSize = pageSize;
38 }
39
40 public int avail() {
41 return writePage().avail(writePosition);
42 }
43
44 public void markForRewriteAndAdvance4Bytes() {
45 markPage = writePage();
46 markPosition = writePosition;
47 writePosition += 4;
48
49 }
50
51 public void putInt(int value) {
52 putInt(writePage(), value);
53 }
54
55 private void putInt(Page page, int value) {
56 if (CHECK) {
57 int avail = page.length() - writePosition;
58 if (avail < 0)
59 throw new RuntimeException("unexpected");
60 }
61 page.putInt(writePosition, value);
62 writePosition += 4;
63 }
64
65 public void put(byte[] bytes, int offset, int length) {
66 Page page = writePage();
67 if (CHECK) {
68 if (length == 0)
69 throw new IllegalArgumentException();
70 int avail = page.length() - writePosition;
71 if (avail < 0)
72 throw new RuntimeException("unexpected");
73 }
74 page.put(writePosition, bytes, offset, length);
75 writePosition += length;
76 }
77
78 public void putIntOrderedAtRewriteMark(int value) {
79
80
81 markPage.putIntOrdered(markPosition, value);
82 markPage = null;
83 }
84
85 private Page writePage() {
86 if (writePage == null || writePosition == pageSize) {
87 createNewPage();
88 }
89 return writePage;
90 }
91
92 private void createNewPage() {
93 File file;
94 try {
95 file = fileFactory.call();
96 } catch (Exception e) {
97 throw new RuntimeException(e);
98 }
99 writePage = new Page(file, pageSize);
100 writePosition = 0;
101 queue.offer(writePage);
102
103
104
105 }
106
107 public int getInt() {
108 if (readPage() == null) {
109 return -1;
110 }
111 int rp = readPosition;
112 if (CHECK) {
113 int avail = readPage.length() - rp;
114 if (avail < 4)
115 throw new RuntimeException("unexpected");
116 }
117 readPosition = rp + 4;
118 return readPage.getInt(rp);
119 }
120
121 public byte[] get(int length) {
122 byte[] result = new byte[length];
123 if (readPage() == null) {
124 return EMPTY;
125 }
126 if (CHECK) {
127 int avail = readPage.length() - readPosition;
128 if (avail < length)
129 throw new RuntimeException("unexpected");
130 }
131 readPage.get(result, 0, readPosition, length);
132 readPosition += length;
133 return result;
134 }
135
136 private Page readPage() {
137 if (readPage == null || readPosition >= pageSize) {
138 if (readPage != null) {
139 readPage.close();
140 }
141 readPage = queue.poll();
142 readPosition = readPosition % pageSize;
143 }
144 return readPage;
145 }
146
147 public void putByte(byte b) {
148 Page page = writePage();
149 if (CHECK) {
150 int avail = page.length() - writePosition;
151 if (avail < 0)
152 throw new RuntimeException("unexpected");
153 }
154 page.putByte(writePosition, b);
155 writePosition += 1;
156 }
157
158 public byte getByte() {
159 Page page = readPage();
160 if (CHECK) {
161 int avail = page.length() - readPosition;
162 if (avail < 1)
163 throw new RuntimeException("unexpected");
164 }
165 byte result = page.getByte(readPosition);
166 readPosition += 1;
167 return result;
168 }
169
170 public void moveReadPosition(int forward) {
171 readPosition += forward;
172 }
173
174 public int getIntVolatile() {
175 if (readPage() == null) {
176 return -1;
177 } else {
178 int result = readPage.getIntVolatile(readPosition);
179 readPosition += 4;
180 return result;
181 }
182 }
183
184 public void moveWritePosition(int forward) {
185 writePosition += forward;
186 }
187
188 public void close() {
189
190 if (readPage != null) {
191 readPage.close();
192 readPage = null;
193 }
194 Page page;
195 while ((page = queue.poll()) != null) {
196 page.close();
197 }
198 }
199
200 }