1 package org.davidmoten.rx.jdbc.pool;
2
3 import java.sql.Connection;
4 import java.util.Optional;
5 import java.util.Properties;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Function;
11
12 import javax.sql.DataSource;
13
14 import org.davidmoten.rx.jdbc.ConnectionProvider;
15 import org.davidmoten.rx.jdbc.Util;
16 import org.davidmoten.rx.jdbc.internal.SingletonConnectionProvider;
17 import org.davidmoten.rx.jdbc.pool.internal.HealthCheckPredicate;
18 import org.davidmoten.rx.jdbc.pool.internal.PooledConnection;
19 import org.davidmoten.rx.jdbc.pool.internal.SerializedConnectionListener;
20 import org.davidmoten.rx.pool.Member;
21 import org.davidmoten.rx.pool.NonBlockingPool;
22 import org.davidmoten.rx.pool.Pool;
23
24 import com.github.davidmoten.guavamini.Preconditions;
25
26 import io.reactivex.Scheduler;
27 import io.reactivex.Single;
28 import io.reactivex.functions.Consumer;
29 import io.reactivex.functions.Predicate;
30 import io.reactivex.internal.schedulers.ExecutorScheduler;
31 import io.reactivex.plugins.RxJavaPlugins;
32 import io.reactivex.schedulers.Schedulers;
33
34 public final class NonBlockingConnectionPool implements Pool<Connection> {
35
36 private final AtomicReference<NonBlockingPool<Connection>> pool = new AtomicReference<>();
37
38 NonBlockingConnectionPool(org.davidmoten.rx.pool.NonBlockingPool.Builder<Connection> builder) {
39 pool.set(builder.build());
40 }
41
42 public static Builder<NonBlockingConnectionPool> builder() {
43 return new Builder<NonBlockingConnectionPool>(x -> x);
44 }
45
46 public static final class Builder<T> {
47
48 private ConnectionProvider cp;
49 private Predicate<? super Connection> healthCheck = c -> true;
50 private int maxPoolSize = 5;
51 private long idleTimeBeforeHealthCheckMs = 60000;
52 private long maxIdleTimeMs = 30 * 60000;
53 private long connectionRetryIntervalMs = 30000;
54 private Consumer<? super Connection> disposer = Util::closeSilently;
55 private Scheduler scheduler = null;
56 private Properties properties = new Properties();
57 private final Function<NonBlockingConnectionPool, T> transform;
58 private String url;
59 private Consumer<? super Optional<Throwable>> c;
60
61 public Builder(Function<NonBlockingConnectionPool, T> transform) {
62 this.transform = transform;
63 }
64
65
66
67
68
69
70
71
72 public Builder<T> connectionProvider(ConnectionProvider cp) {
73 Preconditions.checkArgument(!(cp instanceof SingletonConnectionProvider),
74 "connection provider should not return a singleton connection because "
75 + "a pool needs control over the creation and closing of connections. "
76 + "Use ConnectionProvider.from(url,...) instead.");
77 this.cp = cp;
78 return this;
79 }
80
81
82
83
84
85
86
87
88 public Builder<T> connectionProvider(DataSource ds) {
89 return connectionProvider(Util.connectionProvider(ds));
90 }
91
92
93
94
95
96
97
98
99 public Builder<T> url(String url) {
100 this.url = url;
101 return this;
102 }
103
104 public Builder<T> user(String user) {
105 properties.put("user", user);
106 return this;
107 }
108
109 public Builder<T> password(String password) {
110 properties.put("password", password);
111 return this;
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 public Builder<T> properties(Properties properties) {
126 this.properties = properties;
127 return this;
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141 public Builder<T> property(Object key, Object value) {
142 this.properties.put(key, value);
143 return this;
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157 public Builder<T> maxIdleTime(long duration, TimeUnit unit) {
158 this.maxIdleTimeMs = unit.toMillis(duration);
159 return this;
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public Builder<T> idleTimeBeforeHealthCheck(long duration, TimeUnit unit) {
177 this.idleTimeBeforeHealthCheckMs = unit.toMillis(duration);
178 return this;
179 }
180
181
182
183
184
185
186
187
188
189
190
191 public Builder<T> connectionRetryInterval(long duration, TimeUnit unit) {
192 this.connectionRetryIntervalMs = unit.toMillis(duration);
193 return this;
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207 public Builder<T> healthCheck(Predicate<? super Connection> healthCheck) {
208 this.healthCheck = healthCheck;
209 return this;
210 }
211
212
213
214
215
216
217
218
219
220
221
222 public Builder<T> healthCheck(DatabaseType databaseType) {
223 return healthCheck(databaseType.healthCheck());
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237 public Builder<T> healthCheck(String sql) {
238 return healthCheck(new HealthCheckPredicate(sql));
239 }
240
241
242
243
244
245
246
247
248
249
250
251 public Builder<T> connectionListener(Consumer<? super Optional<Throwable>> c) {
252 Preconditions.checkArgument(c != null, "listener can only be set once");
253 this.c = c;
254 return this;
255 }
256
257
258
259
260
261
262
263
264 public Builder<T> maxPoolSize(int maxPoolSize) {
265 this.maxPoolSize = maxPoolSize;
266 return this;
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 public Builder<T> scheduler(Scheduler scheduler) {
288 Preconditions.checkArgument(scheduler != Schedulers.trampoline(),
289 "do not use trampoline scheduler because of risk of stack overflow");
290 this.scheduler = scheduler;
291 return this;
292 }
293
294 public T build() {
295 if (scheduler == null) {
296 ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
297 scheduler = new ExecutorScheduler(executor, false);
298 }
299 if (url != null) {
300 cp = Util.connectionProvider(url, properties);
301 }
302 Consumer<Optional<Throwable>> listener;
303 if (c == null) {
304 listener = null;
305 } else {
306 listener = new SerializedConnectionListener(c);
307 }
308 NonBlockingConnectionPool p = new NonBlockingConnectionPool(NonBlockingPool
309 .factory(() -> {
310 try {
311 Connection con = cp.get();
312 if (listener != null) {
313 try {
314 listener.accept(Optional.empty());
315 } catch (Throwable e) {
316 RxJavaPlugins.onError(e);
317 }
318 }
319 return con;
320 } catch (Throwable e) {
321 if (listener != null) {
322 try {
323 listener.accept(Optional.of(e));
324 } catch (Throwable e2) {
325 RxJavaPlugins.onError(e2);
326 }
327 }
328 throw e;
329 }
330 })
331 .checkinDecorator((con, checkin) -> new PooledConnection(con, checkin))
332 .idleTimeBeforeHealthCheck(idleTimeBeforeHealthCheckMs, TimeUnit.MILLISECONDS)
333 .maxIdleTime(maxIdleTimeMs, TimeUnit.MILLISECONDS)
334 .createRetryInterval(connectionRetryIntervalMs, TimeUnit.MILLISECONDS)
335 .scheduler(scheduler)
336 .disposer(disposer)
337 .healthCheck(healthCheck)
338 .scheduler(scheduler)
339 .maxSize(maxPoolSize)
340 );
341 return transform.apply(p);
342 }
343
344 }
345
346 @Override
347 public Single<Member<Connection>> member() {
348 return pool.get().member();
349 }
350
351 @Override
352 public void close() {
353 pool.get().close();
354 }
355
356 }