1 package org.davidmoten.rx.jdbc.pool.internal;
2
3 import java.sql.Connection;
4 import java.sql.SQLException;
5 import java.util.concurrent.atomic.AtomicBoolean;
6
7 import org.davidmoten.rx.jdbc.ConnectionProvider;
8 import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
9 import org.davidmoten.rx.jdbc.internal.DelegatedConnection;
10 import org.davidmoten.rx.pool.Member;
11 import org.davidmoten.rx.pool.Pool;
12
13 import com.github.davidmoten.guavamini.Preconditions;
14
15 import io.reactivex.Single;
16 import io.reactivex.plugins.RxJavaPlugins;
17
18 public final class ConnectionProviderBlockingPool implements Pool<Connection> {
19
20 private final ConnectionProvider connectionProvider;
21
22 public ConnectionProviderBlockingPool(ConnectionProvider connectionProvider) {
23 this.connectionProvider = connectionProvider;
24 }
25
26 @Override
27 public Single<Member<Connection>> member() {
28 return Single.fromCallable(() -> new MemberWithValueConnection(connectionProvider));
29 }
30
31 @Override
32 public void close() throws Exception {
33 connectionProvider.close();
34 }
35
36 static final class MemberWithValueConnection implements Member<Connection>, DelegatedConnection {
37
38 private final ConnectionProvider connectionProvider;
39
40 public MemberWithValueConnection(ConnectionProvider cp) {
41 this.connectionProvider = cp;
42 }
43
44 volatile PooledConnection connection;
45 final AtomicBoolean hasConnection = new AtomicBoolean();
46
47 @Override
48 public Connection con() {
49 if (hasConnection.compareAndSet(false, true)) {
50
51 Connection c = connectionProvider.get();
52 Preconditions.checkNotNull(c, "connectionProvider should not return null");
53 connection = new PooledConnection(c, this);
54 }
55 return connection;
56 }
57
58 @Override
59 public void checkin() {
60 try {
61 connection.con().close();
62 } catch (SQLException e) {
63 throw new SQLRuntimeException(e);
64 }
65 }
66
67 @Override
68 public Connection value() {
69 return con();
70 }
71
72 @Override
73 public void disposeValue() {
74 try {
75 connection.con().close();
76 } catch (SQLException e) {
77 RxJavaPlugins.onError(e);
78 }
79 }
80 }
81 }