1 package com.github.davidmoten.rx.jdbc;
2
3 import static com.github.davidmoten.rx.RxUtil.greaterThanZero;
4
5 import java.io.InputStream;
6 import java.io.InputStreamReader;
7 import java.nio.charset.Charset;
8 import java.sql.Connection;
9 import java.sql.PreparedStatement;
10 import java.sql.Types;
11
12 import javax.naming.Context;
13 import javax.sql.DataSource;
14
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import rx.Observable;
19 import rx.Observable.Operator;
20 import rx.Scheduler;
21 import rx.functions.Func0;
22 import rx.functions.Func1;
23 import rx.functions.Func2;
24 import rx.observables.StringObservable;
25 import rx.schedulers.Schedulers;
26
27 import com.github.davidmoten.rx.Functions;
28 import com.github.davidmoten.rx.RxUtil;
29 import com.github.davidmoten.rx.RxUtil.CountingAction;
30 import com.github.davidmoten.rx.jdbc.exceptions.TransactionAlreadyOpenException;
31
32
33
34
35
36 final public class Database {
37
38
39
40
/** Logger used for the debug messages emitted by this class. */
private static final Logger log = LoggerFactory.getLogger(Database.class);

/** Query context created for this database in the constructor and returned by queryContext(). */
private final QueryContext context;

/**
 * Per-thread scheduler factory. While it is unset (null) on the calling
 * thread, currentScheduler() falls back to nonTransactionalSchedulerFactory.
 * Set by beginTransactionSubscribe() and cleared by endTransactionSubscribe().
 */
private final ThreadLocal<Func0<Scheduler>> currentSchedulerFactory = new ThreadLocal<Func0<Scheduler>>();

/**
 * Per-thread connection provider. Replaced by beginTransactionObserve() and
 * reset to cp by endTransactionObserve(); connectionProvider() falls back to
 * cp when nothing is set on the calling thread.
 */
private final ThreadLocal<ConnectionProvider> currentConnectionProvider = new ThreadLocal<ConnectionProvider>();

/** Per-thread flag: true between beginTransactionObserve() and endTransactionObserve(); may be null if never set. */
private final ThreadLocal<Boolean> isTransactionOpen = new ThreadLocal<Boolean>();

/** Per-thread result set cache shared by all instances; set to null by endTransactionSubscribe() and endTransactionObserve(). */
static final ThreadLocal<ResultSetCache> rsCache = new ThreadLocal<ResultSetCache>();

/** Per-thread auto-map cache shared by all instances. NOTE(review): not referenced in this class - presumably used by other classes in this package; confirm. */
static final ThreadLocal<AutoMapCache> autoMapCache = new ThreadLocal<AutoMapCache>();

/** Per-thread record of the observable most recently returned by commit(...) or rollback(...); read by lastTransactionResult(). */
private final ThreadLocal<Observable<Boolean>> lastTransactionResult = new ThreadLocal<Observable<Boolean>>();

/** The connection provider supplied at construction time. */
private final ConnectionProvider cp;

/** Supplies the scheduler returned by currentScheduler() when no per-thread scheduler factory is set. */
private final Func0<Scheduler> nonTransactionalSchedulerFactory;
82
83
84
85
86
87
88
89
90
/**
 * Creates a database backed by the given connection provider.
 *
 * @param cp
 *            source of connections; passed through Conditions.checkNotNull
 * @param nonTransactionalSchedulerFactory
 *            supplies the scheduler used when no per-thread scheduler
 *            factory is set; when {@code null} the trampoline-based
 *            CURRENT_THREAD_SCHEDULER_FACTORY is used instead
 */
public Database(final ConnectionProvider cp, Func0<Scheduler> nonTransactionalSchedulerFactory) {
    Conditions.checkNotNull(cp);
    this.cp = cp;
    // The constructing thread starts out using the supplied provider.
    this.currentConnectionProvider.set(cp);
    this.nonTransactionalSchedulerFactory = (nonTransactionalSchedulerFactory == null)
            ? CURRENT_THREAD_SCHEDULER_FACTORY
            : nonTransactionalSchedulerFactory;
    this.context = new QueryContext(this);
}
101
102
103
104
105
106
107 public ConnectionProvider getConnectionProvider() {
108 return cp;
109 }
110
111
112
113
114 private static final Func0<Scheduler> CURRENT_THREAD_SCHEDULER_FACTORY = new Func0<Scheduler>() {
115
116 @Override
117 public Scheduler call() {
118 return Schedulers.trampoline();
119 }
120 };
121
122
123
124
125
126
127
128
129
130
131
/**
 * Creates a database for the given connection provider, using the default
 * scheduler factory (a {@code null} factory is passed to the two-argument
 * constructor, which substitutes CURRENT_THREAD_SCHEDULER_FACTORY).
 *
 * @param cp
 *            source of connections
 */
public Database(final ConnectionProvider cp) {
    this(cp, (Func0<Scheduler>) null);
}
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * Creates a database whose connections come from a
 * {@code ConnectionProviderFromUrl} built from the given arguments.
 *
 * @param url
 *            connection url handed to the provider
 * @param username
 *            username handed to the provider (Database.from(String) passes null)
 * @param password
 *            password handed to the provider (Database.from(String) passes null)
 */
public Database(final String url, final String username, final String password) {
    this(new ConnectionProviderFromUrl(url, username, password));
}
150
151
152
153
154
155
156
157
158
159
/**
 * Creates a database around an existing connection, wrapped in a
 * {@code ConnectionProviderNonClosing}, with the trampoline-based
 * CURRENT_THREAD_SCHEDULER_FACTORY as the non-transactional scheduler factory.
 *
 * @param con
 *            the connection to wrap
 */
public Database(final Connection con) {
    this(new ConnectionProviderNonClosing(con), Database.CURRENT_THREAD_SCHEDULER_FACTORY);
}
163
164
165
166
167
168
169
170
171 public static Database from(String url) {
172 return new Database(url, null, null);
173 }
174
175
176
177
178
179
180
181
182
183
184
185
186 public static Database from(String url, String username, String password) {
187 return new Database(url, username, password);
188 }
189
190
191
192
193
194
195
196
197 public static Database fromContext(String jndiResource) {
198 return new Database(new ConnectionProviderFromContext(jndiResource));
199 }
200
201
202
203
204
205
206
207
208 public static Database fromDataSource(DataSource dataSource) {
209 return new Database(new ConnectionProviderFromDataSource(dataSource));
210 }
211
212
213
214
215
216
217
218
219
220 public static Database from(ConnectionProvider cp) {
221 return new Database(cp);
222 }
223
224
225
226
227
228
229
230
231
232
233 public static Database from(Connection con) {
234 return new Database(con);
235 }
236
237
238
239
240
241
242 public static Builder builder() {
243 return new Builder();
244 }
245
246
247
248
249 public final static class Builder {
250
251 private ConnectionProvider cp;
252 private Func0<Scheduler> nonTransactionalSchedulerFactory = null;
253 private Pool pool = null;
254 private String url;
255 private String username;
256 private String password;
257
258 private static class Pool {
259 int minSize;
260 int maxSize;
261
262 Pool(int minSize, int maxSize) {
263 super();
264 this.minSize = minSize;
265 this.maxSize = maxSize;
266 }
267 }
268
269
270
271
272 private Builder() {
273 }
274
275
276
277
278
279
280
281 public Builder connectionProvider(ConnectionProvider cp) {
282 this.cp = cp;
283 return this;
284 }
285
286
287
288
289
290
291
292 public Builder url(String url) {
293 this.url = url;
294 return this;
295 }
296
297 public Builder username(String username) {
298 this.username = username;
299 return this;
300 }
301
302 public Builder password(String password) {
303 this.password = password;
304 return this;
305 }
306
307
308
309
310
311
312
313
314
315
316 public Builder pool(int minPoolSize, int maxPoolSize) {
317 pool = new Pool(minPoolSize, maxPoolSize);
318 return this;
319 }
320
321
322
323
324
325
326
327
328 public Builder pooled(String url) {
329 this.cp = new ConnectionProviderPooled(url, 0, 10);
330 return this;
331 }
332
333
334
335
336
337
338
339 public Builder nonTransactionalScheduler(Func0<Scheduler> factory) {
340 nonTransactionalSchedulerFactory = factory;
341 return this;
342 }
343
344
345
346
347
348
349
350 public Builder nonTransactionalSchedulerOnCurrentThread() {
351 nonTransactionalSchedulerFactory = CURRENT_THREAD_SCHEDULER_FACTORY;
352 return this;
353 }
354
355
356
357
358
359
360 public Database build() {
361 if (url != null && pool != null)
362 cp = new ConnectionProviderPooled(url, username, password, pool.minSize,
363 pool.maxSize);
364 else if (url != null)
365 cp = new ConnectionProviderFromUrl(url, username, password);
366 return new Database(cp, nonTransactionalSchedulerFactory);
367 }
368 }
369
370
371
372
373
374
375
376
377 public QueryContext queryContext() {
378 return context;
379 }
380
381
382
383
384
385
386
387
388
389 public QuerySelect.Builder select(String sql) {
390 return new QuerySelect.Builder(sql, this);
391 }
392
393
394
395
396
397
398
399 public QuerySelect.Builder select() {
400 return new QuerySelect.Builder(null, this);
401 }
402
403
404
405
406
407
408
409
410
411 public QueryUpdate.Builder update(String sql) {
412 return new QueryUpdate.Builder(sql, this);
413 }
414
415
416
417
418
419
420
421
422
423 public Observable<Boolean> beginTransaction(Observable<?> dependency) {
424 return update("begin").dependsOn(dependency).count().map(Functions.constant(true));
425 }
426
427
428
429
430
431
432
433
434 public Observable<Boolean> beginTransaction() {
435 return beginTransaction(Observable.empty());
436 }
437
438
439
440
441 private static final Func1<Integer, Boolean> IS_NON_ZERO = new Func1<Integer, Boolean>() {
442 @Override
443 public Boolean call(Integer i) {
444 return i != 0;
445 }
446 };
447
448
449
450
451
452
453
454
455
456
457 public Observable<Boolean> commit(Observable<?>... depends) {
458 return commitOrRollback(true, depends);
459 }
460
461
462
463
464
465
466
467 public <T> Operator<Boolean, T> commitOperator() {
468 return commitOrRollbackOperator(true);
469 }
470
471
472
473
474
475
476
477 public <T> Operator<Boolean, T> rollbackOperator() {
478 return commitOrRollbackOperator(false);
479 }
480
481 private <T> Operator<Boolean, T> commitOrRollbackOperator(final boolean commit) {
482 final QueryUpdate.Builder updateBuilder = createCommitOrRollbackQuery(commit);
483 return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
484 @Override
485 public Observable<Boolean> call(Observable<T> source) {
486 return updateBuilder.dependsOn(source).count().map(IS_NON_ZERO);
487 }
488 });
489 }
490
491
492
493
494
495
496
497
498
499
500
501 private Observable<Boolean> commitOrRollback(boolean commit, Observable<?>... depends) {
502
503 QueryUpdate.Builder u = createCommitOrRollbackQuery(commit);
504 for (Observable<?> dep : depends)
505 u = u.dependsOn(dep);
506 Observable<Boolean> result = u.count().map(IS_NON_ZERO);
507 lastTransactionResult.set(result);
508 return result;
509 }
510
511 private QueryUpdate.Builder createCommitOrRollbackQuery(boolean commit) {
512 String action;
513 if (commit)
514 action = "commit";
515 else
516 action = "rollback";
517 QueryUpdate.Builder u = update(action);
518 return u;
519 }
520
521
522
523
524
525
526
527
528
529
530
531 public Observable<Boolean> rollback(Observable<?>... depends) {
532 return commitOrRollback(false, depends);
533 }
534
535
536
537
538
539
540
541 public Observable<Boolean> lastTransactionResult() {
542 Observable<Boolean> o = lastTransactionResult.get();
543 if (o == null)
544 return Observable.empty();
545 else
546 return o;
547 }
548
549
550
551
552
553
554
555
556 public Database close() {
557 log.debug("closing connection provider");
558 cp.close();
559 log.debug("closed connection provider");
560 return this;
561 }
562
563
564
565
566
567
568 Scheduler currentScheduler() {
569 if (currentSchedulerFactory.get() == null)
570 return nonTransactionalSchedulerFactory.call();
571 else
572 return currentSchedulerFactory.get().call();
573 }
574
575
576
577
578
579
580 ConnectionProvider connectionProvider() {
581 if (currentConnectionProvider.get() == null)
582 return cp;
583 else
584 return currentConnectionProvider.get();
585 }
586
587
588
589
590
591 void beginTransactionObserve() {
592 log.debug("beginTransactionObserve");
593 currentConnectionProvider.set(new ConnectionProviderSingletonManualCommit(cp));
594 if (isTransactionOpen.get() != null && isTransactionOpen.get())
595 throw new TransactionAlreadyOpenException();
596 isTransactionOpen.set(true);
597 }
598
599
600
601
602
603 void beginTransactionSubscribe() {
604 log.debug("beginTransactionSubscribe");
605 currentSchedulerFactory.set(CURRENT_THREAD_SCHEDULER_FACTORY);
606 }
607
608
609
610
611 void endTransactionSubscribe() {
612 log.debug("endTransactionSubscribe");
613 currentSchedulerFactory.set(null);
614 rsCache.set(null);
615 }
616
617
618
619
620 void endTransactionObserve() {
621 log.debug("endTransactionObserve");
622 currentConnectionProvider.set(cp);
623 isTransactionOpen.set(false);
624 rsCache.set(null);
625 }
626
627
628
629
630
631
632
633
634 private <T> Operator<Boolean, T> commitOrRollbackOnCompleteOperator(final boolean isCommit) {
635 return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
636 @Override
637 public Observable<Boolean> call(Observable<T> source) {
638 return commitOrRollbackOnCompleteOperatorIfAtLeastOneValue(isCommit, Database.this,
639 source);
640 }
641 });
642 }
643
644
645
646
647
648
649
650 public <T> Operator<Boolean, T> commitOnCompleteOperator() {
651 return commitOrRollbackOnCompleteOperator(true);
652 }
653
654
655
656
657
658
659
660
661 public <T> Operator<Boolean, T> rollbackOnCompleteOperator() {
662 return commitOrRollbackOnCompleteOperator(false);
663 }
664
665
666
667
668
669
670
671
672
673 public <T> Operator<T, T> beginTransactionOnNextOperator() {
674 return RxUtil.toOperator(new Func1<Observable<T>, Observable<T>>() {
675 @Override
676 public Observable<T> call(Observable<T> source) {
677 return beginTransactionOnNext(Database.this, source);
678 }
679 });
680 }
681
682
683
684
685
686
687 public <T> Operator<Boolean, T> commitOnNextOperator() {
688 return commitOrRollbackOnNextOperator(true);
689 }
690
691 public <T> Operator<Boolean, Observable<T>> commitOnNextListOperator() {
692 return commitOrRollbackOnNextListOperator(true);
693 }
694
695 public <T> Operator<Boolean, Observable<T>> rollbackOnNextListOperator() {
696 return commitOrRollbackOnNextListOperator(false);
697 }
698
699 private <T> Operator<Boolean, Observable<T>> commitOrRollbackOnNextListOperator(
700 final boolean isCommit) {
701 return RxUtil.toOperator(new Func1<Observable<Observable<T>>, Observable<Boolean>>() {
702 @Override
703 public Observable<Boolean> call(Observable<Observable<T>> source) {
704 return source.concatMap(new Func1<Observable<T>, Observable<Boolean>>() {
705 @Override
706 public Observable<Boolean> call(Observable<T> source) {
707 if (isCommit)
708 return commit(source);
709 else
710 return rollback(source);
711 }
712 });
713 }
714 });
715 }
716
717
718
719
720
721
722 public Operator<Boolean, ?> rollbackOnNextOperator() {
723 return commitOrRollbackOnNextOperator(false);
724 }
725
726 private <T> Operator<Boolean, T> commitOrRollbackOnNextOperator(final boolean isCommit) {
727 return RxUtil.toOperator(new Func1<Observable<T>, Observable<Boolean>>() {
728 @Override
729 public Observable<Boolean> call(Observable<T> source) {
730 return commitOrRollbackOnNext(isCommit, Database.this, source);
731 }
732 });
733 }
734
735 private static <T> Observable<Boolean> commitOrRollbackOnCompleteOperatorIfAtLeastOneValue(
736 final boolean isCommit, final Database db, Observable<T> source) {
737 CountingAction<T> counter = RxUtil.counter();
738 Observable<Boolean> commit = counter
739
740 .count()
741
742 .filter(greaterThanZero())
743
744 .lift(db.commitOrRollbackOperator(isCommit));
745 return Observable
746
747 .concat(source
748
749 .doOnNext(counter)
750
751 .ignoreElements()
752
753 .cast(Boolean.class),
754
755 commit);
756 }
757
758
759
760
761
762
763
764
765
766 private static <T> Observable<Boolean> commitOrRollbackOnNext(final boolean isCommit,
767 final Database db, Observable<T> source) {
768 return source.concatMap(new Func1<T, Observable<Boolean>>() {
769 @Override
770 public Observable<Boolean> call(T t) {
771 if (isCommit)
772 return db.commit();
773 else
774 return db.rollback();
775 }
776 });
777 }
778
779 private static <T> Observable<T> beginTransactionOnNext(final Database db, Observable<T> source) {
780 return source.concatMap(new Func1<T, Observable<T>>() {
781 @Override
782 public Observable<T> call(T t) {
783 return db.beginTransaction().map(Functions.constant(t));
784 }
785 });
786 }
787
788
789
790
791
792
793
794
795
796 public Observable<Integer> run(Observable<String> commands) {
797 return commands.reduce(Observable.<Integer> empty(),
798 new Func2<Observable<Integer>, String, Observable<Integer>>() {
799 @Override
800 public Observable<Integer> call(Observable<Integer> dep, String command) {
801 return update(command).dependsOn(dep).count();
802 }
803 }).lift(RxUtil.<Integer> flatten());
804 }
805
806
807
808
809
810
811 public Operator<Integer, String> run() {
812 return RxUtil.toOperator(new Func1<Observable<String>, Observable<Integer>>() {
813 @Override
814 public Observable<Integer> call(Observable<String> commands) {
815 return run(commands);
816 }
817 });
818 }
819
820
821
822
823
824
825
826
827
828
829
830 public Observable<Integer> run(InputStream is, String delimiter) {
831 return run(is, Charset.defaultCharset(), delimiter);
832 }
833
834
835
836
837
838
839
840
841
842
843
844 public Observable<Integer> run(InputStream is, Charset charset, String delimiter) {
845 return StringObservable.split(StringObservable.from(new InputStreamReader(is, charset)),
846 ";").lift(run());
847 }
848
849
850
851
852
853
854
855 public Database asynchronous() {
856 return asynchronous(Schedulers.io());
857 }
858
859
860
861
862
863
864
865 public Database asynchronous(final Scheduler nonTransactionalScheduler) {
866 return asynchronous(new Func0<Scheduler>() {
867 @Override
868 public Scheduler call() {
869 return nonTransactionalScheduler;
870 }
871 });
872 }
873
874
875
876
877
878
879
880
881 public Database asynchronous(final Func0<Scheduler> nonTransactionalSchedulerFactory) {
882 return new Database(cp, nonTransactionalSchedulerFactory);
883 }
884
885
886
887
888
889
890
891
892 public static final Object NULL_CLOB = new Object();
893
894 public static final Object NULL_NUMBER = new Object();
895
896 public static Object toSentinelIfNull(String s) {
897 if (s == null)
898 return NULL_CLOB;
899 else
900 return s;
901 }
902
903
904
905
906
907
908
909
910 public static final Object NULL_BLOB = new Object();
911
912 public static Object toSentinelIfNull(byte[] bytes) {
913 if (bytes == null)
914 return NULL_BLOB;
915 else
916 return bytes;
917 }
918
919 }