// View Javadoc (navigation link left over from the HTML source listing)
package com.github.davidmoten.rx.jdbc;

import static com.github.davidmoten.rx.RxUtil.greaterThanZero;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Types;

import javax.naming.Context;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;

import com.github.davidmoten.rx.Functions;
import com.github.davidmoten.rx.RxUtil;
import com.github.davidmoten.rx.RxUtil.CountingAction;
import com.github.davidmoten.rx.jdbc.exceptions.TransactionAlreadyOpenException;
31  
32  /**
33   * Main entry point for manipulations of a database using rx-java-jdbc style
34   * queries.
35   */
36  final public class Database {
37  
38      /**
39       * Logger.
40       */
41      private static final Logger log = LoggerFactory.getLogger(Database.class);
42  
43      /**
44       * Provides access for queries to a limited subset of {@link Database}
45       * methods.
46       */
47      private final QueryContext context;
48  
49      /**
50       * ThreadLocal storage of the current {@link Scheduler} factory to use with
51       * queries.
52       */
53      private final ThreadLocal<Func0<Scheduler>> currentSchedulerFactory = new ThreadLocal<Func0<Scheduler>>();
54  
55      /**
56       * ThreadLocal storage of the current {@link ConnectionProvider} to use with
57       * queries.
58       */
59      private final ThreadLocal<ConnectionProvider> currentConnectionProvider = new ThreadLocal<ConnectionProvider>();
60  
61      private final ThreadLocal<Boolean> isTransactionOpen = new ThreadLocal<Boolean>();
62  
63      static final ThreadLocal<ResultSetCache> rsCache = new ThreadLocal<ResultSetCache>();
64  
65      static final ThreadLocal<AutoMapCache> autoMapCache = new ThreadLocal<AutoMapCache>();
66  
67      /**
68       * Records the result of the last finished transaction (committed =
69       * <code>true</code> or rolled back = <code>false</code>).
70       */
71      private final ThreadLocal<Observable<Boolean>> lastTransactionResult = new ThreadLocal<Observable<Boolean>>();
72  
73      /**
74       * Connection provider.
75       */
76      private final ConnectionProvider cp;
77  
78      /**
79       * Schedules non transactional queries.
80       */
81      private final Func0<Scheduler> nonTransactionalSchedulerFactory;
82  
83      /**
84       * Constructor.
85       * 
86       * @param cp
87       *            provides connections
88       * @param nonTransactionalSchedulerFactory
89       *            schedules non transactional queries
90       */
91      public Database(final ConnectionProvider cp, Func0<Scheduler> nonTransactionalSchedulerFactory) {
92          Conditions.checkNotNull(cp);
93          this.cp = cp;
94          currentConnectionProvider.set(cp);
95          if (nonTransactionalSchedulerFactory != null)
96              this.nonTransactionalSchedulerFactory = nonTransactionalSchedulerFactory;
97          else
98              this.nonTransactionalSchedulerFactory = CURRENT_THREAD_SCHEDULER_FACTORY;
99          this.context = new QueryContext(this);
100     }
101 
102     /**
103      * Returns the {@link ConnectionProvider}.
104      * 
105      * @return
106      */
107     public ConnectionProvider getConnectionProvider() {
108         return cp;
109     }
110 
111     /**
112      * Schedules using {@link Schedulers}.trampoline().
113      */
114     private static final Func0<Scheduler> CURRENT_THREAD_SCHEDULER_FACTORY = new Func0<Scheduler>() {
115 
116         @Override
117         public Scheduler call() {
118             return Schedulers.trampoline();
119         }
120     };
121 
122     /**
123      * Constructor. Thread pool size defaults to
124      * <code>{@link Runtime#getRuntime()}.availableProcessors()+1</code>. This
125      * may be too conservative if the database is on another server. If that is
126      * the case then you may want to use a thread pool size equal to the
127      * available processors + 1 on the database server.
128      * 
129      * @param cp
130      *            provides connections
131      */
132     public Database(ConnectionProvider cp) {
133         this(cp, null);
134     }
135 
136     /**
137      * Constructor. Uses a {@link ConnectionProviderFromUrl} based on the given
138      * url.
139      * 
140      * @param url
141      *            jdbc url
142      * @param username
143      *            username for connection
144      * @param password
145      *            password for connection
146      */
147     public Database(String url, String username, String password) {
148         this(new ConnectionProviderFromUrl(url, username, password));
149     }
150 
151     /**
152      * Constructor. Uses the single connection provided and current thread
153      * scheduler (trampoline) to run all queries. The connection will not be
154      * closed in reality though the log may indicate it as having received a
155      * close call.
156      * 
157      * @param con
158      *            the connection
159      */
160     public Database(Connection con) {
161         this(new ConnectionProviderNonClosing(con), CURRENT_THREAD_SCHEDULER_FACTORY);
162     }
163 
164     /**
165      * Returns a {@link Database} based on a jdbc connection string.
166      * 
167      * @param url
168      *            jdbc connection url
169      * @return
170      */
171     public static Database from(String url) {
172         return new Database(url, null, null);
173     }
174 
175     /**
176      * Returns a {@link Database} based on a jdbc connection string.
177      * 
178      * @param url
179      *            jdbc url
180      * @param username
181      *            username for connection
182      * @param password
183      *            password for connection
184      * @return the database object
185      */
186     public static Database from(String url, String username, String password) {
187         return new Database(url, username, password);
188     }
189 
190     /**
191      * Returns a {@link Database} based on connections obtained from a
192      * {@link DataSource} based on looking up the current {@link Context}
193      * 
194      * @param jndiResource
195      * @return
196      */
197     public static Database fromContext(String jndiResource) {
198         return new Database(new ConnectionProviderFromContext(jndiResource));
199     }
200 
201     /**
202      * Returns a {@link Database} based on connections obtained from a
203      * {@link DataSource}
204      * 
205      * @param jndiResource
206      * @return
207      */
208     public static Database fromDataSource(DataSource dataSource) {
209         return new Database(new ConnectionProviderFromDataSource(dataSource));
210     }
211 
212     /**
213      * Returns a {@link Database} that obtains {@link Connection}s on demand
214      * from the given {@link ConnectionProvider}. When {@link Database#close()}
215      * is called, {@link ConnectionProvider#close()} is called.
216      * 
217      * @param cp
218      * @return
219      */
220     public static Database from(ConnectionProvider cp) {
221         return new Database(cp);
222     }
223 
224     /**
225      * Factory method. Uses the single connection provided and current thread
226      * scheduler (trampoline) to run all queries. The connection will not be
227      * closed in reality though the log may indicate it as having received a
228      * close call.
229      * 
230      * @param con
231      *            the connection
232      */
233     public static Database from(Connection con) {
234         return new Database(con);
235     }
236 
237     /**
238      * Returns a new {@link Builder}.
239      * 
240      * @return
241      */
242     public static Builder builder() {
243         return new Builder();
244     }
245 
246     /**
247      * Builds a {@link Database}.
248      */
249     public final static class Builder {
250 
251         private ConnectionProvider cp;
252         private Func0<Scheduler> nonTransactionalSchedulerFactory = null;
253         private Pool pool = null;
254         private String url;
255         private String username;
256         private String password;
257 
258         private static class Pool {
259             int minSize;
260             int maxSize;
261 
262             Pool(int minSize, int maxSize) {
263                 super();
264                 this.minSize = minSize;
265                 this.maxSize = maxSize;
266             }
267         }
268 
269         /**
270          * Constructor.
271          */
272         private Builder() {
273         }
274 
275         /**
276          * Sets the connection provider.
277          * 
278          * @param cp
279          * @return
280          */
281         public Builder connectionProvider(ConnectionProvider cp) {
282             this.cp = cp;
283             return this;
284         }
285 
286         /**
287          * Sets the jdbc url.
288          * 
289          * @param url
290          * @return
291          */
292         public Builder url(String url) {
293             this.url = url;
294             return this;
295         }
296 
297         public Builder username(String username) {
298             this.username = username;
299             return this;
300         }
301 
302         public Builder password(String password) {
303             this.password = password;
304             return this;
305         }
306 
307         /**
308          * Sets the {@link ConnectionProvider} to use a connection pool with the
309          * given jdbc url and pool size.
310          * 
311          * @param url
312          * @param minPoolSize
313          * @param maxPoolSize
314          * @return
315          */
316         public Builder pool(int minPoolSize, int maxPoolSize) {
317             pool = new Pool(minPoolSize, maxPoolSize);
318             return this;
319         }
320 
321         /**
322          * Sets the {@link ConnectionProvider} to use a connection pool with the
323          * given jdbc url and min pool size of 0, max pool size of 10.
324          * 
325          * @param url
326          * @return
327          */
328         public Builder pooled(String url) {
329             this.cp = new ConnectionProviderPooled(url, 0, 10);
330             return this;
331         }
332 
333         /**
334          * Sets the non transactional scheduler.
335          * 
336          * @param factory
337          * @return
338          */
339         public Builder nonTransactionalScheduler(Func0<Scheduler> factory) {
340             nonTransactionalSchedulerFactory = factory;
341             return this;
342         }
343 
344         /**
345          * Requests that the non transactional queries are run using
346          * {@link Schedulers#trampoline()}.
347          * 
348          * @return
349          */
350         public Builder nonTransactionalSchedulerOnCurrentThread() {
351             nonTransactionalSchedulerFactory = CURRENT_THREAD_SCHEDULER_FACTORY;
352             return this;
353         }
354 
355         /**
356          * Returns a {@link Database}.
357          * 
358          * @return
359          */
360         public Database build() {
361             if (url != null && pool != null)
362                 cp = new ConnectionProviderPooled(url, username, password, pool.minSize,
363                         pool.maxSize);
364             else if (url != null)
365                 cp = new ConnectionProviderFromUrl(url, username, password);
366             return new Database(cp, nonTransactionalSchedulerFactory);
367         }
368     }
369 
370     /**
371      * Returns the thread local current query context (will not return null).
372      * Will return overriden context (for example using Database returned from
373      * {@link Database#beginTransaction()} if set.
374      * 
375      * @return
376      */
377     public QueryContext queryContext() {
378         return context;
379     }
380 
381     /**
382      * Returns a {@link QuerySelect.Builder} builder based on the given select
383      * statement sql.
384      * 
385      * @param sql
386      *            a select statement.
387      * @return select query builder
388      */
389     public QuerySelect.Builder select(String sql) {
390         return new QuerySelect.Builder(sql, this);
391     }
392 
393     /**
394      * Returns a {@link QuerySelect.Builder} builder and defers specifying sql
395      * to the `autoMap` Class parameter.
396      * 
397      * @return query builder
398      */
399     public QuerySelect.Builder select() {
400         return new QuerySelect.Builder(null, this);
401     }
402 
403     /**
404      * Returns a {@link QueryUpdate.Builder} builder based on the given
405      * update/insert/delete/DDL statement sql.
406      * 
407      * @param sql
408      *            an update/insert/delete/DDL statement.
409      * @return update/insert query builder
410      */
411     public QueryUpdate.Builder update(String sql) {
412         return new QueryUpdate.Builder(sql, this);
413     }
414 
415     /**
416      * Starts a transaction. Until commit() or rollback() is called on the
417      * source this will set the query context for all created queries to be a
418      * single threaded executor with one (new) connection.
419      * 
420      * @param dependency
421      * @return
422      */
423     public Observable<Boolean> beginTransaction(Observable<?> dependency) {
424         return update("begin").dependsOn(dependency).count().map(Functions.constant(true));
425     }
426 
427     /**
428      * Starts a transaction. Until commit() or rollback() is called on the
429      * source this will set the query context for all created queries to be a
430      * single threaded executor with one (new) connection.
431      * 
432      * @return
433      */
434     public Observable<Boolean> beginTransaction() {
435         return beginTransaction(Observable.empty());
436     }
437 
438     /**
439      * Returns true if and only if integer is non-zero.
440      */
441     private static final Func1<Integer, Boolean> IS_NON_ZERO = new Func1<Integer, Boolean>() {
442         @Override
443         public Boolean call(Integer i) {
444             return i != 0;
445         }
446     };
447 
448     /**
449      * Commits a transaction and resets the current query context so that
450      * further queries will use the asynchronous version by default. All
451      * Observable dependencies must be complete before commit is called.
452      * 
453      * @param depends
454      *            depdencies that must complete before commit occurs.
455      * @return
456      */
457     public Observable<Boolean> commit(Observable<?>... depends) {
458         return commitOrRollback(true, depends);
459     }
460 
461     /**
462      * Waits for the source to complete before returning the result of
463      * db.commit();
464      * 
465      * @return commit operator
466      */
467     public <T> Operator<Boolean, T> commitOperator() {
468         return commitOrRollbackOperator(true);
469     }
470 
471     /**
472      * Waits for the source to complete before returning the result of
473      * db.rollback();
474      * 
475      * @return rollback operator
476      */
477     public <T> Operator<Boolean, T> rollbackOperator() {
478         return commitOrRollbackOperator(false);
479     }
480 
481     private <T> Operator<Boolean, T> commitOrRollbackOperator(final boolean commit) {
482         final QueryUpdate.Builder updateBuilder = createCommitOrRollbackQuery(commit);
483         return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
484             @Override
485             public Observable<Boolean> call(Observable<T> source) {
486                 return updateBuilder.dependsOn(source).count().map(IS_NON_ZERO);
487             }
488         });
489     }
490 
491     /**
492      * Commits or rolls back a transaction depending on the <code>commit</code>
493      * parameter and resets the current query context so that further queries
494      * will use the asynchronous version by default. All Observable dependencies
495      * must be complete before commit/rollback is called.
496      * 
497      * @param commit
498      * @param depends
499      * @return
500      */
501     private Observable<Boolean> commitOrRollback(boolean commit, Observable<?>... depends) {
502 
503         QueryUpdate.Builder u = createCommitOrRollbackQuery(commit);
504         for (Observable<?> dep : depends)
505             u = u.dependsOn(dep);
506         Observable<Boolean> result = u.count().map(IS_NON_ZERO);
507         lastTransactionResult.set(result);
508         return result;
509     }
510 
511     private QueryUpdate.Builder createCommitOrRollbackQuery(boolean commit) {
512         String action;
513         if (commit)
514             action = "commit";
515         else
516             action = "rollback";
517         QueryUpdate.Builder u = update(action);
518         return u;
519     }
520 
521     /**
522      * Rolls back a transaction and resets the current query context so that
523      * further queries will use the asynchronous version by default. All
524      * Observable dependencies must be complete before rollback is called.
525      * 
526      * @param depends
527      *            depdencies that must complete before commit occurs.
528      * @return
529      * 
530      **/
531     public Observable<Boolean> rollback(Observable<?>... depends) {
532         return commitOrRollback(false, depends);
533     }
534 
535     /**
536      * Returns observable that emits true when last transaction committed or
537      * false when last transaction is rolled back.
538      * 
539      * @return
540      */
541     public Observable<Boolean> lastTransactionResult() {
542         Observable<Boolean> o = lastTransactionResult.get();
543         if (o == null)
544             return Observable.empty();
545         else
546             return o;
547     }
548 
549     /**
550      * Close the database in particular closes the {@link ConnectionProvider}
551      * for the database. For a {@link ConnectionProviderPooled} this will be a
552      * required call for cleanup.
553      * 
554      * @return
555      */
556     public Database close() {
557         log.debug("closing connection provider");
558         cp.close();
559         log.debug("closed connection provider");
560         return this;
561     }
562 
563     /**
564      * Returns the current thread local {@link Scheduler}.
565      * 
566      * @return
567      */
568     Scheduler currentScheduler() {
569         if (currentSchedulerFactory.get() == null)
570             return nonTransactionalSchedulerFactory.call();
571         else
572             return currentSchedulerFactory.get().call();
573     }
574 
575     /**
576      * Returns the current thread local {@link ConnectionProvider}.
577      * 
578      * @return
579      */
580     ConnectionProvider connectionProvider() {
581         if (currentConnectionProvider.get() == null)
582             return cp;
583         else
584             return currentConnectionProvider.get();
585     }
586 
587     /**
588      * Sets the current thread local {@link ConnectionProvider} to a singleton
589      * manual commit instance.
590      */
591     void beginTransactionObserve() {
592         log.debug("beginTransactionObserve");
593         currentConnectionProvider.set(new ConnectionProviderSingletonManualCommit(cp));
594         if (isTransactionOpen.get() != null && isTransactionOpen.get())
595             throw new TransactionAlreadyOpenException();
596         isTransactionOpen.set(true);
597     }
598 
599     /**
600      * Sets the current thread local {@link Scheduler} to be
601      * {@link Schedulers#trampoline()}.
602      */
603     void beginTransactionSubscribe() {
604         log.debug("beginTransactionSubscribe");
605         currentSchedulerFactory.set(CURRENT_THREAD_SCHEDULER_FACTORY);
606     }
607 
608     /**
609      * Resets the current thread local {@link Scheduler} to default.
610      */
611     void endTransactionSubscribe() {
612         log.debug("endTransactionSubscribe");
613         currentSchedulerFactory.set(null);
614         rsCache.set(null);
615     }
616 
617     /**
618      * Resets the current thread local {@link ConnectionProvider} to default.
619      */
620     void endTransactionObserve() {
621         log.debug("endTransactionObserve");
622         currentConnectionProvider.set(cp);
623         isTransactionOpen.set(false);
624         rsCache.set(null);
625     }
626 
627     /**
628      * Returns an {@link Operator} that performs commit or rollback of a
629      * transaction.
630      * 
631      * @param isCommit
632      * @return
633      */
634     private <T> Operator<Boolean, T> commitOrRollbackOnCompleteOperator(final boolean isCommit) {
635         return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
636             @Override
637             public Observable<Boolean> call(Observable<T> source) {
638                 return commitOrRollbackOnCompleteOperatorIfAtLeastOneValue(isCommit, Database.this,
639                         source);
640             }
641         });
642     }
643 
644     /**
645      * Commits current transaction on the completion of source if and only if
646      * the source sequence is non-empty.
647      * 
648      * @return operator that commits on completion of source.
649      */
650     public <T> Operator<Boolean, T> commitOnCompleteOperator() {
651         return commitOrRollbackOnCompleteOperator(true);
652     }
653 
654     /**
655      * Rolls back current transaction on the completion of source if and only if
656      * the source sequence is non-empty.
657      * 
658      * @return operator that rolls back on completion of source.
659      */
660 
661     public <T> Operator<Boolean, T> rollbackOnCompleteOperator() {
662         return commitOrRollbackOnCompleteOperator(false);
663     }
664 
665     /**
666      * Starts a database transaction for each onNext call. Following database
667      * calls will be subscribed on current thread (Schedulers.trampoline()) and
668      * share the same {@link Connection} until transaction is rolled back or
669      * committed.
670      * 
671      * @return begin transaction operator
672      */
673     public <T> Operator<T, T> beginTransactionOnNextOperator() {
674         return RxUtil.toOperator(new Func1<Observable<T>, Observable<T>>() {
675             @Override
676             public Observable<T> call(Observable<T> source) {
677                 return beginTransactionOnNext(Database.this, source);
678             }
679         });
680     }
681 
682     /**
683      * Commits the currently open transaction. Emits true.
684      * 
685      * @return
686      */
687     public <T> Operator<Boolean, T> commitOnNextOperator() {
688         return commitOrRollbackOnNextOperator(true);
689     }
690 
691     public <T> Operator<Boolean, Observable<T>> commitOnNextListOperator() {
692         return commitOrRollbackOnNextListOperator(true);
693     }
694 
695     public <T> Operator<Boolean, Observable<T>> rollbackOnNextListOperator() {
696         return commitOrRollbackOnNextListOperator(false);
697     }
698 
699     private <T> Operator<Boolean, Observable<T>> commitOrRollbackOnNextListOperator(
700             final boolean isCommit) {
701         return RxUtil.toOperator(new Func1<Observable<Observable<T>>, Observable<Boolean>>() {
702             @Override
703             public Observable<Boolean> call(Observable<Observable<T>> source) {
704                 return source.concatMap(new Func1<Observable<T>, Observable<Boolean>>() {
705                     @Override
706                     public Observable<Boolean> call(Observable<T> source) {
707                         if (isCommit)
708                             return commit(source);
709                         else
710                             return rollback(source);
711                     }
712                 });
713             }
714         });
715     }
716 
717     /**
718      * Rolls back the current transaction. Emits false.
719      * 
720      * @return
721      */
722     public Operator<Boolean, ?> rollbackOnNextOperator() {
723         return commitOrRollbackOnNextOperator(false);
724     }
725 
726     private <T> Operator<Boolean, T> commitOrRollbackOnNextOperator(final boolean isCommit) {
727         return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
728             @Override
729             public Observable<Boolean> call(Observable<T> source) {
730                 return commitOrRollbackOnNext(isCommit, Database.this, source);
731             }
732         });
733     }
734 
735     private static <T> Observable<Boolean> commitOrRollbackOnCompleteOperatorIfAtLeastOneValue(
736             final boolean isCommit, final Database db, Observable<T> source) {
737         CountingAction<T> counter = RxUtil.counter();
738         Observable<Boolean> commit = counter
739         // get count
740                 .count()
741                 // greater than zero or empty
742                 .filter(greaterThanZero())
743                 // commit if at least one value
744                 .lift(db.commitOrRollbackOperator(isCommit));
745         return Observable
746         // concatenate
747                 .concat(source
748                 // count emissions
749                         .doOnNext(counter)
750                         // ignore emissions
751                         .ignoreElements()
752                         // cast the empty sequence to type Boolean
753                         .cast(Boolean.class),
754                 // concat with commit
755                         commit);
756     }
757 
758     /**
759      * Emits true for commit and false for rollback.
760      * 
761      * @param isCommit
762      * @param db
763      * @param source
764      * @return
765      */
766     private static <T> Observable<Boolean> commitOrRollbackOnNext(final boolean isCommit,
767             final Database db, Observable<T> source) {
768         return source.concatMap(new Func1<T, Observable<Boolean>>() {
769             @Override
770             public Observable<Boolean> call(T t) {
771                 if (isCommit)
772                     return db.commit();
773                 else
774                     return db.rollback();
775             }
776         });
777     }
778 
779     private static <T> Observable<T> beginTransactionOnNext(final Database db, Observable<T> source) {
780         return source.concatMap(new Func1<T, Observable<T>>() {
781             @Override
782             public Observable<T> call(T t) {
783                 return db.beginTransaction().map(Functions.constant(t));
784             }
785         });
786     }
787 
788     /**
789      * Returns an {@link Observable} that is the result of running a sequence of
790      * update commands (insert/update/delete, ddl) read from the given
791      * {@link Observable} sequence.
792      * 
793      * @param commands
794      * @return
795      */
796     public Observable<Integer> run(Observable<String> commands) {
797         return commands.reduce(Observable.<Integer> empty(),
798                 new Func2<Observable<Integer>, String, Observable<Integer>>() {
799                     @Override
800                     public Observable<Integer> call(Observable<Integer> dep, String command) {
801                         return update(command).dependsOn(dep).count();
802                     }
803                 }).lift(RxUtil.<Integer> flatten());
804     }
805 
806     /**
807      * Returns an {@link Operator} version of {@link #run(Observable)}.
808      * 
809      * @return
810      */
811     public Operator<Integer, String> run() {
812         return RxUtil.toOperator(new Func1<Observable<String>, Observable<Integer>>() {
813             @Override
814             public Observable<Integer> call(Observable<String> commands) {
815                 return run(commands);
816             }
817         });
818     }
819 
820     /**
821      * Returns an {@link Observable} that is the result of running a sequence of
822      * update commands (insert/update/delete, ddl) commands read from an
823      * InputStream using the given delimiter as the statement delimiter (for
824      * example semicolon).
825      * 
826      * @param is
827      * @param delimiter
828      * @return
829      */
830     public Observable<Integer> run(InputStream is, String delimiter) {
831         return run(is, Charset.defaultCharset(), delimiter);
832     }
833 
834     /**
835      * Returns an {@link Observable} that is the result of running a sequence of
836      * update commands (insert/update/delete, ddl) commands read from an
837      * {@link InputStream} with the given {@link Charset} using the given
838      * delimiter as the statement delimiter (for example semicolon).
839      * 
840      * @param is
841      * @param delimiter
842      * @return
843      */
844     public Observable<Integer> run(InputStream is, Charset charset, String delimiter) {
845         return StringObservable.split(StringObservable.from(new InputStreamReader(is, charset)),
846                 ";").lift(run());
847     }
848 
849     /**
850      * Returns a Database based on the current Database except all
851      * non-transactional queries run {@link Schedulers#io}.
852      * 
853      * @return new Database instance
854      */
855     public Database asynchronous() {
856         return asynchronous(Schedulers.io());
857     }
858 
859     /**
860      * Returns a Database based on the current Database except all
861      * non-transactional queries run on the given scheduler.
862      * 
863      * @return new Database instance
864      */
865     public Database asynchronous(final Scheduler nonTransactionalScheduler) {
866         return asynchronous(new Func0<Scheduler>() {
867             @Override
868             public Scheduler call() {
869                 return nonTransactionalScheduler;
870             }
871         });
872     }
873 
874     /**
875      * Returns a Database based on the current Database except all
876      * non-transactional queries run on the scheduler provided by the given
877      * factory.
878      * 
879      * @return new Database instance
880      */
881     public Database asynchronous(final Func0<Scheduler> nonTransactionalSchedulerFactory) {
882         return new Database(cp, nonTransactionalSchedulerFactory);
883     }
884 
885     /**
886      * Sentinel object used to indicate in parameters of a query that rather
887      * than calling {@link PreparedStatement#setObject(int, Object)} with a null
888      * we call {@link PreparedStatement#setNull(int, int)} with
889      * {@link Types#CLOB}. This is required by many databases for setting CLOB
890      * and BLOB fields to null.
891      */
892     public static final Object NULL_CLOB = new Object();
893 
894     public static final Object NULL_NUMBER = new Object();
895 
896     public static Object toSentinelIfNull(String s) {
897         if (s == null)
898             return NULL_CLOB;
899         else
900             return s;
901     }
902 
903     /**
904      * Sentinel object used to indicate in parameters of a query that rather
905      * than calling {@link PreparedStatement#setObject(int, Object)} with a null
906      * we call {@link PreparedStatement#setNull(int, int)} with
907      * {@link Types#CLOB}. This is required by many databases for setting CLOB
908      * and BLOB fields to null.
909      */
910     public static final Object NULL_BLOB = new Object();
911 
912     public static Object toSentinelIfNull(byte[] bytes) {
913         if (bytes == null)
914             return NULL_BLOB;
915         else
916             return bytes;
917     }
918 
919 }