View Javadoc
1   package org.davidmoten.rx.jdbc.pool;
2   
3   import java.sql.Connection;
4   import java.util.Optional;
5   import java.util.Properties;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   import java.util.concurrent.TimeUnit;
9   import java.util.concurrent.atomic.AtomicReference;
10  import java.util.function.Function;
11  
12  import javax.sql.DataSource;
13  
14  import org.davidmoten.rx.jdbc.ConnectionProvider;
15  import org.davidmoten.rx.jdbc.Util;
16  import org.davidmoten.rx.jdbc.internal.SingletonConnectionProvider;
17  import org.davidmoten.rx.jdbc.pool.internal.HealthCheckPredicate;
18  import org.davidmoten.rx.jdbc.pool.internal.PooledConnection;
19  import org.davidmoten.rx.jdbc.pool.internal.SerializedConnectionListener;
20  import org.davidmoten.rx.pool.Member;
21  import org.davidmoten.rx.pool.NonBlockingPool;
22  import org.davidmoten.rx.pool.Pool;
23  
24  import com.github.davidmoten.guavamini.Preconditions;
25  
26  import io.reactivex.Scheduler;
27  import io.reactivex.Single;
28  import io.reactivex.functions.Consumer;
29  import io.reactivex.functions.Predicate;
30  import io.reactivex.internal.schedulers.ExecutorScheduler;
31  import io.reactivex.plugins.RxJavaPlugins;
32  import io.reactivex.schedulers.Schedulers;
33  
34  public final class NonBlockingConnectionPool implements Pool<Connection> {
35  
36      private final AtomicReference<NonBlockingPool<Connection>> pool = new AtomicReference<>();
37  
38      NonBlockingConnectionPool(org.davidmoten.rx.pool.NonBlockingPool.Builder<Connection> builder) {
39          pool.set(builder.build());
40      }
41  
42      public static Builder<NonBlockingConnectionPool> builder() {
43          return new Builder<NonBlockingConnectionPool>(x -> x);
44      }
45  
46      public static final class Builder<T> {
47  
48          private ConnectionProvider cp;
49          private Predicate<? super Connection> healthCheck = c -> true;
50          private int maxPoolSize = 5;
51          private long idleTimeBeforeHealthCheckMs = 60000;
52          private long maxIdleTimeMs = 30 * 60000;
53          private long connectionRetryIntervalMs = 30000;
54          private Consumer<? super Connection> disposer = Util::closeSilently;
55          private Scheduler scheduler = null;
56          private Properties properties = new Properties();
57          private final Function<NonBlockingConnectionPool, T> transform;
58          private String url;
59          private Consumer<? super Optional<Throwable>> c;
60  
61          public Builder(Function<NonBlockingConnectionPool, T> transform) {
62              this.transform = transform;
63          }
64  
65          /**
66           * Sets the provider of {@link Connection} objects to be used by the pool.
67           * 
68           * @param cp
69           *            connection provider
70           * @return this
71           */
72          public Builder<T> connectionProvider(ConnectionProvider cp) {
73              Preconditions.checkArgument(!(cp instanceof SingletonConnectionProvider), //
74                      "connection provider should not return a singleton connection because " //
75                      + "a pool needs control over the creation and closing of connections. " //
76                      + "Use ConnectionProvider.from(url,...) instead.");
77              this.cp = cp;
78              return this;
79          }
80  
81          /**
82           * Sets the provider of {@link Connection} objects to be used by the pool.
83           * 
84           * @param ds
85           *            dataSource that providers Connections
86           * @return this
87           */
88          public Builder<T> connectionProvider(DataSource ds) {
89              return connectionProvider(Util.connectionProvider(ds));
90          }
91  
92          /**
93           * Sets the jdbc url of the {@link Connection} objects to be used by the pool.
94           * 
95           * @param url
96           *            jdbc url
97           * @return this
98           */
99          public Builder<T> url(String url) {
100             this.url = url;
101             return this;
102         }
103         
104         public Builder<T> user(String user) {
105             properties.put("user", user);
106             return this;
107         }
108         
109         public Builder<T> password(String password) {
110             properties.put("password", password);
111             return this;
112         }
113         
114         
115 
116         /**
117          * Sets the JDBC properties that will be passed to
118          * {@link java.sql.DriverManager#getConnection}. The properties will only be
119          * used if the {@code url} has been set in the builder.
120          * 
121          * @param properties
122          *            the jdbc properties
123          * @return this
124          */
125         public Builder<T> properties(Properties properties) {
126             this.properties = properties;
127             return this;
128         }
129 
130         /**
131          * Adds the given property specified by key and value to the JDBC properties
132          * that will be passed to {@link java.sql.DriverManager#getConnection}. The
133          * properties will only be used if the {@code url} has been set in the builder.
134          * 
135          * @param key
136          *            property key
137          * @param value
138          *            property value
139          * @return this
140          */
141         public Builder<T> property(Object key, Object value) {
142             this.properties.put(key, value);
143             return this;
144         }
145 
146         /**
147          * Sets the max time a {@link Connection} can be idle (checked in to pool)
148          * before it is released from the pool (the Connection is closed).
149          * 
150          * @param duration
151          *            the period before which an idle Connection is released from the
152          *            pool (closed).
153          * @param unit
154          *            time unit
155          * @return this
156          */
157         public Builder<T> maxIdleTime(long duration, TimeUnit unit) {
158             this.maxIdleTimeMs = unit.toMillis(duration);
159             return this;
160         }
161 
162         /**
163          * Sets the minimum time that a connection must be idle (checked in) before on
164          * the next checkout its validity is checked using the health check function. If
165          * the health check fails then the Connection is closed (ignoring error) and
166          * released from the pool. Another Connection is then scheduled for creation
167          * (using the createRetryInterval delay).
168          * 
169          * @param duration
170          *            minimum time a connection must be idle before its validity is
171          *            checked on checkout from the pool
172          * @param unit
173          *            time unit
174          * @return this
175          */
176         public Builder<T> idleTimeBeforeHealthCheck(long duration, TimeUnit unit) {
177             this.idleTimeBeforeHealthCheckMs = unit.toMillis(duration);
178             return this;
179         }
180 
181         /**
182          * Sets the retry interval in the case that creating/reestablishing a
183          * {@link Connection} for use in the pool fails.
184          * 
185          * @param duration
186          *            Connection creation retry interval
187          * @param unit
188          *            time unit
189          * @return this
190          */
191         public Builder<T> connectionRetryInterval(long duration, TimeUnit unit) {
192             this.connectionRetryIntervalMs = unit.toMillis(duration);
193             return this;
194         }
195 
196         /**
197          * Sets the health check for a Connection in the pool that is run only if the
198          * time since the last checkout of this Connection finished is more than
199          * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
200          * requested.
201          * 
202          * @param healthCheck
203          *            check to run on Connection. Returns true if and only if the
204          *            Connection is valid/healthy.
205          * @return this
206          */
207         public Builder<T> healthCheck(Predicate<? super Connection> healthCheck) {
208             this.healthCheck = healthCheck;
209             return this;
210         }
211 
212         /**
213          * Sets the health check for a Connection in the pool that is run only if the
214          * time since the last checkout of this Connection finished is more than
215          * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
216          * requested.
217          * 
218          * @param databaseType
219          *            the check to run is chosen based on the database type
220          * @return this
221          */
222         public Builder<T> healthCheck(DatabaseType databaseType) {
223             return healthCheck(databaseType.healthCheck());
224         }
225 
226         /**
227          * Sets the health check for a Connection in the pool that is run only if the
228          * time since the last checkout of this Connection finished is more than
229          * idleTimeBeforeHealthCheck and a checkout of this Connection has just been
230          * requested.
231          * 
232          * @param sql
233          *            sql to run to check the validity of the connection. If the sql is
234          *            run without error then the connection is assumed healthy.
235          * @return this
236          */
237         public Builder<T> healthCheck(String sql) {
238             return healthCheck(new HealthCheckPredicate(sql));
239         }
240 
241         /**
242          * Sets a listener for connection success and failure. Success and failure
243          * events are reported serially to the listener. If the consumer throws it will
244          * be reported to {@code RxJavaPlugins.onError}. This consumer should not block
245          * otherwise it will block the connection pool itself.
246          * 
247          * @param c
248          *            listener for connection events
249          * @return this
250          */
251         public Builder<T> connectionListener(Consumer<? super Optional<Throwable>> c) {
252             Preconditions.checkArgument(c != null, "listener can only be set once");
253             this.c = c;
254             return this;
255         }
256 
257         /**
258          * Sets the maximum connection pool size. Default is 5.
259          * 
260          * @param maxPoolSize
261          *            maximum number of connections in the pool
262          * @return this
263          */
264         public Builder<T> maxPoolSize(int maxPoolSize) {
265             this.maxPoolSize = maxPoolSize;
266             return this;
267         }
268 
269         /**
270          * Sets the scheduler used for emitting connections (must be scheduled to
271          * another thread to break the chain of stack calls otherwise can get
272          * StackOverflowError) and for scheduling timeouts and retries. Defaults to
273          * {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}. Do not
274          * set the scheduler to {@code Schedulers.trampoline()} because queries will
275          * block waiting for timeout workers. Also, do not use a single-threaded
276          * {@link Scheduler} because you may encounter {@link StackOverflowError}.
277          * 
278          * @param scheduler
279          *            scheduler to use for emitting connections and for scheduling
280          *            timeouts and retries. Defaults to
281          *            {@code Schedulers.from(Executors.newFixedThreadPool(maxPoolSize))}.
282          *            Do not use {@code Schedulers.trampoline()}.
283          * @throws IllegalArgumentException
284          *             if trampoline specified
285          * @return this
286          */
287         public Builder<T> scheduler(Scheduler scheduler) {
288             Preconditions.checkArgument(scheduler != Schedulers.trampoline(),
289                     "do not use trampoline scheduler because of risk of stack overflow");
290             this.scheduler = scheduler;
291             return this;
292         }
293 
294         public T build() {
295             if (scheduler == null) {
296                 ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
297                 scheduler = new ExecutorScheduler(executor, false);
298             }
299             if (url != null) {
300                 cp = Util.connectionProvider(url, properties);
301             }
302             Consumer<Optional<Throwable>> listener;
303             if (c == null) {
304                 listener = null;
305             } else {
306                 listener = new SerializedConnectionListener(c);
307             }
308             NonBlockingConnectionPool p = new NonBlockingConnectionPool(NonBlockingPool //
309                     .factory(() -> {
310                         try {
311                             Connection con = cp.get();
312                             if (listener != null) {
313                                 try {
314                                     listener.accept(Optional.empty());
315                                 } catch (Throwable e) {
316                                     RxJavaPlugins.onError(e);
317                                 }
318                             }
319                             return con;
320                         } catch (Throwable e) {
321                             if (listener != null) {
322                                 try {
323                                     listener.accept(Optional.of(e));
324                                 } catch (Throwable e2) {
325                                     RxJavaPlugins.onError(e2);
326                                 }
327                             }
328                             throw e;
329                         }
330                     }) //
331                     .checkinDecorator((con, checkin) -> new PooledConnection(con, checkin)) //
332                     .idleTimeBeforeHealthCheck(idleTimeBeforeHealthCheckMs, TimeUnit.MILLISECONDS) //
333                     .maxIdleTime(maxIdleTimeMs, TimeUnit.MILLISECONDS) //
334                     .createRetryInterval(connectionRetryIntervalMs, TimeUnit.MILLISECONDS) //
335                     .scheduler(scheduler) //
336                     .disposer(disposer)//
337                     .healthCheck(healthCheck) //
338                     .scheduler(scheduler) //
339                     .maxSize(maxPoolSize) //
340             );
341             return transform.apply(p);
342         }
343 
344     }
345 
346     @Override
347     public Single<Member<Connection>> member() {
348         return pool.get().member();
349     }
350 
351     @Override
352     public void close() {
353         pool.get().close();
354     }
355 
356 }