1 package org.davidmoten.rxjava3.jdbc.pool;
2
3 import java.sql.Connection;
4 import java.util.Optional;
5 import java.util.Properties;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Function;
11
12 import javax.sql.DataSource;
13
14 import org.davidmoten.rxjava3.jdbc.ConnectionProvider;
15 import org.davidmoten.rxjava3.jdbc.Util;
16 import org.davidmoten.rxjava3.jdbc.internal.SingletonConnectionProvider;
17 import org.davidmoten.rxjava3.jdbc.pool.internal.HealthCheckPredicate;
18 import org.davidmoten.rxjava3.jdbc.pool.internal.PooledConnection;
19 import org.davidmoten.rxjava3.jdbc.pool.internal.SerializedConnectionListener;
20 import org.davidmoten.rxjava3.pool.Member;
21 import org.davidmoten.rxjava3.pool.NonBlockingPool;
22 import org.davidmoten.rxjava3.pool.Pool;
23
24 import com.github.davidmoten.guavamini.Preconditions;
25
26 import io.reactivex.rxjava3.core.Scheduler;
27 import io.reactivex.rxjava3.core.Single;
28 import io.reactivex.rxjava3.functions.Consumer;
29 import io.reactivex.rxjava3.functions.Predicate;
30 import io.reactivex.rxjava3.plugins.RxJavaPlugins;
31 import io.reactivex.rxjava3.schedulers.Schedulers;
32
33 public final class NonBlockingConnectionPool implements Pool<Connection> {
34
35 private final AtomicReference<NonBlockingPool<Connection>> pool = new AtomicReference<>();
36
37 NonBlockingConnectionPool(NonBlockingPool.Builder<Connection> builder) {
38 pool.set(builder.build());
39 }
40
41 public static Builder<NonBlockingConnectionPool> builder() {
42 return new Builder<NonBlockingConnectionPool>(x -> x);
43 }
44
45 public static final class Builder<T> {
46
47 private ConnectionProvider cp;
48 private Predicate<? super Connection> healthCheck = c -> true;
49 private int maxPoolSize = 5;
50 private long idleTimeBeforeHealthCheckMs = 60000;
51 private long maxIdleTimeMs = 30 * 60000;
52 private long connectionRetryIntervalMs = 30000;
53 private Consumer<? super Connection> disposer = Util::closeSilently;
54 private Scheduler scheduler = null;
55 private Properties properties = new Properties();
56 private final Function<NonBlockingConnectionPool, T> transform;
57 private String url;
58 private Consumer<? super Optional<Throwable>> c;
59
60 public Builder(Function<NonBlockingConnectionPool, T> transform) {
61 this.transform = transform;
62 this.cp = null;
63 }
64
65 /**
66 * Sets the provider of {@link Connection} objects to be used by the pool.
67 *
68 * @param cp
69 * connection provider
70 * @return this
71 */
72 public Builder<T> connectionProvider(ConnectionProvider cp) {
73 Preconditions.checkArgument(!(cp instanceof SingletonConnectionProvider), //
74 "connection provider should not return a singleton connection because " //
75 + "a pool needs control over the creation and closing of connections. " //
76 + "Use ConnectionProvider.from(url,...) instead.");
77 this.cp = cp;
78 return this;
79 }
80
81 /**
82 * Sets the provider of {@link Connection} objects to be used by the pool.
83 *
84 * @param ds
85 * dataSource that providers Connections
86 * @return this
87 */
88 public Builder<T> connectionProvider(DataSource ds) {
89 return connectionProvider(Util.connectionProvider(ds));
90 }
91
92 /**
93 * Sets the jdbc url of the {@link Connection} objects to be used by the pool.
94 *
95 * @param url
96 * jdbc url
97 * @return this
98 */
99 public Builder<T> url(String url) {
100 this.url = url;
101 return this;
102 }
103
104 public Builder<T> user(String user) {
105 properties.put("user", user);
106 return this;
107 }
108
109 public Builder<T> password(String password) {
110 properties.put("password", password);
111 return this;
112 }
113
114
115
116 /**
117 * Sets the JDBC properties that will be passed to
118 * {@link java.sql.DriverManager#getConnection}. The properties will only be
119 * used if the {@code url} has been set in the builder.
120 *
121 * @param properties
122 * the jdbc properties
123 * @return this
124 */
125 public Builder<T> properties(Properties properties) {
126 this.properties = properties;
127 return this;
128 }
129
130 /**
131 * Adds the given property specified by key and value to the JDBC properties
132 * that will be passed to {@link java.sql.DriverManager#getConnection}. The
133 * properties will only be used if the {@code url} has been set in the builder.
134 *
135 * @param key
136 * property key
137 * @param value
138 * property value
139 * @return this
140 */
141 public Builder<T> property(Object key, Object value) {
142 this.properties.put(key, value);
143 return this;
144 }
145
146 /**
147 * Sets the max time a {@link Connection} can be idle (checked in to pool)
148 * before it is released from the pool (the Connection is closed).
149 *
150 * @param duration
151 * the period before which an idle Connection is released from the
152 * pool (closed).
153 * @param unit
154 * time unit
155 * @return this
156 */
157 public Builder<T> maxIdleTime(long duration, TimeUnit unit) {
158 this.maxIdleTimeMs = unit.toMillis(duration);
159 return this;
160 }
161
162 /**
163 * Sets the minimum time that a connection must be idle (checked in) before on
164 * the next checkout its validity is checked using the health check function. If
165 * the health check fails then the Connection is closed (ignoring error) and
166 * released from the pool. Another Connection is then scheduled for creation
167 * (using the createRetryInterval delay).
168 *
169 * @param duration
170 * minimum time a connection must be idle before its validity is
171 * checked on checkout from the pool
172 * @param unit
173 * time unit
174 * @return this
175 */
176 public Builder<T> idleTimeBeforeHealthCheck(long duration, TimeUnit unit) {
177 this.idleTimeBeforeHealthCheckMs = unit.toMillis(duration);
178 return this;
179 }
180
181 /**
182 * Sets the retry interval in the case that creating/reestablishing a
183 * {@link Connection} for use in the pool fails.
184 *
185 * @param duration
186 * Connection creation retry interval
187 * @param unit
188 * time unit
189 * @return this
190 */
191 public Builder<T> connectionRetryInterval(long duration, TimeUnit unit) {
192 this.connectionRetryIntervalMs = unit.toMillis(duration);
193 return this;
194 }
195
196 /**
197 * Sets the health check for a Connection in the pool that is run only if the
198 * time since the last checkout of this Connection finished is more than
199 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
200 * requested.
201 *
202 * @param healthCheck
203 * check to run on Connection. Returns true if and only if the
204 * Connection is valid/healthy.
205 * @return this
206 */
207 public Builder<T> healthCheck(Predicate<? super Connection> healthCheck) {
208 this.healthCheck = healthCheck;
209 return this;
210 }
211
212 /**
213 * Sets the health check for a Connection in the pool that is run only if the
214 * time since the last checkout of this Connection finished is more than
215 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
216 * requested.
217 *
218 * @param databaseType
219 * the check to run is chosen based on the database type
220 * @return this
221 */
222 public Builder<T> healthCheck(DatabaseType databaseType) {
223 return healthCheck(databaseType.healthCheck());
224 }
225
226 /**
227 * Sets the health check for a Connection in the pool that is run only if the
228 * time since the last checkout of this Connection finished is more than
229 * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
230 * requested.
231 *
232 * @param sql
233 * sql to run to check the validity of the connection. If the sql is
234 * run without error then the connection is assumed healthy.
235 * @return this
236 */
237 public Builder<T> healthCheck(String sql) {
238 return healthCheck(new HealthCheckPredicate(sql));
239 }
240
241 /**
242 * Sets a listener for connection success and failure. Success and failure
243 * events are reported serially to the listener. If the consumer throws it will
244 * be reported to {@code RxJavaPlugins.onError}. This consumer should not block
245 * otherwise it will block the connection pool itself.
246 *
247 * @param c
248 * listener for connection events
249 * @return this
250 */
251 public Builder<T> connectionListener(Consumer<? super Optional<Throwable>> c) {
252 Preconditions.checkArgument(c != null, "listener can only be set once");
253 this.c = c;
254 return this;
255 }
256
257 /**
258 * Sets the maximum connection pool size. Default is 5.
259 *
260 * @param maxPoolSize
261 * maximum number of connections in the pool
262 * @return this
263 */
264 public Builder<T> maxPoolSize(int maxPoolSize) {
265 this.maxPoolSize = maxPoolSize;
266 return this;
267 }
268
269 /**
270 * Sets the scheduler used for emitting connections (must be scheduled to
271 * another thread to break the chain of stack calls otherwise can get
272 * StackOverflowError) and for scheduling timeouts and retries. Defaults to
273 * {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}. Do not
274 * set the scheduler to {@code Schedulers.trampoline()} because queries will
275 * block waiting for timeout workers. Also, do not use a single-threaded
276 * {@link Scheduler} because you may encounter {@link StackOverflowError}.
277 *
278 * @param scheduler
279 * scheduler to use for emitting connections and for scheduling
280 * timeouts and retries. Defaults to
281 * {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}.
282 * Do not use {@code Schedulers.trampoline()}.
283 * @throws IllegalArgumentException
284 * if trampoline specified
285 * @return this
286 */
287 public Builder<T> scheduler(Scheduler scheduler) {
288 Preconditions.checkArgument(scheduler != Schedulers.trampoline(),
289 "do not use trampoline scheduler because of risk of stack overflow");
290 this.scheduler = scheduler;
291 return this;
292 }
293
294 public T build() {
295 if (scheduler == null) {
296 ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
297 scheduler = Schedulers.from(executor, false);
298 }
299 if (url != null) {
300 cp = Util.connectionProvider(url, properties);
301 } else if (cp == null) {
302 throw new IllegalArgumentException("connectionProvider or url must be set");
303 }
304 Consumer<Optional<Throwable>> listener;
305 if (c == null) {
306 listener = null;
307 } else {
308 listener = new SerializedConnectionListener(c);
309 }
310 NonBlockingConnectionPool p = new NonBlockingConnectionPool(NonBlockingPool //
311 .factory(() -> {
312 try {
313 Connection con = cp.get();
314 if (listener != null) {
315 try {
316 listener.accept(Optional.empty());
317 } catch (Throwable e) {
318 RxJavaPlugins.onError(e);
319 }
320 }
321 return con;
322 } catch (Throwable e) {
323 if (listener != null) {
324 try {
325 listener.accept(Optional.of(e));
326 } catch (Throwable e2) {
327 RxJavaPlugins.onError(e2);
328 }
329 }
330 throw e;
331 }
332 }) //
333 .checkinDecorator((con, checkin) -> new PooledConnection(con, checkin)) //
334 .idleTimeBeforeHealthCheck(idleTimeBeforeHealthCheckMs, TimeUnit.MILLISECONDS) //
335 .maxIdleTime(maxIdleTimeMs, TimeUnit.MILLISECONDS) //
336 .createRetryInterval(connectionRetryIntervalMs, TimeUnit.MILLISECONDS) //
337 .scheduler(scheduler) //
338 .disposer(disposer)//
339 .healthCheck(healthCheck) //
340 .scheduler(scheduler) //
341 .maxSize(maxPoolSize) //
342 );
343 return transform.apply(p);
344 }
345
346 }
347
348 @Override
349 public Single<Member<Connection>> member() {
350 return pool.get().member();
351 }
352
353 @Override
354 public void close() {
355 pool.get().close();
356 }
357
358 }