1 package org.davidmoten.rx.jdbc;
2
3 import java.sql.Connection;
4 import java.sql.DriverManager;
5 import java.sql.PreparedStatement;
6 import java.sql.SQLException;
7 import java.sql.Types;
8 import java.util.concurrent.CountDownLatch;
9 import java.util.concurrent.TimeUnit;
10 import java.util.concurrent.atomic.AtomicBoolean;
11 import java.util.concurrent.atomic.AtomicInteger;
12
13 import javax.annotation.Nonnull;
14 import javax.annotation.Nullable;
15 import javax.sql.DataSource;
16
17 import org.davidmoten.rx.internal.FlowableSingleDeferUntilRequest;
18 import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
19 import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
20 import org.davidmoten.rx.jdbc.pool.Pools;
21 import org.davidmoten.rx.jdbc.pool.internal.ConnectionProviderBlockingPool;
22 import org.davidmoten.rx.pool.Member;
23 import org.davidmoten.rx.pool.Pool;
24
25 import com.github.davidmoten.guavamini.Preconditions;
26
27 import io.reactivex.Completable;
28 import io.reactivex.Flowable;
29 import io.reactivex.Single;
30 import io.reactivex.functions.Action;
31 import io.reactivex.functions.Consumer;
32 import io.reactivex.functions.Function;
33
34 public final class Database implements AutoCloseable {
35
36 private final Pool<Connection> pool;
37 private final Single<Connection> connection;
38
39 private final Action onClose;
40
41 private Database(@Nonnull Pool<Connection> pool, @Nonnull Action onClose) {
42 this.pool = pool;
43 this.connection = pool.member().map(x -> {
44 if (x.value() == null) {
45 throw new NullPointerException("connection is null!");
46 }
47 return x.value();
48 });
49 this.onClose = onClose;
50 }
51
52 public static NonBlockingConnectionPool.Builder<Database> nonBlocking() {
53 return new NonBlockingConnectionPool.Builder<Database>(pool -> Database.from(pool, () -> pool.close()));
54 }
55
56 public static Database from(@Nonnull String url, int maxPoolSize) {
57 Preconditions.checkNotNull(url, "url cannot be null");
58 Preconditions.checkArgument(maxPoolSize > 0, "maxPoolSize must be greater than 0");
59 NonBlockingConnectionPool pool = Pools.nonBlocking()
60 .url(url)
61 .maxPoolSize(maxPoolSize)
62 .build();
63 return Database.from(
64 pool,
65 () -> {
66 pool.close();
67 });
68 }
69
70 public static Database from(@Nonnull Pool<Connection> pool) {
71 Preconditions.checkNotNull(pool, "pool canot be null");
72 return new Database(pool, () -> pool.close());
73 }
74
75 public static Database from(@Nonnull Pool<Connection> pool, Action closeAction) {
76 Preconditions.checkNotNull(pool, "pool canot be null");
77 return new Database(pool, closeAction);
78 }
79
80 public static Database fromBlocking(@Nonnull ConnectionProvider cp) {
81 return Database.from(new ConnectionProviderBlockingPool(cp));
82 }
83
84 public static Database fromBlocking(@Nonnull DataSource dataSource) {
85 return fromBlocking(Util.connectionProvider(dataSource));
86 }
87
88 public static Database test(int maxPoolSize) {
89 Preconditions.checkArgument(maxPoolSize > 0, "maxPoolSize must be greater than 0");
90 return Database.from(
91 Pools.nonBlocking()
92 .connectionProvider(testConnectionProvider())
93 .maxPoolSize(maxPoolSize)
94 .build());
95 }
96
97 static ConnectionProvider testConnectionProvider() {
98 return testConnectionProvider(nextUrl());
99 }
100
101
102
103
104
105
106
107 public static Database test() {
108 return test(3);
109 }
110
111 private static void createTestDatabase(@Nonnull Connection c) {
112 try {
113 Sql
114 .statements(Database.class.getResourceAsStream("/database-test.sql"))
115 .stream()
116 .forEach(x -> {
117 try (PreparedStatement s = c.prepareStatement(x)) {
118 s.execute();
119 } catch (SQLException e) {
120 throw new SQLRuntimeException(e);
121 }
122 });
123 c.commit();
124 } catch (SQLException e) {
125 throw new SQLRuntimeException(e);
126 }
127 }
128
129 private static ConnectionProvider testConnectionProvider(@Nonnull String url) {
130 return new ConnectionProvider() {
131
132 private final AtomicBoolean once = new AtomicBoolean();
133 private final CountDownLatch latch = new CountDownLatch(1);
134
135 @Override
136 public Connection get() {
137 try {
138 Connection c = DriverManager.getConnection(url);
139 if (once.compareAndSet(false, true)) {
140 createTestDatabase(c);
141 latch.countDown();
142 } else {
143 if (!latch.await(1, TimeUnit.MINUTES)) {
144 throw new SQLRuntimeException("waited 1 minute but test database was not created");
145 }
146 }
147 return c;
148 } catch (SQLException e) {
149 throw new SQLRuntimeException(e);
150 } catch (InterruptedException e) {
151 throw new RuntimeException(e);
152 }
153 }
154
155 @Override
156 public void close() {
157
158 }
159 };
160 }
161
162 private static final AtomicInteger testDbNumber = new AtomicInteger();
163
164 private static String nextUrl() {
165 return "jdbc:derby:memory:derby" + testDbNumber.incrementAndGet() + ";create=true";
166 }
167
168 public Single<Connection> connection() {
169 return connection;
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 public Flowable<Connection> connections() {
192 return new FlowableSingleDeferUntilRequest<Connection>(connection).repeat();
193 }
194
195 @Override
196 public void close() {
197 try {
198 onClose.run();
199 } catch (Exception e) {
200 throw new SQLRuntimeException(e);
201 }
202 }
203
204 public CallableBuilder call(@Nonnull String sql) {
205 return new CallableBuilder(sql, connection(), this);
206 }
207
208 public <T> SelectAutomappedBuilder<T> select(@Nonnull Class<T> cls) {
209 Preconditions.checkNotNull(cls, "cls cannot be null");
210 return new SelectAutomappedBuilder<T>(cls, connection, this);
211 }
212
213 public SelectBuilder select(@Nonnull String sql) {
214 Preconditions.checkNotNull(sql, "sql cannot be null");
215 return new SelectBuilder(sql, connection(), this);
216 }
217
218 public UpdateBuilder update(@Nonnull String sql) {
219 Preconditions.checkNotNull(sql, "sql cannot be null");
220 return new UpdateBuilder(sql, connection(), this);
221 }
222
223 public TransactedBuilder tx(@Nonnull Tx<?> tx) {
224 Preconditions.checkNotNull(tx, "tx cannot be null");
225 TxImpl<?> t = (TxImpl<?>) tx;
226 TransactedConnection c = t.connection().fork();
227 return new TransactedBuilder(c, this);
228 }
229
230 public static final Object NULL_CLOB = new Object();
231
232 public static final Object NULL_NUMBER = new Object();
233
234 public static Object toSentinelIfNull(@Nullable String s) {
235 if (s == null)
236 return NULL_CLOB;
237 else
238 return s;
239 }
240
241
242
243
244
245
246
247 public static final Object NULL_BLOB = new Object();
248
249 public static Object toSentinelIfNull(@Nullable byte[] bytes) {
250 if (bytes == null)
251 return NULL_BLOB;
252 else
253 return bytes;
254 }
255
256 public static Object clob(@Nullable String s) {
257 return toSentinelIfNull(s);
258 }
259
260 public static Object blob(@Nullable byte[] bytes) {
261 return toSentinelIfNull(bytes);
262 }
263
264
265
266
267
268
269
270
271 public Single<Member<Connection>> member() {
272 return pool.member();
273 }
274
275 public <T> Single<T> apply(Function<? super Connection, ? extends T> function) {
276 return member().map(member -> {
277 try {
278 return function.apply(member.value());
279 } finally {
280 member.checkin();
281 }
282 });
283 }
284
285 public <T> Completable apply(Consumer<? super Connection> consumer) {
286 return member().doOnSuccess(member -> {
287 try {
288 consumer.accept(member.value());
289 } finally {
290 member.checkin();
291 }
292 }).ignoreElement();
293 }
294
295 }