1 package org.davidmoten.rx.jdbc.pool;
2
3 import java.sql.Connection;
4 import java.util.Optional;
5 import java.util.Properties;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Function;
11
12 import javax.sql.DataSource;
13
14 import org.davidmoten.rx.jdbc.ConnectionProvider;
15 import org.davidmoten.rx.jdbc.Util;
16 import org.davidmoten.rx.jdbc.internal.SingletonConnectionProvider;
17 import org.davidmoten.rx.jdbc.pool.internal.HealthCheckPredicate;
18 import org.davidmoten.rx.jdbc.pool.internal.PooledConnection;
19 import org.davidmoten.rx.jdbc.pool.internal.SerializedConnectionListener;
20 import org.davidmoten.rx.pool.Member;
21 import org.davidmoten.rx.pool.NonBlockingPool;
22 import org.davidmoten.rx.pool.Pool;
23
24 import com.github.davidmoten.guavamini.Preconditions;
25
26 import io.reactivex.Scheduler;
27 import io.reactivex.Single;
28 import io.reactivex.functions.Consumer;
29 import io.reactivex.functions.Predicate;
30 import io.reactivex.internal.schedulers.ExecutorScheduler;
31 import io.reactivex.plugins.RxJavaPlugins;
32 import io.reactivex.schedulers.Schedulers;
33
34 public final class NonBlockingConnectionPool implements Pool<Connection> {
35
36 private final AtomicReference<NonBlockingPool<Connection>> pool = new AtomicReference<>();
37
38 NonBlockingConnectionPool(org.davidmoten.rx.pool.NonBlockingPool.Builder<Connection> builder) {
39 pool.set(builder.build());
40 }
41
42 public static Builder<NonBlockingConnectionPool> builder() {
43 return new Builder<NonBlockingConnectionPool>(x -> x);
44 }
45
46 public static final class Builder<T> {
47
48 private ConnectionProvider cp;
49 private Predicate<? super Connection> healthCheck = c -> true;
50 private int maxPoolSize = 5;
51 private long idleTimeBeforeHealthCheckMs = 60000;
52 private long maxIdleTimeMs = 30 * 60000;
53 private long connectionRetryIntervalMs = 30000;
54 private Consumer<? super Connection> disposer = Util::closeSilently;
55 private Scheduler scheduler = null;
56 private Properties properties = new Properties();
57 private final Function<NonBlockingConnectionPool, T> transform;
58 private String url;
59 private Consumer<? super Optional<Throwable>> c;
60
61 public Builder(Function<NonBlockingConnectionPool, T> transform) {
62 this.transform = transform;
63 }
64
65 /**
66 * Sets the provider of {@link Connection} objects to be used by the pool.
67 *
68 * @param cp
69 * connection provider
70 * @return this
71 */
72 public Builder<T> connectionProvider(ConnectionProvider cp) {
73 Preconditions.checkArgument(!(cp instanceof SingletonConnectionProvider), //
74 "connection provider should not return a singleton connection because " //
75 + "a pool needs control over the creation and closing of connections. " //
76 + "Use ConnectionProvider.from(url,...) instead.");
77 this.cp = cp;
78 return this;
79 }
80
81 /**
82 * Sets the provider of {@link Connection} objects to be used by the pool.
83 *
84 * @param ds
85 * dataSource that providers Connections
86 * @return this
87 */
88 public Builder<T> connectionProvider(DataSource ds) {
89 return connectionProvider(Util.connectionProvider(ds));
90 }
91
92 /**
93 * Sets the jdbc url of the {@link Connection} objects to be used by the pool.
94 *
95 * @param url
96 * jdbc url
97 * @return this
98 */
99 public Builder<T> url(String url) {
100 this.url = url;
101 return this;
102 }
103
104 public Builder<T> user(String user) {
105 properties.put("user", user);
106 return this;
107 }
108
109 public Builder<T> password(String password) {
110 properties.put("password", password);
111 return this;
112 }
113
114
115
116 /**
117 * Sets the JDBC properties that will be passed to
118 * {@link java.sql.DriverManager#getConnection}. The properties will only be
119 * used if the {@code url} has been set in the builder.
120 *
121 * @param properties
122 * the jdbc properties
123 * @return this
124 */
125 public Builder<T> properties(Properties properties) {
126 this.properties = properties;
127 return this;
128 }
129
130 /**
131 * Adds the given property specified by key and value to the JDBC properties
132 * that will be passed to {@link java.sql.DriverManager#getConnection}. The
133 * properties will only be used if the {@code url} has been set in the builder.
134 *
135 * @param key
136 * property key
137 * @param value
138 * property value
139 * @return this
140 */
141 public Builder<T> property(Object key, Object value) {
142 this.properties.put(key, value);
143 return this;
144 }
145
146 /**
147 * Sets the max time a {@link Connection} can be idle (checked in to pool)
148 * before it is released from the pool (the Connection is closed).
149 *
150 * @param duration
151 * the period before which an idle Connection is released from the
152 * pool (closed).
153 * @param unit
154 * time unit
155 * @return this
156 */
157 public Builder<T> maxIdleTime(long duration, TimeUnit unit) {
158 this.maxIdleTimeMs = unit.toMillis(duration);
159 return this;
160 }
161
162 /**
163 * Sets the minimum time that a connection must be idle (checked in) before on
164 * the next checkout its validity is checked using the health check function. If
165 * the health check fails then the Connection is closed (ignoring error) and
166 * released from the pool. Another Connection is then scheduled for creation
167 * (using the createRetryInterval delay).
168 *
169 * @param duration
170 * minimum time a connection must be idle before its validity is
171 * checked on checkout from the pool
172 * @param unit
173 * time unit
174 * @return this
175 */
176 public Builder<T> idleTimeBeforeHealthCheck(long duration, TimeUnit unit) {
177 this.idleTimeBeforeHealthCheckMs = unit.toMillis(duration);
178 return this;
179 }
180
181 /**
182 * Sets the retry interval in the case that creating/reestablishing a
183 * {@link Connection} for use in the pool fails.
184 *
185 * @param duration
186 * Connection creation retry interval
187 * @param unit
188 * time unit
189 * @return this
190 */
191 public Builder<T> connectionRetryInterval(long duration, TimeUnit unit) {
192 this.connectionRetryIntervalMs = unit.toMillis(duration);
193 return this;
194 }
195
196 /**
197 * Sets the health check for a Connection in the pool that is run only if the
198 * time since the last checkout of this Connection finished is more than
199 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
200 * requested.
201 *
202 * @param healthCheck
203 * check to run on Connection. Returns true if and only if the
204 * Connection is valid/healthy.
205 * @return this
206 */
207 public Builder<T> healthCheck(Predicate<? super Connection> healthCheck) {
208 this.healthCheck = healthCheck;
209 return this;
210 }
211
212 /**
213 * Sets the health check for a Connection in the pool that is run only if the
214 * time since the last checkout of this Connection finished is more than
215 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
216 * requested.
217 *
218 * @param databaseType
219 * the check to run is chosen based on the database type
220 * @return this
221 */
222 public Builder<T> healthCheck(DatabaseType databaseType) {
223 return healthCheck(databaseType.healthCheck());
224 }
225
226 /**
227 * Sets the health check for a Connection in the pool that is run only if the
228 * time since the last checkout of this Connection finished is more than
229 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
230 * requested.
231 *
232 * @param sql
233 * sql to run to check the validity of the connection. If the sql is
234 * run without error then the connection is assumed healthy.
235 * @return this
236 */
237 public Builder<T> healthCheck(String sql) {
238 return healthCheck(new HealthCheckPredicate(sql));
239 }
240
241 /**
242 * Sets a listener for connection success and failure. Success and failure
243 * events are reported serially to the listener. If the consumer throws it will
244 * be reported to {@code RxJavaPlugins.onError}. This consumer should not block
245 * otherwise it will block the connection pool itself.
246 *
247 * @param c
248 * listener for connection events
249 * @return this
250 */
251 public Builder<T> connectionListener(Consumer<? super Optional<Throwable>> c) {
252 Preconditions.checkArgument(c != null, "listener can only be set once");
253 this.c = c;
254 return this;
255 }
256
257 /**
258 * Sets the maximum connection pool size. Default is 5.
259 *
260 * @param maxPoolSize
261 * maximum number of connections in the pool
262 * @return this
263 */
264 public Builder<T> maxPoolSize(int maxPoolSize) {
265 this.maxPoolSize = maxPoolSize;
266 return this;
267 }
268
269 /**
270 * Sets the scheduler used for emitting connections (must be scheduled to
271 * another thread to break the chain of stack calls otherwise can get
272 * StackOverflowError) and for scheduling timeouts and retries. Defaults to
273 * {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}. Do not
274 * set the scheduler to {@code Schedulers.trampoline()} because queries will
275 * block waiting for timeout workers. Also, do not use a single-threaded
276 * {@link Scheduler} because you may encounter {@link StackOverflowError}.
277 *
278 * @param scheduler
279 * scheduler to use for emitting connections and for scheduling
280 * timeouts and retries. Defaults to
281 * {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}.
282 * Do not use {@code Schedulers.trampoline()}.
283 * @throws IllegalArgumentException
284 * if trampoline specified
285 * @return this
286 */
287 public Builder<T> scheduler(Scheduler scheduler) {
288 Preconditions.checkArgument(scheduler != Schedulers.trampoline(),
289 "do not use trampoline scheduler because of risk of stack overflow");
290 this.scheduler = scheduler;
291 return this;
292 }
293
294 public T build() {
295 if (scheduler == null) {
296 ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
297 scheduler = new ExecutorScheduler(executor, false);
298 }
299 if (url != null) {
300 cp = Util.connectionProvider(url, properties);
301 }
302 Consumer<Optional<Throwable>> listener;
303 if (c == null) {
304 listener = null;
305 } else {
306 listener = new SerializedConnectionListener(c);
307 }
308 NonBlockingConnectionPool p = new NonBlockingConnectionPool(NonBlockingPool //
309 .factory(() -> {
310 try {
311 Connection con = cp.get();
312 if (listener != null) {
313 try {
314 listener.accept(Optional.empty());
315 } catch (Throwable e) {
316 RxJavaPlugins.onError(e);
317 }
318 }
319 return con;
320 } catch (Throwable e) {
321 if (listener != null) {
322 try {
323 listener.accept(Optional.of(e));
324 } catch (Throwable e2) {
325 RxJavaPlugins.onError(e2);
326 }
327 }
328 throw e;
329 }
330 }) //
331 .checkinDecorator((con, checkin) -> new PooledConnection(con, checkin)) //
332 .idleTimeBeforeHealthCheck(idleTimeBeforeHealthCheckMs, TimeUnit.MILLISECONDS) //
333 .maxIdleTime(maxIdleTimeMs, TimeUnit.MILLISECONDS) //
334 .createRetryInterval(connectionRetryIntervalMs, TimeUnit.MILLISECONDS) //
335 .scheduler(scheduler) //
336 .disposer(disposer)//
337 .healthCheck(healthCheck) //
338 .scheduler(scheduler) //
339 .maxSize(maxPoolSize) //
340 );
341 return transform.apply(p);
342 }
343
344 }
345
346 @Override
347 public Single<Member<Connection>> member() {
348 return pool.get().member();
349 }
350
351 @Override
352 public void close() {
353 pool.get().close();
354 }
355
356 }