package org.davidmoten.rx.jdbc;

import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import org.davidmoten.rx.jdbc.exceptions.CannotForkTransactedConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Connection} wrapper that shares one underlying connection between
 * several handles and guards {@code commit}, {@code rollback} and
 * {@code close} with a shared counter.
 *
 * <p>
 * A handle created with {@link #TransactedConnection(Connection)} starts with a
 * counter of 1. Each {@link #fork()} adds one to the counter and returns a new
 * handle over the same connection and the same counter. Each
 * {@link #commit()} or {@link #rollback()} subtracts one, and only the call
 * that brings the counter to zero is forwarded to the underlying connection.
 * {@link #close()} is forwarded only while the counter is zero.
 *
 * <p>
 * Every other {@link Connection} method is forwarded unchanged, except that the
 * {@code prepareStatement} overloads wrap their result in a
 * {@link TransactedPreparedStatement} bound to this handle.
 */
final class TransactedConnection implements Connection {

    private static final Logger log = LoggerFactory.getLogger(TransactedConnection.class);

    /** The wrapped connection that all calls are eventually forwarded to. */
    private final Connection con;

    /** Counter shared by this handle and every handle forked from it. */
    private final AtomicInteger counter;

    /**
     * Creates a handle over {@code con} that uses an existing counter.
     *
     * @param con
     *            connection to delegate to
     * @param counter
     *            counter instance to share; it is not modified here
     */
    TransactedConnection(Connection con, AtomicInteger counter) {
        log.debug("constructing TransactedConnection from {}, {}", con, counter);
        this.con = con;
        this.counter = counter;
    }

    /**
     * Creates a handle over {@code con} with a fresh counter initialised to 1.
     *
     * @param con
     *            connection to delegate to
     */
    public TransactedConnection(Connection con) {
        this(con, new AtomicInteger(1));
    }

    /**
     * Returns the current value of the shared counter.
     *
     * @return current counter value
     */
    public int counter() {
        return counter.get();
    }

    // ------------------------------------------------------------------
    // Counter-guarded operations
    // ------------------------------------------------------------------

    /**
     * Returns a new handle that shares this handle's connection and counter,
     * after adding one to the counter.
     *
     * <p>
     * The counter is incremented only if it is currently positive. If it is
     * zero or negative the counter is left untouched and an exception is
     * thrown, so a failed fork cannot make a later fork succeed or trigger
     * an extra commit/rollback on the underlying connection.
     *
     * @return a new handle over the same connection and counter
     * @throws CannotForkTransactedConnection
     *             if the counter is not positive
     */
    public TransactedConnection fork() {
        log.debug("forking connection");
        while (true) {
            int current = counter.get();
            if (current <= 0) {
                throw new CannotForkTransactedConnection(
                        "cannot fork TransactedConnection because already closed");
            }
            // check and increment as a single atomic step; retry if another
            // caller changed the counter in between
            if (counter.compareAndSet(current, current + 1)) {
                return new TransactedConnection(con, counter);
            }
        }
    }

    /**
     * Closes the underlying connection only if the shared counter is exactly
     * zero; otherwise nothing is forwarded.
     *
     * @throws SQLException
     *             if closing the underlying connection fails
     */
    @Override
    public void close() throws SQLException {
        log.debug("TransactedConnection attempt close");
        if (counter.get() == 0) {
            log.debug("TransactedConnection close");
            con.close();
        }
    }

    /**
     * Decrements the shared counter and commits the underlying connection only
     * if the counter reaches zero as a result.
     *
     * @throws SQLException
     *             if the underlying commit fails
     */
    @Override
    public void commit() throws SQLException {
        log.debug("TransactedConnection commit attempt, counter={}", counter.get());
        if (counter.decrementAndGet() == 0) {
            log.debug("TransactedConnection actual commit");
            con.commit();
        }
    }

    /**
     * Decrements the shared counter and rolls back the underlying connection
     * only if the counter reaches zero as a result.
     *
     * @throws SQLException
     *             if the underlying rollback fails
     */
    @Override
    public void rollback() throws SQLException {
        log.debug("TransactedConnection rollback attempt, counter={}", counter.get());
        if (counter.decrementAndGet() == 0) {
            log.debug("TransactedConnection actual rollback");
            con.rollback();
        }
    }

    /** Adds one to the shared counter without creating a new handle. */
    public void incrementCounter() {
        counter.incrementAndGet();
    }

    /**
     * Subtracts one from the shared counter without committing or rolling
     * back, whatever value the counter reaches.
     */
    public void decrementCounter() {
        counter.decrementAndGet();
    }

    // ------------------------------------------------------------------
    // Prepared statements: wrapped so that they are bound to this handle
    // ------------------------------------------------------------------

    @Override
    public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
            int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return new TransactedPreparedStatement(this, con.prepareStatement(sql, resultSetType,
                resultSetConcurrency, resultSetHoldability));
    }

    @Override
    public TransactedPreparedStatement prepareStatement(String sql, int resultSetType,
            int resultSetConcurrency) throws SQLException {
        return new TransactedPreparedStatement(this,
                con.prepareStatement(sql, resultSetType, resultSetConcurrency));
    }

    @Override
    public TransactedPreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
            throws SQLException {
        return new TransactedPreparedStatement(this, con.prepareStatement(sql, autoGeneratedKeys));
    }

    @Override
    public TransactedPreparedStatement prepareStatement(String sql, int[] columnIndexes)
            throws SQLException {
        return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnIndexes));
    }

    @Override
    public TransactedPreparedStatement prepareStatement(String sql, String[] columnNames)
            throws SQLException {
        return new TransactedPreparedStatement(this, con.prepareStatement(sql, columnNames));
    }

    @Override
    public TransactedPreparedStatement prepareStatement(String sql) throws SQLException {
        return new TransactedPreparedStatement(this, con.prepareStatement(sql));
    }

    // ------------------------------------------------------------------
    // Everything below is forwarded unchanged to the wrapped connection
    // ------------------------------------------------------------------

    @Override
    public void abort(Executor executor) throws SQLException {
        con.abort(executor);
    }

    @Override
    public void clearWarnings() throws SQLException {
        con.clearWarnings();
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return con.createArrayOf(typeName, elements);
    }

    @Override
    public Blob createBlob() throws SQLException {
        return con.createBlob();
    }

    @Override
    public Clob createClob() throws SQLException {
        return con.createClob();
    }

    @Override
    public NClob createNClob() throws SQLException {
        return con.createNClob();
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return con.createSQLXML();
    }

    @Override
    public Statement createStatement() throws SQLException {
        return con.createStatement();
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency,
            int resultSetHoldability) throws SQLException {
        return con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency)
            throws SQLException {
        return con.createStatement(resultSetType, resultSetConcurrency);
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return con.createStruct(typeName, attributes);
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return con.getAutoCommit();
    }

    @Override
    public String getCatalog() throws SQLException {
        return con.getCatalog();
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return con.getClientInfo();
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return con.getClientInfo(name);
    }

    @Override
    public int getHoldability() throws SQLException {
        return con.getHoldability();
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return con.getMetaData();
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return con.getNetworkTimeout();
    }

    @Override
    public String getSchema() throws SQLException {
        return con.getSchema();
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return con.getTransactionIsolation();
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return con.getTypeMap();
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return con.getWarnings();
    }

    @Override
    public boolean isClosed() throws SQLException {
        return con.isClosed();
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return con.isReadOnly();
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return con.isValid(timeout);
    }

    @Override
    public boolean isWrapperFor(Class<?> arg0) throws SQLException {
        return con.isWrapperFor(arg0);
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return con.nativeSQL(sql);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
            int resultSetHoldability) throws SQLException {
        return con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
            throws SQLException {
        return con.prepareCall(sql, resultSetType, resultSetConcurrency);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return con.prepareCall(sql);
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        con.releaseSavepoint(savepoint);
    }

    /**
     * Rolls back to the given savepoint on the underlying connection. Unlike
     * {@link #rollback()} this does not touch the shared counter.
     */
    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        con.rollback(savepoint);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        con.setAutoCommit(autoCommit);
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        con.setCatalog(catalog);
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        con.setClientInfo(properties);
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        con.setClientInfo(name, value);
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        con.setHoldability(holdability);
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        con.setNetworkTimeout(executor, milliseconds);
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        con.setReadOnly(readOnly);
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return con.setSavepoint();
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return con.setSavepoint(name);
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        con.setSchema(schema);
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        con.setTransactionIsolation(level);
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        con.setTypeMap(map);
    }

    @Override
    public <T> T unwrap(Class<T> arg0) throws SQLException {
        return con.unwrap(arg0);
    }

    @Override
    public String toString() {
        return "TransactedConnection [con=" + con + ", counter=" + counter + "]";
    }

}