View Javadoc
1   package org.davidmoten.rx.jdbc.pool.internal;
2   
3   import java.sql.Connection;
4   import java.sql.SQLException;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   
7   import org.davidmoten.rx.jdbc.ConnectionProvider;
8   import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
9   import org.davidmoten.rx.jdbc.internal.DelegatedConnection;
10  import org.davidmoten.rx.pool.Member;
11  import org.davidmoten.rx.pool.Pool;
12  
13  import com.github.davidmoten.guavamini.Preconditions;
14  
15  import io.reactivex.Single;
16  import io.reactivex.plugins.RxJavaPlugins;
17  
18  public final class ConnectionProviderBlockingPool implements Pool<Connection> {
19  
20      private final ConnectionProvider connectionProvider;
21  
22      public ConnectionProviderBlockingPool(ConnectionProvider connectionProvider) {
23          this.connectionProvider = connectionProvider;
24      }
25  
26      @Override
27      public Single<Member<Connection>> member() {
28          return Single.fromCallable(() -> new MemberWithValueConnection(connectionProvider));
29      }
30  
31      @Override
32      public void close() throws Exception {
33          connectionProvider.close();
34      }
35  
36      static final class MemberWithValueConnection implements Member<Connection>, DelegatedConnection {
37  
38          private final ConnectionProvider connectionProvider;
39  
40          public MemberWithValueConnection(ConnectionProvider cp) {
41              this.connectionProvider = cp;
42          }
43  
44          volatile PooledConnection connection;
45          final AtomicBoolean hasConnection = new AtomicBoolean();
46  
47          @Override
48          public Connection con() {
49              if (hasConnection.compareAndSet(false, true)) {
50                  // blocking
51                  Connection c = connectionProvider.get();
52                  Preconditions.checkNotNull(c, "connectionProvider should not return null");
53                  connection = new PooledConnection(c, this);
54              }
55              return connection;
56          }
57  
58          @Override
59          public void checkin() {
60              try {
61                  connection.con().close();
62              } catch (SQLException e) {
63                  throw new SQLRuntimeException(e);
64              }
65          }
66  
67          @Override
68          public Connection value() {
69              return con();
70          }
71  
72          @Override
73          public void disposeValue() {
74              try {
75                  connection.con().close();
76              } catch (SQLException e) {
77                  RxJavaPlugins.onError(e);
78              }
79          }
80      }
81  }