View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.File;
4   import java.util.ArrayDeque;
5   import java.util.Collection;
6   import java.util.Deque;
7   import java.util.Iterator;
8   import java.util.LinkedList;
9   import java.util.Queue;
10  import java.util.concurrent.atomic.AtomicInteger;
11  import java.util.concurrent.atomic.AtomicLong;
12  
13  import com.github.davidmoten.rx.buffertofile.DataSerializer;
14  import com.github.davidmoten.rx.internal.operators.FileBasedSPSCQueueMemoryMappedReaderWriter.EOFRuntimeException;
15  import com.github.davidmoten.util.Preconditions;
16  
17  import rx.functions.Func0;
18  
19  public final class FileBasedSPSCQueueMemoryMapped<T> implements QueueWithSubscription<T> {
20  
21      private final Queue<FileBasedSPSCQueueMemoryMappedReaderWriter<T>> inactive = new LinkedList<FileBasedSPSCQueueMemoryMappedReaderWriter<T>>();
22      private final Deque<FileBasedSPSCQueueMemoryMappedReaderWriter<T>> toRead = new ArrayDeque<FileBasedSPSCQueueMemoryMappedReaderWriter<T>>();
23      private final Object lock = new Object();
24      private final Func0<File> factory;
25      private final int size;
26      // only needs to be visible to thread calling poll()
27      private FileBasedSPSCQueueMemoryMappedReaderWriter<T> reader;
28      // only needs to be visible to thread calling offer()
29      private FileBasedSPSCQueueMemoryMappedReaderWriter<T> writer;
30      private final AtomicInteger wip = new AtomicInteger();
31      private volatile boolean unsubscribed = false;
32      private final AtomicLong count = new AtomicLong();
33  
34      private final DataSerializer<T> serializer;
35  
36      public FileBasedSPSCQueueMemoryMapped(Func0<File> factory, int size,
37              DataSerializer<T> serializer) {
38          Preconditions.checkNotNull(factory);
39          Preconditions.checkNotNull(serializer);
40          this.factory = factory;
41          this.size = size;
42          this.serializer = serializer;
43          File file = factory.call();
44          this.writer = new FileBasedSPSCQueueMemoryMappedReaderWriter<T>(file, size, serializer);
45          this.reader = writer.openForWrite().openForRead();
46          // store store barrier
47          wip.lazySet(0);
48      }
49  
50      @Override
51      public void unsubscribe() {
52          wip.incrementAndGet();
53          unsubscribed = true;
54          checkUnsubscribe();
55      }
56  
57      @Override
58      public boolean isUnsubscribed() {
59          return unsubscribed;
60      }
61  
62      @Override
63      public synchronized boolean offer(T t) {
64          // thread safe with poll() and unsubscribe()
65          try {
66              wip.incrementAndGet();
67              if (unsubscribed)
68                  return true;
69              if (!writer.offer(t)) {
70                  // note that writer will be in a closed state if we follow this
71                  // path
72                  FileBasedSPSCQueueMemoryMappedReaderWriter<T> nextWriter;
73                  synchronized (lock) {
74                      nextWriter = inactive.poll();
75                      if (nextWriter == null) {
76                          nextWriter = new FileBasedSPSCQueueMemoryMappedReaderWriter<T>(
77                                  factory.call(), size, serializer);
78                      }
79                      toRead.offerLast(nextWriter);
80                      nextWriter.openForWrite();
81                  }
82                  writer = nextWriter;
83                  return writer.offer(t);
84              } else {
85                  return true;
86              }
87          } finally {
88              checkUnsubscribe();
89              count.incrementAndGet();
90          }
91      }
92  
93      private void checkUnsubscribe() {
94          // single ampersand because we must call wip.decrementAndGet
95          if (unsubscribed & wip.decrementAndGet() == 0) {
96              close();
97          }
98      }
99  
100     private void close() {
101         writer.close();
102         reader.close();
103     }
104 
105     @Override
106     public synchronized T poll() {
107         T value = null;
108         // thread safe with offer() and unsubscribe()
109         try {
110             wip.incrementAndGet();
111             if (unsubscribed)
112                 return null;
113             value = reader.poll();
114             return value;
115         } catch (EOFRuntimeException e) {
116             FileBasedSPSCQueueMemoryMappedReaderWriter<T> nextReader;
117             synchronized (lock) {
118                 if (toRead.isEmpty()) {
119                     return null;
120                 } else {
121                     nextReader = toRead.pollFirst();
122                 }
123                 reader.closeForRead();
124                 inactive.offer(reader);
125             }
126             reader = nextReader;
127             reader.openForRead();
128             value = reader.poll();
129             return value;
130         } finally {
131             checkUnsubscribe();
132             if (value != null) {
133                 count.decrementAndGet();
134             }
135         }
136     }
137 
138     @Override
139     public int size() {
140         throw new UnsupportedOperationException();
141     }
142 
143     @Override
144     public boolean isEmpty() {
145         return count.get() == 0;
146     }
147 
148     @Override
149     public boolean contains(Object o) {
150         throw new UnsupportedOperationException();
151     }
152 
153     @Override
154     public Iterator<T> iterator() {
155         throw new UnsupportedOperationException();
156     }
157 
158     @Override
159     public Object[] toArray() {
160         throw new UnsupportedOperationException();
161     }
162 
163     @SuppressWarnings("hiding")
164     @Override
165     public <T> T[] toArray(T[] a) {
166         throw new UnsupportedOperationException();
167     }
168 
169     @Override
170     public boolean remove(Object o) {
171         throw new UnsupportedOperationException();
172     }
173 
174     @Override
175     public boolean containsAll(Collection<?> c) {
176         throw new UnsupportedOperationException();
177     }
178 
179     @Override
180     public boolean addAll(Collection<? extends T> c) {
181         throw new UnsupportedOperationException();
182     }
183 
184     @Override
185     public boolean removeAll(Collection<?> c) {
186         throw new UnsupportedOperationException();
187     }
188 
189     @Override
190     public boolean retainAll(Collection<?> c) {
191         throw new UnsupportedOperationException();
192     }
193 
194     @Override
195     public void clear() {
196         throw new UnsupportedOperationException();
197     }
198 
199     @Override
200     public boolean add(T e) {
201         throw new UnsupportedOperationException();
202     }
203 
204     @Override
205     public T remove() {
206         throw new UnsupportedOperationException();
207     }
208 
209     @Override
210     public T element() {
211         throw new UnsupportedOperationException();
212     }
213 
214     @Override
215     public T peek() {
216         throw new UnsupportedOperationException();
217     }
218 
219 }