View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   import java.util.concurrent.atomic.AtomicInteger;
5   
6   /**
7    * Wraps a Queue (like a file based queue) to provide concurrency guarantees
8    * around calls to the close() method. Extends AtomicBoolean to save allocation.
9    * The AtomicBoolean represents the closed status of the queue.
10   * 
11   * @param <T>
12   *            type of item on queue
13   */
14  final class QueueWithResourcesNonBlockingUnsubscribe<T> extends AbstractQueueWithResources<T> {
15  
16      private volatile boolean unsubscribing;
17  
18      // ensures queue.close() doesn't occur until outstanding peek(),offer(),
19      // poll(), isEmpty() calls have finished. When currentCalls is zero a
20      // close request can be actioned.
21      private final AtomicInteger currentCalls = new AtomicInteger(0);
22  
23      private final AtomicBoolean unsubscribed;
24  
25      QueueWithResourcesNonBlockingUnsubscribe(QueueWithResources<T> queue) {
26          super(queue);
27          this.unsubscribing = false;
28          this.unsubscribed = new AtomicBoolean(false);
29      }
30  
31      @Override
32      public T poll() {
33          try {
34              if (unsubscribing) {
35                  return null;
36              } else {
37                  try {
38                      currentCalls.incrementAndGet();
39                      return super.poll();
40                  } finally {
41                      currentCalls.decrementAndGet();
42                  }
43              }
44          } finally {
45              checkUnsubscribe();
46          }
47      }
48  
49      @Override
50      public boolean offer(T t) {
51          try {
52              if (unsubscribing) {
53                  return true;
54              } else {
55                  try {
56                      currentCalls.incrementAndGet();
57                      return super.offer(t);
58                  } finally {
59                      currentCalls.decrementAndGet();
60                  }
61              }
62          } finally {
63              checkUnsubscribe();
64          }
65      }
66  
67      @Override
68      public boolean isEmpty() {
69          try {
70              if (unsubscribing) {
71                  return true;
72              } else {
73                  try {
74                      currentCalls.incrementAndGet();
75                      return super.isEmpty();
76                  } finally {
77                      currentCalls.decrementAndGet();
78                  }
79              }
80          } finally {
81              checkUnsubscribe();
82          }
83     }
84  
85      @Override
86      public void unsubscribe() {
87          unsubscribing = true;
88          checkUnsubscribe();
89      }
90  
91      private void checkUnsubscribe() {
92          if (unsubscribing && currentCalls.get() == 0 && unsubscribed.compareAndSet(false, true)) {
93              super.unsubscribe();
94          }
95      }
96  
97      @Override
98      public void freeResources() {
99          super.freeResources();
100     }
101 
102     @Override
103     public boolean isUnsubscribed() {
104         return unsubscribed.get();
105 
106     }
107 
108     @Override
109     public long resourcesSize() {
110         return super.resourcesSize();
111     }
112 }