View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.Array;
4   import java.sql.Blob;
5   import java.sql.CallableStatement;
6   import java.sql.Clob;
7   import java.sql.Connection;
8   import java.sql.DatabaseMetaData;
9   import java.sql.NClob;
10  import java.sql.SQLClientInfoException;
11  import java.sql.SQLException;
12  import java.sql.SQLWarning;
13  import java.sql.SQLXML;
14  import java.sql.Savepoint;
15  import java.sql.Statement;
16  import java.sql.Struct;
17  import java.util.Map;
18  import java.util.Properties;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.atomic.AtomicInteger;
21  
22  import org.davidmoten.rx.jdbc.exceptions.CannotForkTransactedConnection;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  final class TransactedConnection implements Connection {
27  
28      private static final Logger log = LoggerFactory.getLogger(TransactedConnection.class);
29  
30      private final Connection con;
31      private final AtomicInteger counter;
32  
33      TransactedConnection(Connection con, AtomicInteger counter) {
34          log.debug("constructing TransactedConnection from {}, {}", con, counter);
35          this.con = con;
36          this.counter = counter;
37      }
38  
39      public TransactedConnection(Connection con) {
40          this(con, new AtomicInteger(1));
41      }
42  
43      public int counter() {
44          return counter.get();
45      }
46  
47      @Override
48      public void abort(Executor executor) throws SQLException {
49          con.abort(executor);
50      }
51  
52      @Override
53      public void clearWarnings() throws SQLException {
54          con.clearWarnings();
55      }
56  
57      public TransactedConnection fork() {
58          log.debug("forking connection");
59          if (counter.getAndIncrement() > 0) {
60              return new TransactedConnection(con, counter);
61          } else {
62              throw new CannotForkTransactedConnection(
63                      "cannot fork TransactedConnection because already closed");
64          }
65      }
66  
67      @Override
68      public void close() throws SQLException {
69          log.debug("TransactedConnection attempt close");
70          if (counter.get() == 0) {
71              log.debug("TransactedConnection close");
72              con.close();
73          }
74      }
75  
76      @Override
77      public void commit() throws SQLException {
78          log.debug("TransactedConnection commit attempt, counter={}", counter.get());
79          if (counter.decrementAndGet() == 0) {
80              log.debug("TransactedConnection actual commit");
81              con.commit();
82          }
83      }
84  
85      @Override
86      public void rollback() throws SQLException {
87          log.debug("TransactedConnection rollback attempt, counter={}", counter.get());
88          if (counter.decrementAndGet() == 0) {
89              log.debug("TransactedConnection actual rollback");
90              con.rollback();
91          }
92      }
93  
94      @Override
95      public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
96          return con.createArrayOf(typeName, elements);
97      }
98  
99      @Override
100     public Blob createBlob() throws SQLException {
101         return con.createBlob();
102     }
103 
104     @Override
105     public Clob createClob() throws SQLException {
106         return con.createClob();
107     }
108 
109     @Override
110     public NClob createNClob() throws SQLException {
111         return con.createNClob();
112     }
113 
114     @Override
115     public SQLXML createSQLXML() throws SQLException {
116         return con.createSQLXML();
117     }
118 
119     @Override
120     public Statement createStatement() throws SQLException {
121         return con.createStatement();
122     }
123 
124     @Override
125     public Statement createStatement(int resultSetType, int resultSetConcurrency,
126             int resultSetHoldability) throws SQLException {
127         return con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
128     }
129 
130     @Override
131     public Statement createStatement(int resultSetType, int resultSetConcurrency)
132             throws SQLException {
133         return con.createStatement(resultSetType, resultSetConcurrency);
134     }
135 
136     @Override
137     public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
138         return con.createStruct(typeName, attributes);
139     }
140 
141     @Override
142     public boolean getAutoCommit() throws SQLException {
143         return con.getAutoCommit();
144     }
145 
146     @Override
147     public String getCatalog() throws SQLException {
148         return con.getCatalog();
149     }
150 
151     @Override
152     public Properties getClientInfo() throws SQLException {
153         return con.getClientInfo();
154     }
155 
156     @Override
157     public String getClientInfo(String name) throws SQLException {
158         return con.getClientInfo(name);
159     }
160 
161     @Override
162     public int getHoldability() throws SQLException {
163         return con.getHoldability();
164     }
165 
166     @Override
167     public DatabaseMetaData getMetaData() throws SQLException {
168         return con.getMetaData();
169     }
170 
171     @Override
172     public int getNetworkTimeout() throws SQLException {
173         return con.getNetworkTimeout();
174     }
175 
176     @Override
177     public String getSchema() throws SQLException {
178         return con.getSchema();
179     }
180 
181     @Override
182     public int getTransactionIsolation() throws SQLException {
183         return con.getTransactionIsolation();
184     }
185 
186     @Override
187     public Map<String, Class<?>> getTypeMap() throws SQLException {
188         return con.getTypeMap();
189     }
190 
191     @Override
192     public SQLWarning getWarnings() throws SQLException {
193         return con.getWarnings();
194     }
195 
196     @Override
197     public boolean isClosed() throws SQLException {
198         return con.isClosed();
199     }
200 
201     @Override
202     public boolean isReadOnly() throws SQLException {
203         return con.isReadOnly();
204     }
205 
206     @Override
207     public boolean isValid(int timeout) throws SQLException {
208         return con.isValid(timeout);
209     }
210 
211     @Override
212     public boolean isWrapperFor(Class<?> arg0) throws SQLException {
213         return con.isWrapperFor(arg0);
214     }
215 
216     @Override
217     public String nativeSQL(String sql) throws SQLException {
218         return con.nativeSQL(sql);
219     }
220 
221     @Override
222     public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
223             int resultSetHoldability) throws SQLException {
224         return con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
225     }
226 
227     @Override
228     public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
229             throws SQLException {
230         return con.prepareCall(sql, resultSetType, resultSetConcurrency);
231     }
232 
233     @Override
234     public CallableStatement prepareCall(String sql) throws SQLException {
235         return con.prepareCall(sql);
236     }
237 
238     @Override
239     public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
240             int resultSetConcurrency, int resultSetHoldability) throws SQLException {
241         return new TransactedPreparedStatement(this, con.prepareStatement(sql, resultSetType,
242                 resultSetConcurrency, resultSetHoldability));
243     }
244 
245     @Override
246     public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
247             int resultSetConcurrency) throws SQLException {
248         return new TransactedPreparedStatement(this,
249                 con.prepareStatement(sql, resultSetType, resultSetConcurrency));
250     }
251 
252     @Override
253     public TransactedPreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
254             throws SQLException {
255         return new TransactedPreparedStatement(this, con.prepareStatement(sql, autoGeneratedKeys));
256     }
257 
258     @Override
259     public TransactedPreparedStatement prepareStatement(String sql, int[] columnIndexes)
260             throws SQLException {
261         return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnIndexes));
262     }
263 
264     @Override
265     public TransactedPreparedStatement prepareStatement(String sql, String[] columnNames)
266             throws SQLException {
267         return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnNames));
268     }
269 
270     @Override
271     public TransactedPreparedStatement prepareStatement(String sql) throws SQLException {
272         return new TransactedPreparedStatement(this, con.prepareStatement(sql));
273     }
274 
275     @Override
276     public void releaseSavepoint(Savepoint savepoint) throws SQLException {
277         con.releaseSavepoint(savepoint);
278     }
279 
280     @Override
281     public void rollback(Savepoint savepoint) throws SQLException {
282         con.rollback(savepoint);
283     }
284 
285     @Override
286     public void setAutoCommit(boolean autoCommit) throws SQLException {
287         con.setAutoCommit(autoCommit);
288     }
289 
290     @Override
291     public void setCatalog(String catalog) throws SQLException {
292         con.setCatalog(catalog);
293     }
294 
295     @Override
296     public void setClientInfo(Properties properties) throws SQLClientInfoException {
297         con.setClientInfo(properties);
298     }
299 
300     @Override
301     public void setClientInfo(String name, String value) throws SQLClientInfoException {
302         con.setClientInfo(name, value);
303     }
304 
305     @Override
306     public void setHoldability(int holdability) throws SQLException {
307         con.setHoldability(holdability);
308     }
309 
310     @Override
311     public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
312         con.setNetworkTimeout(executor, milliseconds);
313     }
314 
315     @Override
316     public void setReadOnly(boolean readOnly) throws SQLException {
317         con.setReadOnly(readOnly);
318     }
319 
320     @Override
321     public Savepoint setSavepoint() throws SQLException {
322         return con.setSavepoint();
323     }
324 
325     @Override
326     public Savepoint setSavepoint(String name) throws SQLException {
327         return con.setSavepoint(name);
328     }
329 
330     @Override
331     public void setSchema(String schema) throws SQLException {
332         con.setSchema(schema);
333     }
334 
335     @Override
336     public void setTransactionIsolation(int level) throws SQLException {
337         con.setTransactionIsolation(level);
338     }
339 
340     @Override
341     public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
342         con.setTypeMap(map);
343     }
344 
345     @Override
346     public <T> T unwrap(Class<T> arg0) throws SQLException {
347         return con.unwrap(arg0);
348     }
349 
350     public void incrementCounter() {
351         counter.incrementAndGet();
352     }
353 
354     public void decrementCounter() {
355         counter.decrementAndGet();
356     }
357 
358     @Override
359     public String toString() {
360         return "TransactedConnection [con=" + con + ", counter=" + counter + "]";
361     }
362 
363 }