1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.File;
4 import java.util.ArrayDeque;
5 import java.util.Collection;
6 import java.util.Deque;
7 import java.util.Iterator;
8 import java.util.LinkedList;
9 import java.util.Queue;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12
13 import com.github.davidmoten.rx.buffertofile.DataSerializer;
14 import com.github.davidmoten.rx.internal.operators.FileBasedSPSCQueueMemoryMappedReaderWriter.EOFRuntimeException;
15 import com.github.davidmoten.util.Preconditions;
16
17 import rx.functions.Func0;
18
19 public final class FileBasedSPSCQueueMemoryMapped<T> implements QueueWithSubscription<T> {
20
21 private final Queue<FileBasedSPSCQueueMemoryMappedReaderWriter<T>> inactive = new LinkedList<FileBasedSPSCQueueMemoryMappedReaderWriter<T>>();
22 private final Deque<FileBasedSPSCQueueMemoryMappedReaderWriter<T>> toRead = new ArrayDeque<FileBasedSPSCQueueMemoryMappedReaderWriter<T>>();
23 private final Object lock = new Object();
24 private final Func0<File> factory;
25 private final int size;
26
27 private FileBasedSPSCQueueMemoryMappedReaderWriter<T> reader;
28
29 private FileBasedSPSCQueueMemoryMappedReaderWriter<T> writer;
30 private final AtomicInteger wip = new AtomicInteger();
31 private volatile boolean unsubscribed = false;
32 private final AtomicLong count = new AtomicLong();
33
34 private final DataSerializer<T> serializer;
35
36 public FileBasedSPSCQueueMemoryMapped(Func0<File> factory, int size,
37 DataSerializer<T> serializer) {
38 Preconditions.checkNotNull(factory);
39 Preconditions.checkNotNull(serializer);
40 this.factory = factory;
41 this.size = size;
42 this.serializer = serializer;
43 File file = factory.call();
44 this.writer = new FileBasedSPSCQueueMemoryMappedReaderWriter<T>(file, size, serializer);
45 this.reader = writer.openForWrite().openForRead();
46
47 wip.lazySet(0);
48 }
49
50 @Override
51 public void unsubscribe() {
52 wip.incrementAndGet();
53 unsubscribed = true;
54 checkUnsubscribe();
55 }
56
57 @Override
58 public boolean isUnsubscribed() {
59 return unsubscribed;
60 }
61
62 @Override
63 public synchronized boolean offer(T t) {
64
65 try {
66 wip.incrementAndGet();
67 if (unsubscribed)
68 return true;
69 if (!writer.offer(t)) {
70
71
72 FileBasedSPSCQueueMemoryMappedReaderWriter<T> nextWriter;
73 synchronized (lock) {
74 nextWriter = inactive.poll();
75 if (nextWriter == null) {
76 nextWriter = new FileBasedSPSCQueueMemoryMappedReaderWriter<T>(
77 factory.call(), size, serializer);
78 }
79 toRead.offerLast(nextWriter);
80 nextWriter.openForWrite();
81 }
82 writer = nextWriter;
83 return writer.offer(t);
84 } else {
85 return true;
86 }
87 } finally {
88 checkUnsubscribe();
89 count.incrementAndGet();
90 }
91 }
92
93 private void checkUnsubscribe() {
94
95 if (unsubscribed & wip.decrementAndGet() == 0) {
96 close();
97 }
98 }
99
100 private void close() {
101 writer.close();
102 reader.close();
103 }
104
105 @Override
106 public synchronized T poll() {
107 T value = null;
108
109 try {
110 wip.incrementAndGet();
111 if (unsubscribed)
112 return null;
113 value = reader.poll();
114 return value;
115 } catch (EOFRuntimeException e) {
116 FileBasedSPSCQueueMemoryMappedReaderWriter<T> nextReader;
117 synchronized (lock) {
118 if (toRead.isEmpty()) {
119 return null;
120 } else {
121 nextReader = toRead.pollFirst();
122 }
123 reader.closeForRead();
124 inactive.offer(reader);
125 }
126 reader = nextReader;
127 reader.openForRead();
128 value = reader.poll();
129 return value;
130 } finally {
131 checkUnsubscribe();
132 if (value != null) {
133 count.decrementAndGet();
134 }
135 }
136 }
137
138 @Override
139 public int size() {
140 throw new UnsupportedOperationException();
141 }
142
143 @Override
144 public boolean isEmpty() {
145 return count.get() == 0;
146 }
147
148 @Override
149 public boolean contains(Object o) {
150 throw new UnsupportedOperationException();
151 }
152
153 @Override
154 public Iterator<T> iterator() {
155 throw new UnsupportedOperationException();
156 }
157
158 @Override
159 public Object[] toArray() {
160 throw new UnsupportedOperationException();
161 }
162
163 @SuppressWarnings("hiding")
164 @Override
165 public <T> T[] toArray(T[] a) {
166 throw new UnsupportedOperationException();
167 }
168
169 @Override
170 public boolean remove(Object o) {
171 throw new UnsupportedOperationException();
172 }
173
174 @Override
175 public boolean containsAll(Collection<?> c) {
176 throw new UnsupportedOperationException();
177 }
178
179 @Override
180 public boolean addAll(Collection<? extends T> c) {
181 throw new UnsupportedOperationException();
182 }
183
184 @Override
185 public boolean removeAll(Collection<?> c) {
186 throw new UnsupportedOperationException();
187 }
188
189 @Override
190 public boolean retainAll(Collection<?> c) {
191 throw new UnsupportedOperationException();
192 }
193
194 @Override
195 public void clear() {
196 throw new UnsupportedOperationException();
197 }
198
199 @Override
200 public boolean add(T e) {
201 throw new UnsupportedOperationException();
202 }
203
204 @Override
205 public T remove() {
206 throw new UnsupportedOperationException();
207 }
208
209 @Override
210 public T element() {
211 throw new UnsupportedOperationException();
212 }
213
214 @Override
215 public T peek() {
216 throw new UnsupportedOperationException();
217 }
218
219 }