1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6
7
8
9
10
11
12
13
14 final class QueueWithResourcesNonBlockingUnsubscribe<T> extends AbstractQueueWithResources<T> {
15
16 private volatile boolean unsubscribing;
17
18
19
20
21 private final AtomicInteger currentCalls = new AtomicInteger(0);
22
23 private final AtomicBoolean unsubscribed;
24
25 QueueWithResourcesNonBlockingUnsubscribe(QueueWithResources<T> queue) {
26 super(queue);
27 this.unsubscribing = false;
28 this.unsubscribed = new AtomicBoolean(false);
29 }
30
31 @Override
32 public T poll() {
33 try {
34 if (unsubscribing) {
35 return null;
36 } else {
37 try {
38 currentCalls.incrementAndGet();
39 return super.poll();
40 } finally {
41 currentCalls.decrementAndGet();
42 }
43 }
44 } finally {
45 checkUnsubscribe();
46 }
47 }
48
49 @Override
50 public boolean offer(T t) {
51 try {
52 if (unsubscribing) {
53 return true;
54 } else {
55 try {
56 currentCalls.incrementAndGet();
57 return super.offer(t);
58 } finally {
59 currentCalls.decrementAndGet();
60 }
61 }
62 } finally {
63 checkUnsubscribe();
64 }
65 }
66
67 @Override
68 public boolean isEmpty() {
69 try {
70 if (unsubscribing) {
71 return true;
72 } else {
73 try {
74 currentCalls.incrementAndGet();
75 return super.isEmpty();
76 } finally {
77 currentCalls.decrementAndGet();
78 }
79 }
80 } finally {
81 checkUnsubscribe();
82 }
83 }
84
85 @Override
86 public void unsubscribe() {
87 unsubscribing = true;
88 checkUnsubscribe();
89 }
90
91 private void checkUnsubscribe() {
92 if (unsubscribing && currentCalls.get() == 0 && unsubscribed.compareAndSet(false, true)) {
93 super.unsubscribe();
94 }
95 }
96
97 @Override
98 public void freeResources() {
99 super.freeResources();
100 }
101
102 @Override
103 public boolean isUnsubscribed() {
104 return unsubscribed.get();
105
106 }
107
108 @Override
109 public long resourcesSize() {
110 return super.resourcesSize();
111 }
112 }