1 package com.github.davidmoten.rx.jdbc;
2
3 import java.sql.ResultSet;
4 import java.sql.SQLException;
5 import java.util.List;
6
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import rx.Observable;
11 import rx.Observable.OnSubscribe;
12 import rx.Subscriber;
13 import rx.functions.Action0;
14 import rx.subscriptions.Subscriptions;
15
16
17
18
19 final class QuerySelectOnSubscribe<T> implements OnSubscribe<T> {
20
21 private static final Logger log = LoggerFactory.getLogger(QuerySelectOnSubscribe.class);
22
23
24
25
26
27
28
29
30
31 static <T> Observable<T> execute(QuerySelect query, List<Parameter> parameters,
32 ResultSetMapper<? extends T> function) {
33 return Observable.create(new QuerySelectOnSubscribe<T>(query, parameters, function));
34 }
35
36 private final ResultSetMapper<? extends T> function;
37 private final QuerySelect query;
38 private final List<Parameter> parameters;
39 private final boolean stateProvided;
40
41
42
43
44
45
46
47 private QuerySelectOnSubscribe(QuerySelect query, List<Parameter> parameters,
48 ResultSetMapper<? extends T> function) {
49 this.query = query;
50 this.parameters = parameters;
51 this.function = function;
52 this.stateProvided = query.sql().equals(QuerySelect.RETURN_GENERATED_KEYS);
53 }
54
55 @Override
56 public void call(Subscriber<? super T> subscriber) {
57 State state = null;
58 try {
59 if (stateProvided) {
60 state = (State) parameters.get(0).value();
61 setupUnsubscription(subscriber, state);
62 } else {
63 state = new State();
64 connectAndPrepareStatement(subscriber, state);
65 setupUnsubscription(subscriber, state);
66 executeQuery(subscriber, state);
67 }
68 subscriber.setProducer(new QuerySelectProducer<T>(function, subscriber, state.con,
69 state.ps, state.rs));
70 } catch (Exception e) {
71 query.context().endTransactionObserve();
72 query.context().endTransactionSubscribe();
73 try {
74 if (state != null)
75 closeQuietly(state);
76 } finally {
77 handleException(e, subscriber);
78 }
79 }
80 }
81
82 private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
83 subscriber.add(Subscriptions.create(new Action0() {
84 @Override
85 public void call() {
86 closeQuietly(state);
87 }
88 }));
89 }
90
91
92
93
94
95
96
97
98
99
100 private void connectAndPrepareStatement(Subscriber<? super T> subscriber, State state)
101 throws SQLException {
102 log.debug("connectionProvider={}", query.context().connectionProvider());
103 if (!subscriber.isUnsubscribed()) {
104 log.debug("getting connection");
105 state.con = query.context().connectionProvider().get();
106 log.debug("preparing statement,sql={}", query.sql());
107 state.ps = state.con.prepareStatement(query.sql(),ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
108 log.debug("setting parameters");
109 Util.setParameters(state.ps, parameters, query.names());
110 }
111 }
112
113
114
115
116
117
118
119
120
121
122
123 private void executeQuery(Subscriber<? super T> subscriber, State state) throws SQLException {
124 if (!subscriber.isUnsubscribed()) {
125 try {
126 log.debug("executing ps");
127 state.rs = state.ps.executeQuery();
128 log.debug("executed ps={}", state.ps);
129 } catch (SQLException e) {
130 throw new SQLException("failed to run sql=" + query.sql(), e);
131 }
132 }
133 }
134
135
136
137
138
139
140
141 private void handleException(Exception e, Subscriber<? super T> subscriber) {
142 log.debug("onError: " + e.getMessage());
143 if (subscriber.isUnsubscribed())
144 log.debug("unsubscribed");
145 else {
146 subscriber.onError(e);
147 }
148 }
149
150
151
152
153
154
155
156 private static void closeQuietly(State state) {
157
158 if (state.closed.compareAndSet(false, true)) {
159
160
161 log.debug("closing rs");
162 Util.closeQuietly(state.rs);
163 log.debug("closing ps");
164 Util.closeQuietly(state.ps);
165 log.debug("closing con");
166 Util.closeQuietlyIfAutoCommit(state.con);
167 log.debug("closed");
168 }
169 }
170
171 }