1 package com.github.davidmoten.rx.jdbc;
2
3 import java.sql.Connection;
4 import java.sql.PreparedStatement;
5 import java.sql.ResultSet;
6 import java.sql.SQLException;
7 import java.util.concurrent.atomic.AtomicLong;
8
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import rx.Producer;
13 import rx.Subscriber;
14
15 import com.github.davidmoten.rx.RxUtil;
16
17 class QuerySelectProducer<T> implements Producer {
18
19 private static final Logger log = LoggerFactory.getLogger(QuerySelectProducer.class);
20
21 private final ResultSetMapper<? extends T> function;
22 private final Subscriber<? super T> subscriber;
23 private final Connection con;
24 private final PreparedStatement ps;
25 private final ResultSet rs;
26 private volatile boolean keepGoing = true;
27
28 private final AtomicLong requested = new AtomicLong(0);
29
30 QuerySelectProducer(ResultSetMapper<? extends T> function, Subscriber<? super T> subscriber,
31 Connection con, PreparedStatement ps, ResultSet rs) {
32 this.function = function;
33 this.subscriber = subscriber;
34 this.con = con;
35 this.ps = ps;
36 this.rs = rs;
37 }
38
39 @Override
40 public void request(long n) {
41 if (requested.get() == Long.MAX_VALUE)
42
43 return;
44 else if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
45 requestAll();
46 } else if (n > 0) {
47 requestSome(n);
48 }
49 }
50
51 private void requestAll() {
52
53 try {
54 while (keepGoing) {
55 processRow(subscriber);
56 }
57 closeQuietly();
58 complete(subscriber);
59 } catch (Exception e) {
60 closeAndHandleException(e);
61 }
62 }
63
64 private void requestSome(long n) {
65
66
67 long previousCount = RxUtil.getAndAddRequest(requested, n);
68 if (previousCount == 0) {
69 try {
70 while (true) {
71 long r = requested.get();
72 long numToEmit = r;
73
74 while (keepGoing && --numToEmit >= 0) {
75 processRow(subscriber);
76 }
77 if (keepGoing) {
78 if (requested.addAndGet(-r) == 0) {
79 return;
80 }
81 } else {
82 closeQuietly();
83 complete(subscriber);
84 return;
85 }
86 }
87 } catch (Exception e) {
88 closeAndHandleException(e);
89 }
90 }
91 }
92
93 private void closeAndHandleException(Exception e) {
94 try {
95 closeQuietly();
96 } finally {
97 handleException(e, subscriber);
98 }
99 }
100
101
102
103
104
105
106
107
108 private void processRow(Subscriber<? super T> subscriber) throws SQLException {
109 checkSubscription(subscriber);
110 if (!keepGoing)
111 return;
112 if (rs.next()) {
113 log.trace("onNext");
114 subscriber.onNext(function.call(rs));
115 } else
116 keepGoing = false;
117 }
118
119
120
121
122
123
124 private void complete(Subscriber<? super T> subscriber) {
125 if (subscriber.isUnsubscribed()) {
126 log.debug("unsubscribed");
127 } else {
128 log.debug("onCompleted");
129 subscriber.onCompleted();
130 }
131 }
132
133
134
135
136
137
138
139 private void handleException(Exception e, Subscriber<? super T> subscriber) {
140 log.debug("onError: " + e.getMessage());
141 if (subscriber.isUnsubscribed())
142 log.debug("unsubscribed");
143 else {
144 subscriber.onError(e);
145 }
146 }
147
148
149
150
151
152 private void closeQuietly() {
153 log.debug("closing rs");
154 Util.closeQuietly(rs);
155 log.debug("closing ps");
156 Util.closeQuietly(ps);
157 log.debug("closing con");
158 Util.closeQuietlyIfAutoCommit(con);
159 log.debug("closed");
160 }
161
162
163
164
165
166
167 private void checkSubscription(Subscriber<? super T> subscriber) {
168 if (subscriber.isUnsubscribed()) {
169 keepGoing = false;
170 log.debug("unsubscribing");
171 }
172 }
173
174 }