1 package org.davidmoten.rxjava3.jdbc.pool;
2
3 import java.sql.Connection;
4 import java.util.Optional;
5 import java.util.Properties;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Function;
11
12 import javax.sql.DataSource;
13
14 import org.davidmoten.rxjava3.jdbc.ConnectionProvider;
15 import org.davidmoten.rxjava3.jdbc.Util;
16 import org.davidmoten.rxjava3.jdbc.internal.SingletonConnectionProvider;
17 import org.davidmoten.rxjava3.jdbc.pool.internal.HealthCheckPredicate;
18 import org.davidmoten.rxjava3.jdbc.pool.internal.PooledConnection;
19 import org.davidmoten.rxjava3.jdbc.pool.internal.SerializedConnectionListener;
20 import org.davidmoten.rxjava3.pool.Member;
21 import org.davidmoten.rxjava3.pool.NonBlockingPool;
22 import org.davidmoten.rxjava3.pool.Pool;
23
24 import com.github.davidmoten.guavamini.Preconditions;
25
26 import io.reactivex.rxjava3.core.Scheduler;
27 import io.reactivex.rxjava3.core.Single;
28 import io.reactivex.rxjava3.functions.Consumer;
29 import io.reactivex.rxjava3.functions.Predicate;
30 import io.reactivex.rxjava3.plugins.RxJavaPlugins;
31 import io.reactivex.rxjava3.schedulers.Schedulers;
32
33 public final class NonBlockingConnectionPool implements Pool<Connection> {
34
35 private final AtomicReference<NonBlockingPool<Connection>> pool = new AtomicReference<>();
36
37 NonBlockingConnectionPool(NonBlockingPool.Builder<Connection> builder) {
38 pool.set(builder.build());
39 }
40
41 public static Builder<NonBlockingConnectionPool> builder() {
42 return new Builder<NonBlockingConnectionPool>(x -> x);
43 }
44
45 public static final class Builder<T> {
46
47 private ConnectionProvider cp;
48 private Predicate<? super Connection> healthCheck = c -> true;
49 private int maxPoolSize = 5;
50 private long idleTimeBeforeHealthCheckMs = 60000;
51 private long maxIdleTimeMs = 30 * 60000;
52 private long connectionRetryIntervalMs = 30000;
53 private Consumer<? super Connection> disposer = Util::closeSilently;
54 private Scheduler scheduler = null;
55 private Properties properties = new Properties();
56 private final Function<NonBlockingConnectionPool, T> transform;
57 private String url;
58 private Consumer<? super Optional<Throwable>> c;
59
60 public Builder(Function<NonBlockingConnectionPool, T> transform) {
61 this.transform = transform;
62 this.cp = null;
63 }
64
65
66
67
68
69
70
71
72 public Builder<T> connectionProvider(ConnectionProvider cp) {
73 Preconditions.checkArgument(!(cp instanceof SingletonConnectionProvider),
74 "connection provider should not return a singleton connection because "
75 + "a pool needs control over the creation and closing of connections. "
76 + "Use ConnectionProvider.from(url,...) instead.");
77 this.cp = cp;
78 return this;
79 }
80
81
82
83
84
85
86
87
88 public Builder<T> connectionProvider(DataSource ds) {
89 return connectionProvider(Util.connectionProvider(ds));
90 }
91
92
93
94
95
96
97
98
99 public Builder<T> url(String url) {
100 this.url = url;
101 return this;
102 }
103
104 public Builder<T> user(String user) {
105 properties.put("user", user);
106 return this;
107 }
108
109 public Builder<T> password(String password) {
110 properties.put("password", password);
111 return this;
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 public Builder<T> properties(Properties properties) {
126 this.properties = properties;
127 return this;
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141 public Builder<T> property(Object key, Object value) {
142 this.properties.put(key, value);
143 return this;
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157 public Builder<T> maxIdleTime(long duration, TimeUnit unit) {
158 this.maxIdleTimeMs = unit.toMillis(duration);
159 return this;
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public Builder<T> idleTimeBeforeHealthCheck(long duration, TimeUnit unit) {
177 this.idleTimeBeforeHealthCheckMs = unit.toMillis(duration);
178 return this;
179 }
180
181
182
183
184
185
186
187
188
189
190
191 public Builder<T> connectionRetryInterval(long duration, TimeUnit unit) {
192 this.connectionRetryIntervalMs = unit.toMillis(duration);
193 return this;
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207 public Builder<T> healthCheck(Predicate<? super Connection> healthCheck) {
208 this.healthCheck = healthCheck;
209 return this;
210 }
211
212
213
214
215
216
217
218
219
220
221
222 public Builder<T> healthCheck(DatabaseType databaseType) {
223 return healthCheck(databaseType.healthCheck());
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237 public Builder<T> healthCheck(String sql) {
238 return healthCheck(new HealthCheckPredicate(sql));
239 }
240
241
242
243
244
245
246
247
248
249
250
251 public Builder<T> connectionListener(Consumer<? super Optional<Throwable>> c) {
252 Preconditions.checkArgument(c != null, "listener can only be set once");
253 this.c = c;
254 return this;
255 }
256
257
258
259
260
261
262
263
264 public Builder<T> maxPoolSize(int maxPoolSize) {
265 this.maxPoolSize = maxPoolSize;
266 return this;
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 public Builder<T> scheduler(Scheduler scheduler) {
288 Preconditions.checkArgument(scheduler != Schedulers.trampoline(),
289 "do not use trampoline scheduler because of risk of stack overflow");
290 this.scheduler = scheduler;
291 return this;
292 }
293
294 public T build() {
295 if (scheduler == null) {
296 ExecutorService executor = Executors.newFixedThreadPool(maxPoolSize);
297 scheduler = Schedulers.from(executor, false);
298 }
299 if (url != null) {
300 cp = Util.connectionProvider(url, properties);
301 } else if (cp == null) {
302 throw new IllegalArgumentException("connectionProvider or url must be set");
303 }
304 Consumer<Optional<Throwable>> listener;
305 if (c == null) {
306 listener = null;
307 } else {
308 listener = new SerializedConnectionListener(c);
309 }
310 NonBlockingConnectionPool p = new NonBlockingConnectionPool(NonBlockingPool
311 .factory(() -> {
312 try {
313 Connection con = cp.get();
314 if (listener != null) {
315 try {
316 listener.accept(Optional.empty());
317 } catch (Throwable e) {
318 RxJavaPlugins.onError(e);
319 }
320 }
321 return con;
322 } catch (Throwable e) {
323 if (listener != null) {
324 try {
325 listener.accept(Optional.of(e));
326 } catch (Throwable e2) {
327 RxJavaPlugins.onError(e2);
328 }
329 }
330 throw e;
331 }
332 })
333 .checkinDecorator((con, checkin) -> new PooledConnection(con, checkin))
334 .idleTimeBeforeHealthCheck(idleTimeBeforeHealthCheckMs, TimeUnit.MILLISECONDS)
335 .maxIdleTime(maxIdleTimeMs, TimeUnit.MILLISECONDS)
336 .createRetryInterval(connectionRetryIntervalMs, TimeUnit.MILLISECONDS)
337 .scheduler(scheduler)
338 .disposer(disposer)
339 .healthCheck(healthCheck)
340 .scheduler(scheduler)
341 .maxSize(maxPoolSize)
342 );
343 return transform.apply(p);
344 }
345
346 }
347
348 @Override
349 public Single<Member<Connection>> member() {
350 return pool.get().member();
351 }
352
353 @Override
354 public void close() {
355 pool.get().close();
356 }
357
358 }