View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.Connection;
4   import java.sql.DriverManager;
5   import java.sql.PreparedStatement;
6   import java.sql.SQLException;
7   import java.sql.Types;
8   import java.util.concurrent.CountDownLatch;
9   import java.util.concurrent.TimeUnit;
10  import java.util.concurrent.atomic.AtomicBoolean;
11  import java.util.concurrent.atomic.AtomicInteger;
12  
13  import javax.annotation.Nonnull;
14  import javax.annotation.Nullable;
15  import javax.sql.DataSource;
16  
17  import org.davidmoten.rx.internal.FlowableSingleDeferUntilRequest;
18  import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
19  import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
20  import org.davidmoten.rx.jdbc.pool.Pools;
21  import org.davidmoten.rx.jdbc.pool.internal.ConnectionProviderBlockingPool;
22  import org.davidmoten.rx.pool.Member;
23  import org.davidmoten.rx.pool.Pool;
24  
25  import com.github.davidmoten.guavamini.Preconditions;
26  
27  import io.reactivex.Completable;
28  import io.reactivex.Flowable;
29  import io.reactivex.Single;
30  import io.reactivex.functions.Action;
31  import io.reactivex.functions.Consumer;
32  import io.reactivex.functions.Function;
33  
34  public final class Database implements AutoCloseable {
35  
36      private final Pool<Connection> pool;
37      private final Single<Connection> connection;
38  
39      private final Action onClose;
40  
41      private Database(@Nonnull Pool<Connection> pool, @Nonnull Action onClose) {
42          this.pool = pool;
43          this.connection = pool.member().map(x -> {
44              if (x.value() == null) {
45                  throw new NullPointerException("connection is null!");
46              }
47              return x.value();
48          });
49          this.onClose = onClose;
50      }
51  
52      public static NonBlockingConnectionPool.Builder<Database> nonBlocking() {
53          return new NonBlockingConnectionPool.Builder<Database>(pool -> Database.from(pool, () -> pool.close()));
54      }
55  
56      public static Database from(@Nonnull String url, int maxPoolSize) {
57          Preconditions.checkNotNull(url, "url cannot be null");
58          Preconditions.checkArgument(maxPoolSize > 0, "maxPoolSize must be greater than 0");
59          NonBlockingConnectionPool pool = Pools.nonBlocking() //
60                  .url(url) //
61                  .maxPoolSize(maxPoolSize) //
62                  .build();
63          return Database.from( //
64                  pool, //
65                  () -> {
66                      pool.close();
67                  });
68      }
69  
70      public static Database from(@Nonnull Pool<Connection> pool) {
71          Preconditions.checkNotNull(pool, "pool canot be null");
72          return new Database(pool, () -> pool.close());
73      }
74  
75      public static Database from(@Nonnull Pool<Connection> pool, Action closeAction) {
76          Preconditions.checkNotNull(pool, "pool canot be null");
77          return new Database(pool, closeAction);
78      }
79  
80      public static Database fromBlocking(@Nonnull ConnectionProvider cp) {
81          return Database.from(new ConnectionProviderBlockingPool(cp));
82      }
83  
84      public static Database fromBlocking(@Nonnull DataSource dataSource) {
85          return fromBlocking(Util.connectionProvider(dataSource));
86      }
87  
88      public static Database test(int maxPoolSize) {
89          Preconditions.checkArgument(maxPoolSize > 0, "maxPoolSize must be greater than 0");
90          return Database.from( //
91                  Pools.nonBlocking() //
92                          .connectionProvider(testConnectionProvider()) //
93                          .maxPoolSize(maxPoolSize) //
94                          .build());
95      }
96  
97      static ConnectionProvider testConnectionProvider() {
98          return testConnectionProvider(nextUrl());
99      }
100 
101     /**
102      * Returns a new testing Apache Derby in-memory database with a connection pool
103      * of size 3.
104      * 
105      * @return new testing Database instance
106      */
107     public static Database test() {
108         return test(3);
109     }
110 
111     private static void createTestDatabase(@Nonnull Connection c) {
112         try {
113             Sql //
114                     .statements(Database.class.getResourceAsStream("/database-test.sql")) //
115                     .stream() //
116                     .forEach(x -> {
117                         try (PreparedStatement s = c.prepareStatement(x)) {
118                             s.execute();
119                         } catch (SQLException e) {
120                             throw new SQLRuntimeException(e);
121                         }
122                     });
123             c.commit();
124         } catch (SQLException e) {
125             throw new SQLRuntimeException(e);
126         }
127     }
128 
129     private static ConnectionProvider testConnectionProvider(@Nonnull String url) {
130         return new ConnectionProvider() {
131 
132             private final AtomicBoolean once = new AtomicBoolean();
133             private final CountDownLatch latch = new CountDownLatch(1);
134 
135             @Override
136             public Connection get() {
137                 try {
138                     Connection c = DriverManager.getConnection(url);
139                     if (once.compareAndSet(false, true)) {
140                         createTestDatabase(c);
141                         latch.countDown();
142                     } else {
143                         if (!latch.await(1, TimeUnit.MINUTES)) {
144                             throw new SQLRuntimeException("waited 1 minute but test database was not created");
145                         }
146                     }
147                     return c;
148                 } catch (SQLException e) {
149                     throw new SQLRuntimeException(e);
150                 } catch (InterruptedException e) {
151                     throw new RuntimeException(e);
152                 }
153             }
154 
155             @Override
156             public void close() {
157                 //
158             }
159         };
160     }
161 
162     private static final AtomicInteger testDbNumber = new AtomicInteger();
163 
164     private static String nextUrl() {
165         return "jdbc:derby:memory:derby" + testDbNumber.incrementAndGet() + ";create=true";
166     }
167 
168     public Single<Connection> connection() {
169         return connection;
170     }
171 
172     /**
173      * <p>
174      * Returns a flowable stream of checked out Connections from the pool. It's
175      * preferrable to use the {@code connection()} method and subscribe to a
176      * MemberSingle instead because the sometimes surprising request patterns of
177      * Flowable operators may mean that more Connections are checked out from the
178      * pool than are needed. For instance if you use
179      * 
180      * <pre>
181      * Flowable&lt;Connection&gt; cons = Database.connection().repeat()
182      * </pre>
183      * <p>
184      * then you will checkout more (1 more) Connection with {@code repeat} than you
185      * requested because {@code repeat} subscribes one more time than dictated by
186      * the requests (buffers).
187      * 
188      * @return stream of checked out connections from the pool. When you call
189      *         {@code close()} on a connection it is returned to the pool
190      */
191     public Flowable<Connection> connections() {
192         return new FlowableSingleDeferUntilRequest<Connection>(connection).repeat();
193     }
194 
195     @Override
196     public void close() {
197         try {
198             onClose.run();
199         } catch (Exception e) {
200             throw new SQLRuntimeException(e);
201         }
202     }
203 
204     public CallableBuilder call(@Nonnull String sql) {
205         return new CallableBuilder(sql, connection(), this);
206     }
207 
208     public <T> SelectAutomappedBuilder<T> select(@Nonnull Class<T> cls) {
209         Preconditions.checkNotNull(cls, "cls cannot be null");
210         return new SelectAutomappedBuilder<T>(cls, connection, this);
211     }
212 
213     public SelectBuilder select(@Nonnull String sql) {
214         Preconditions.checkNotNull(sql, "sql cannot be null");
215         return new SelectBuilder(sql, connection(), this);
216     }
217 
218     public UpdateBuilder update(@Nonnull String sql) {
219         Preconditions.checkNotNull(sql, "sql cannot be null");
220         return new UpdateBuilder(sql, connection(), this);
221     }
222 
223     public TransactedBuilder tx(@Nonnull Tx<?> tx) {
224         Preconditions.checkNotNull(tx, "tx cannot be null");
225         TxImpl<?> t = (TxImpl<?>) tx;
226         TransactedConnection c = t.connection().fork();
227         return new TransactedBuilder(c, this);
228     }
229 
230     public static final Object NULL_CLOB = new Object();
231 
232     public static final Object NULL_NUMBER = new Object();
233 
234     public static Object toSentinelIfNull(@Nullable String s) {
235         if (s == null)
236             return NULL_CLOB;
237         else
238             return s;
239     }
240 
241     /**
242      * Sentinel object used to indicate in parameters of a query that rather than
243      * calling {@link PreparedStatement#setObject(int, Object)} with a null we call
244      * {@link PreparedStatement#setNull(int, int)} with {@link Types#CLOB}. This is
245      * required by many databases for setting CLOB and BLOB fields to null.
246      */
247     public static final Object NULL_BLOB = new Object();
248     
249     public static Object toSentinelIfNull(@Nullable byte[] bytes) {
250         if (bytes == null)
251             return NULL_BLOB;
252         else
253             return bytes;
254     }
255 
256     public static Object clob(@Nullable String s) {
257         return toSentinelIfNull(s);
258     }
259 
260     public static Object blob(@Nullable byte[] bytes) {
261         return toSentinelIfNull(bytes);
262     }
263     
264     /**
265      * Returns a Single of a member of the connection pool. When finished with the
266      * emitted member you must call {@code member.checkin()} to return the
267      * connection to the pool.
268      * 
269      * @return a single member of the connection pool
270      */
271     public Single<Member<Connection>> member() {
272         return pool.member();
273     }
274 
275     public <T> Single<T> apply(Function<? super Connection, ? extends T> function) {
276         return member().map(member -> {
277             try {
278                 return function.apply(member.value());
279             } finally {
280                 member.checkin();
281             }
282         });
283     }
284 
285     public <T> Completable apply(Consumer<? super Connection> consumer) {
286         return member().doOnSuccess(member -> {
287             try {
288                 consumer.accept(member.value());
289             } finally {
290                 member.checkin();
291             }
292         }).ignoreElement();
293     }
294 
295 }