1 package org.davidmoten.rx.jdbc;
2
3 import java.sql.Array;
4 import java.sql.Blob;
5 import java.sql.CallableStatement;
6 import java.sql.Clob;
7 import java.sql.Connection;
8 import java.sql.DatabaseMetaData;
9 import java.sql.NClob;
10 import java.sql.SQLClientInfoException;
11 import java.sql.SQLException;
12 import java.sql.SQLWarning;
13 import java.sql.SQLXML;
14 import java.sql.Savepoint;
15 import java.sql.Statement;
16 import java.sql.Struct;
17 import java.util.Map;
18 import java.util.Properties;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.atomic.AtomicInteger;
21
22 import org.davidmoten.rx.jdbc.exceptions.CannotForkTransactedConnection;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 final class TransactedConnection implements Connection {
27
28 private static final Logger log = LoggerFactory.getLogger(TransactedConnection.class);
29
30 private final Connection con;
31 private final AtomicInteger counter;
32
33 TransactedConnection(Connection con, AtomicInteger counter) {
34 log.debug("constructing TransactedConnection from {}, {}", con, counter);
35 this.con = con;
36 this.counter = counter;
37 }
38
39 public TransactedConnection(Connection con) {
40 this(con, new AtomicInteger(1));
41 }
42
43 public int counter() {
44 return counter.get();
45 }
46
47 @Override
48 public void abort(Executor executor) throws SQLException {
49 con.abort(executor);
50 }
51
52 @Override
53 public void clearWarnings() throws SQLException {
54 con.clearWarnings();
55 }
56
57 public TransactedConnection fork() {
58 log.debug("forking connection");
59 if (counter.getAndIncrement() > 0) {
60 return new TransactedConnection(con, counter);
61 } else {
62 throw new CannotForkTransactedConnection(
63 "cannot fork TransactedConnection because already closed");
64 }
65 }
66
67 @Override
68 public void close() throws SQLException {
69 log.debug("TransactedConnection attempt close");
70 if (counter.get() == 0) {
71 log.debug("TransactedConnection close");
72 con.close();
73 }
74 }
75
76 @Override
77 public void commit() throws SQLException {
78 log.debug("TransactedConnection commit attempt, counter={}", counter.get());
79 if (counter.decrementAndGet() == 0) {
80 log.debug("TransactedConnection actual commit");
81 con.commit();
82 }
83 }
84
85 @Override
86 public void rollback() throws SQLException {
87 log.debug("TransactedConnection rollback attempt, counter={}", counter.get());
88 if (counter.decrementAndGet() == 0) {
89 log.debug("TransactedConnection actual rollback");
90 con.rollback();
91 }
92 }
93
94 @Override
95 public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
96 return con.createArrayOf(typeName, elements);
97 }
98
99 @Override
100 public Blob createBlob() throws SQLException {
101 return con.createBlob();
102 }
103
104 @Override
105 public Clob createClob() throws SQLException {
106 return con.createClob();
107 }
108
109 @Override
110 public NClob createNClob() throws SQLException {
111 return con.createNClob();
112 }
113
114 @Override
115 public SQLXML createSQLXML() throws SQLException {
116 return con.createSQLXML();
117 }
118
119 @Override
120 public Statement createStatement() throws SQLException {
121 return con.createStatement();
122 }
123
124 @Override
125 public Statement createStatement(int resultSetType, int resultSetConcurrency,
126 int resultSetHoldability) throws SQLException {
127 return con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
128 }
129
130 @Override
131 public Statement createStatement(int resultSetType, int resultSetConcurrency)
132 throws SQLException {
133 return con.createStatement(resultSetType, resultSetConcurrency);
134 }
135
136 @Override
137 public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
138 return con.createStruct(typeName, attributes);
139 }
140
141 @Override
142 public boolean getAutoCommit() throws SQLException {
143 return con.getAutoCommit();
144 }
145
146 @Override
147 public String getCatalog() throws SQLException {
148 return con.getCatalog();
149 }
150
151 @Override
152 public Properties getClientInfo() throws SQLException {
153 return con.getClientInfo();
154 }
155
156 @Override
157 public String getClientInfo(String name) throws SQLException {
158 return con.getClientInfo(name);
159 }
160
161 @Override
162 public int getHoldability() throws SQLException {
163 return con.getHoldability();
164 }
165
166 @Override
167 public DatabaseMetaData getMetaData() throws SQLException {
168 return con.getMetaData();
169 }
170
171 @Override
172 public int getNetworkTimeout() throws SQLException {
173 return con.getNetworkTimeout();
174 }
175
176 @Override
177 public String getSchema() throws SQLException {
178 return con.getSchema();
179 }
180
181 @Override
182 public int getTransactionIsolation() throws SQLException {
183 return con.getTransactionIsolation();
184 }
185
186 @Override
187 public Map<String, Class<?>> getTypeMap() throws SQLException {
188 return con.getTypeMap();
189 }
190
191 @Override
192 public SQLWarning getWarnings() throws SQLException {
193 return con.getWarnings();
194 }
195
196 @Override
197 public boolean isClosed() throws SQLException {
198 return con.isClosed();
199 }
200
201 @Override
202 public boolean isReadOnly() throws SQLException {
203 return con.isReadOnly();
204 }
205
206 @Override
207 public boolean isValid(int timeout) throws SQLException {
208 return con.isValid(timeout);
209 }
210
211 @Override
212 public boolean isWrapperFor(Class<?> arg0) throws SQLException {
213 return con.isWrapperFor(arg0);
214 }
215
216 @Override
217 public String nativeSQL(String sql) throws SQLException {
218 return con.nativeSQL(sql);
219 }
220
221 @Override
222 public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
223 int resultSetHoldability) throws SQLException {
224 return con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
225 }
226
227 @Override
228 public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
229 throws SQLException {
230 return con.prepareCall(sql, resultSetType, resultSetConcurrency);
231 }
232
233 @Override
234 public CallableStatement prepareCall(String sql) throws SQLException {
235 return con.prepareCall(sql);
236 }
237
238 @Override
239 public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
240 int resultSetConcurrency, int resultSetHoldability) throws SQLException {
241 return new TransactedPreparedStatement(this, con.prepareStatement(sql, resultSetType,
242 resultSetConcurrency, resultSetHoldability));
243 }
244
245 @Override
246 public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
247 int resultSetConcurrency) throws SQLException {
248 return new TransactedPreparedStatement(this,
249 con.prepareStatement(sql, resultSetType, resultSetConcurrency));
250 }
251
252 @Override
253 public TransactedPreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
254 throws SQLException {
255 return new TransactedPreparedStatement(this, con.prepareStatement(sql, autoGeneratedKeys));
256 }
257
258 @Override
259 public TransactedPreparedStatement prepareStatement(String sql, int[] columnIndexes)
260 throws SQLException {
261 return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnIndexes));
262 }
263
264 @Override
265 public TransactedPreparedStatement prepareStatement(String sql, String[] columnNames)
266 throws SQLException {
267 return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnNames));
268 }
269
270 @Override
271 public TransactedPreparedStatement prepareStatement(String sql) throws SQLException {
272 return new TransactedPreparedStatement(this, con.prepareStatement(sql));
273 }
274
275 @Override
276 public void releaseSavepoint(Savepoint savepoint) throws SQLException {
277 con.releaseSavepoint(savepoint);
278 }
279
280 @Override
281 public void rollback(Savepoint savepoint) throws SQLException {
282 con.rollback(savepoint);
283 }
284
285 @Override
286 public void setAutoCommit(boolean autoCommit) throws SQLException {
287 con.setAutoCommit(autoCommit);
288 }
289
290 @Override
291 public void setCatalog(String catalog) throws SQLException {
292 con.setCatalog(catalog);
293 }
294
295 @Override
296 public void setClientInfo(Properties properties) throws SQLClientInfoException {
297 con.setClientInfo(properties);
298 }
299
300 @Override
301 public void setClientInfo(String name, String value) throws SQLClientInfoException {
302 con.setClientInfo(name, value);
303 }
304
305 @Override
306 public void setHoldability(int holdability) throws SQLException {
307 con.setHoldability(holdability);
308 }
309
310 @Override
311 public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
312 con.setNetworkTimeout(executor, milliseconds);
313 }
314
315 @Override
316 public void setReadOnly(boolean readOnly) throws SQLException {
317 con.setReadOnly(readOnly);
318 }
319
320 @Override
321 public Savepoint setSavepoint() throws SQLException {
322 return con.setSavepoint();
323 }
324
325 @Override
326 public Savepoint setSavepoint(String name) throws SQLException {
327 return con.setSavepoint(name);
328 }
329
330 @Override
331 public void setSchema(String schema) throws SQLException {
332 con.setSchema(schema);
333 }
334
335 @Override
336 public void setTransactionIsolation(int level) throws SQLException {
337 con.setTransactionIsolation(level);
338 }
339
340 @Override
341 public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
342 con.setTypeMap(map);
343 }
344
345 @Override
346 public <T> T unwrap(Class<T> arg0) throws SQLException {
347 return con.unwrap(arg0);
348 }
349
350 public void incrementCounter() {
351 counter.incrementAndGet();
352 }
353
354 public void decrementCounter() {
355 counter.decrementAndGet();
356 }
357
358 @Override
359 public String toString() {
360 return "TransactedConnection [con=" + con + ", counter=" + counter + "]";
361 }
362
363 }