View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertNotEquals;
6   import static org.junit.Assert.assertTrue;
7   
8   import java.io.ByteArrayInputStream;
9   import java.io.ByteArrayOutputStream;
10  import java.io.File;
11  import java.io.FileNotFoundException;
12  import java.io.FileReader;
13  import java.io.IOException;
14  import java.io.InputStream;
15  import java.io.Reader;
16  import java.sql.Blob;
17  import java.sql.CallableStatement;
18  import java.sql.Clob;
19  import java.sql.Connection;
20  import java.sql.PreparedStatement;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.sql.SQLSyntaxErrorException;
24  import java.sql.Statement;
25  import java.sql.Time;
26  import java.sql.Timestamp;
27  import java.sql.Types;
28  import java.time.Instant;
29  import java.time.ZoneOffset;
30  import java.time.ZonedDateTime;
31  import java.util.Arrays;
32  import java.util.Calendar;
33  import java.util.Date;
34  import java.util.GregorianCalendar;
35  import java.util.HashSet;
36  import java.util.List;
37  import java.util.Optional;
38  import java.util.Set;
39  import java.util.concurrent.CopyOnWriteArrayList;
40  import java.util.concurrent.CountDownLatch;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.TimeoutException;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import org.apache.log4j.Level;
48  import org.apache.log4j.LogManager;
49  import org.davidmoten.rx.jdbc.annotations.Column;
50  import org.davidmoten.rx.jdbc.annotations.Index;
51  import org.davidmoten.rx.jdbc.annotations.Query;
52  import org.davidmoten.rx.jdbc.exceptions.AnnotationsNotFoundException;
53  import org.davidmoten.rx.jdbc.exceptions.AutomappedInterfaceInaccessibleException;
54  import org.davidmoten.rx.jdbc.exceptions.CannotForkTransactedConnection;
55  import org.davidmoten.rx.jdbc.exceptions.ColumnIndexOutOfRangeException;
56  import org.davidmoten.rx.jdbc.exceptions.ColumnNotFoundException;
57  import org.davidmoten.rx.jdbc.exceptions.MoreColumnsRequestedThanExistException;
58  import org.davidmoten.rx.jdbc.exceptions.NamedParameterFoundButSqlDoesNotHaveNamesException;
59  import org.davidmoten.rx.jdbc.exceptions.NamedParameterMissingException;
60  import org.davidmoten.rx.jdbc.exceptions.QueryAnnotationMissingException;
61  import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
62  import org.davidmoten.rx.jdbc.internal.DelegatedConnection;
63  import org.davidmoten.rx.jdbc.pool.DatabaseCreator;
64  import org.davidmoten.rx.jdbc.pool.DatabaseType;
65  import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
66  import org.davidmoten.rx.jdbc.pool.Pools;
67  import org.davidmoten.rx.jdbc.tuple.Tuple2;
68  import org.davidmoten.rx.jdbc.tuple.Tuple3;
69  import org.davidmoten.rx.jdbc.tuple.Tuple4;
70  import org.davidmoten.rx.jdbc.tuple.Tuple5;
71  import org.davidmoten.rx.jdbc.tuple.Tuple6;
72  import org.davidmoten.rx.jdbc.tuple.Tuple7;
73  import org.davidmoten.rx.jdbc.tuple.TupleN;
74  import org.davidmoten.rx.pool.PoolClosedException;
75  import org.h2.jdbc.JdbcSQLException;
76  import org.hsqldb.jdbc.JDBCBlobFile;
77  import org.hsqldb.jdbc.JDBCClobFile;
78  import org.junit.Assert;
79  import org.junit.FixMethodOrder;
80  import org.junit.Ignore;
81  import org.junit.Test;
82  import org.junit.runners.MethodSorters;
83  import org.slf4j.Logger;
84  import org.slf4j.LoggerFactory;
85  
86  import com.github.davidmoten.guavamini.Lists;
87  import com.github.davidmoten.guavamini.Sets;
88  
89  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
90  import io.reactivex.Completable;
91  import io.reactivex.Flowable;
92  import io.reactivex.Observable;
93  import io.reactivex.Scheduler;
94  import io.reactivex.Single;
95  import io.reactivex.exceptions.UndeliverableException;
96  import io.reactivex.functions.Predicate;
97  import io.reactivex.plugins.RxJavaPlugins;
98  import io.reactivex.schedulers.Schedulers;
99  import io.reactivex.schedulers.TestScheduler;
100 import io.reactivex.subscribers.TestSubscriber;
101 
102 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
103 public class DatabaseTest {
104 
105     private static final long FRED_REGISTERED_TIME = 1442515672690L;
106     private static final int NAMES_COUNT_BIG = 5163;
107     private static final Logger log = LoggerFactory.getLogger(DatabaseTest.class);
108     private static final int TIMEOUT_SECONDS = 10;
109 
110     private static Database db() {
111         return DatabaseCreator.create(1);
112     }
113 
114     private static Database blocking() {
115         return DatabaseCreator.createBlocking();
116     }
117 
118     private static Database db(int poolSize) {
119         return DatabaseCreator.create(poolSize);
120     }
121 
122     private static Database big(int poolSize) {
123         return DatabaseCreator.create(poolSize, true, Schedulers.computation());
124     }
125 
126     @Test
127     public void testSelectUsingQuestionMark() {
128         try (Database db = db()) {
129             db.select("select score from person where name=?") //
130                     .parameters("FRED", "JOSEPH") //
131                     .getAs(Integer.class) //
132                     .test() //
133                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
134                     .assertNoErrors() //
135                     .assertValues(21, 34) //
136                     .assertComplete();
137         }
138     }
139 
140     @Test
141     public void testDemonstrateDbClosureOnTerminate() {
142         Database db = db();
143         db.select("select score from person where name=?") //
144                 .parameters("FRED", "JOSEPH") //
145                 .getAs(Integer.class) //
146                 .doOnTerminate(() -> db.close()) //
147                 .test() //
148                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
149                 .assertNoErrors() //
150                 .assertValues(21, 34) //
151                 .assertComplete();
152     }
153 
154     @Test
155     public void testFromDataSource() {
156         ConnectionProvider cp = DatabaseCreator.connectionProvider();
157         AtomicInteger count = new AtomicInteger();
158         Set<Connection> connections = new HashSet<>();
159         AtomicInteger closed = new AtomicInteger();
160         Database db = Database.fromBlocking(new ConnectionProvider() {
161 
162             @Override
163             public Connection get() {
164                 count.incrementAndGet();
165                 Connection c = cp.get();
166                 connections.add(c);
167                 return new DelegatedConnection() {
168 
169                     @Override
170                     public Connection con() {
171                         return c;
172                     }
173 
174                     @Override
175                     public void close() {
176                         closed.incrementAndGet();
177                     }
178                 };
179             }
180 
181             @Override
182             public void close() {
183                 // do nothing
184             }
185         });
186         db.select("select count(*) from person") //
187                 .getAs(Integer.class) //
188                 .blockingSubscribe();
189         db.select("select count(*) from person") //
190                 .getAs(Integer.class) //
191                 .blockingSubscribe();
192         assertEquals(2, count.get());
193         assertEquals(2, connections.size());
194         assertEquals(2, closed.get());
195     }
196 
197     @Test
198     public void testBlockingForEachWhenError() {
199         try {
200             Flowable //
201                     .error(new RuntimeException("boo")) //
202                     .blockingForEach(System.out::println);
203         } catch (RuntimeException e) {
204             assertEquals("boo", e.getMessage());
205         }
206     }
207 
208     @Test
209     public void testSelectUsingQuestionMarkAndInClauseIssue10() {
210         Database.test() //
211                 .select("select score from person where name in (?) order by score") //
212                 .parameters("FRED", "JOSEPH") //
213                 .getAs(Integer.class) //
214                 .test() //
215                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
216                 .assertNoErrors() //
217                 .assertValues(21, 34) //
218                 .assertComplete();
219     }
220 
221     @Test
222     public void testSelectUsingQuestionMarkAndInClauseWithSetParameter() {
223         Database.test() //
224                 .select("select score from person where name in (?) order by score") //
225                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
226                 .getAs(Integer.class) //
227                 .test() //
228                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
229                 .assertNoErrors() //
230                 .assertValues(21, 34) //
231                 .assertComplete();
232     }
233 
234     @Test
235     public void testUpdateWithInClauseBatchSize0() {
236         Database.test() //
237                 .update("update person set score=50 where name in (?)") //
238                 .batchSize(0) //
239                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
240                 .counts() //
241                 .test() //
242                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS) //
243                 .assertComplete() //
244                 .assertValues(2);
245     }
246 
247     @Test
248     public void testUpdateWithInClauseBatchSize10() {
249         Database.test() //
250                 .update("update person set score=50 where name in (?)") //
251                 .batchSize(10) //
252                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
253                 .counts() //
254                 .test() //
255                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS) //
256                 .assertComplete() //
257                 .assertValues(2);
258     }
259 
260     @Test
261     public void testSelectUsingQuestionMarkAndInClauseWithSetParameterUsingParametersMethod() {
262         Database.test() //
263                 .select("select score from person where name in (?) order by score") //
264                 .parameters(Sets.newHashSet("FRED", "JOSEPH")) //
265                 .getAs(Integer.class) //
266                 .test() //
267                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
268                 .assertNoErrors() //
269                 .assertValues(21, 34) //
270                 .assertComplete();
271     }
272 
273     @Test
274     public void testSelectUsingInClauseWithListParameter() {
275         Database.test() //
276                 .select("select score from person where score > ? and name in (?) order by score") //
277                 .parameters(0, Lists.newArrayList("FRED", "JOSEPH")) //
278                 .getAs(Integer.class) //
279                 .test() //
280                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
281                 .assertNoErrors() //
282                 .assertValues(21, 34) //
283                 .assertComplete();
284     }
285 
286     @Test
287     public void testSelectUsingInClauseWithNamedListParameter() {
288         Database.test() //
289                 .select("select score from person where score > :score and name in (:names) order by score") //
290                 .parameter("score", 0) //
291                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
292                 .getAs(Integer.class) //
293                 .test() //
294                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
295                 .assertNoErrors() //
296                 .assertValues(21, 34) //
297                 .assertComplete();
298     }
299 
300     @Test
301     public void testSelectUsingInClauseWithRepeatedNamedListParameter() {
302         Database.test() //
303                 .select("select score from person where name in (:names) and name in (:names) order by score") //
304                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
305                 .getAs(Integer.class) //
306                 .test() //
307                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
308                 .assertNoErrors() //
309                 .assertValues(21, 34) //
310                 .assertComplete();
311     }
312 
313     @Test
314     public void testSelectUsingInClauseWithRepeatedNamedListParameterAndRepeatedNonListParameter() {
315         Database.test() //
316                 .select("select score from person where name in (:names) and score >:score and name in (:names) and score >:score order by score") //
317                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
318                 .parameter("score", 0) //
319                 .getAs(Integer.class) //
320                 .test() //
321                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
322                 .assertNoErrors() //
323                 .assertValues(21, 34) //
324                 .assertComplete();
325     }
326 
327     @Test
328     public void testSelectUsingNamedParameterList() {
329         try (Database db = db()) {
330             db.select("select score from person where name=:name") //
331                     .parameters(Parameter.named("name", "FRED").value("JOSEPH").list()) //
332                     .getAs(Integer.class) //
333                     .test() //
334                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
335                     .assertNoErrors() //
336                     .assertValues(21, 34) //
337                     .assertComplete();
338         }
339     }
340 
341     @Test
342     public void testSelectUsingQuestionMarkFlowableParameters() {
343         try (Database db = db()) {
344             db.select("select score from person where name=?") //
345                     .parameterStream(Flowable.just("FRED", "JOSEPH")) //
346                     .getAs(Integer.class) //
347                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
348                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
349                     .assertNoErrors() //
350                     .assertValues(21, 34) //
351                     .assertComplete();
352         }
353     }
354 
355     @Test
356     public void testSelectUsingQuestionMarkFlowableParametersInLists() {
357         try (Database db = db()) {
358             db.select("select score from person where name=?") //
359                     .parameterListStream(
360                             Flowable.just(Arrays.asList("FRED"), Arrays.asList("JOSEPH"))) //
361                     .getAs(Integer.class) //
362                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
363                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
364                     .assertNoErrors() //
365                     .assertValues(21, 34) //
366                     .assertComplete();
367         }
368     }
369 
370     @Test
371     public void testDrivingSelectWithoutParametersUsingParameterStream() {
372         try (Database db = db()) {
373             db.select("select count(*) from person") //
374                     .parameters(1, 2, 3) //
375                     .getAs(Integer.class) //
376                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
377                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
378                     .assertValues(3, 3, 3) //
379                     .assertComplete();
380         }
381     }
382 
383     @Test
384     public void testSelectUsingQuestionMarkFlowableParametersTwoParametersPerQuery() {
385         try (Database db = db()) {
386             db.select("select score from person where name=? and score = ?") //
387                     .parameterStream(Flowable.just("FRED", 21, "JOSEPH", 34)) //
388                     .getAs(Integer.class) //
389                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
390                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
391                     .assertNoErrors() //
392                     .assertValues(21, 34) //
393                     .assertComplete();
394         }
395     }
396 
397     @Test
398     public void testSelectUsingQuestionMarkFlowableParameterListsTwoParametersPerQuery() {
399         try (Database db = db()) {
400             db.select("select score from person where name=? and score = ?") //
401                     .parameterListStream(
402                             Flowable.just(Arrays.asList("FRED", 21), Arrays.asList("JOSEPH", 34))) //
403                     .getAs(Integer.class) //
404                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
405                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
406                     .assertNoErrors() //
407                     .assertValues(21, 34) //
408                     .assertComplete();
409         }
410     }
411 
412     @Test
413     public void testSelectUsingQuestionMarkWithPublicTestingDatabase() {
414         try (Database db = Database.test()) {
415             db //
416                     .select("select score from person where name=?") //
417                     .parameters("FRED", "JOSEPH") //
418                     .getAs(Integer.class) //
419                     .test() //
420                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
421                     .assertNoErrors() //
422                     .assertValues(21, 34) //
423                     .assertComplete();
424         }
425     }
426 
427     @Test
428     public void testSelectWithFetchSize() {
429         try (Database db = db()) {
430             db.select("select score from person order by name") //
431                     .fetchSize(2) //
432                     .getAs(Integer.class) //
433                     .test() //
434                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
435                     .assertNoErrors() //
436                     .assertValues(21, 34, 25) //
437                     .assertComplete();
438         }
439     }
440 
441     @Test
442     public void testSelectWithFetchSizeZero() {
443         try (Database db = db()) {
444             db.select("select score from person order by name") //
445                     .fetchSize(0) //
446                     .getAs(Integer.class) //
447                     .test() //
448                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
449                     .assertNoErrors() //
450                     .assertValues(21, 34, 25) //
451                     .assertComplete();
452         }
453     }
454 
455     @Test(expected = IllegalArgumentException.class)
456     public void testSelectWithFetchSizeNegative() {
457         try (Database db = db()) {
458             db.select("select score from person order by name") //
459                     .fetchSize(-1);
460         }
461     }
462 
463     @Test
464     public void testSelectUsingNonBlockingBuilder() {
465         NonBlockingConnectionPool pool = Pools //
466                 .nonBlocking() //
467                 .connectionProvider(DatabaseCreator.connectionProvider()) //
468                 .maxIdleTime(1, TimeUnit.MINUTES) //
469                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
470                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
471                 .healthCheck(c -> c.prepareStatement("select 1").execute()) //
472                 .maxPoolSize(3) //
473                 .build();
474 
475         try (Database db = Database.from(pool)) {
476             db.select("select score from person where name=?") //
477                     .parameters("FRED", "JOSEPH") //
478                     .getAs(Integer.class) //
479                     .test() //
480                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
481                     .assertNoErrors() //
482                     .assertValues(21, 34) //
483                     .assertComplete();
484         }
485     }
486 
487     @Test
488     public void testSelectSpecifyingHealthCheck() {
489         try (Database db = Database//
490                 .nonBlocking() //
491                 .connectionProvider(DatabaseCreator.connectionProvider()) //
492                 .maxIdleTime(1, TimeUnit.MINUTES) //
493                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
494                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
495                 .healthCheck(DatabaseType.H2) //
496                 .maxPoolSize(3) //
497                 .build()) {
498             db.select("select score from person where name=?") //
499                     .parameters("FRED", "JOSEPH") //
500                     .getAs(Integer.class) //
501                     .test() //
502                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
503                     .assertNoErrors() //
504                     .assertValues(21, 34) //
505                     .assertComplete();
506         }
507     }
508 
509     @Test(expected = IllegalArgumentException.class)
510     public void testCreateTestDatabaseWithZeroSizePoolThrows() {
511         Database.test(0);
512     }
513 
514     @Test
515     public void testSelectSpecifyingHealthCheckAsSql() {
516         final AtomicInteger success = new AtomicInteger();
517         NonBlockingConnectionPool pool = Pools //
518                 .nonBlocking() //
519                 .connectionProvider(DatabaseCreator.connectionProvider()) //
520                 .maxIdleTime(1, TimeUnit.MINUTES) //
521                 .healthCheck(DatabaseType.H2) //
522                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
523                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
524                 .healthCheck("select 1") //
525                 .connectionListener(error -> {
526                     if (error.isPresent()) {
527                         success.set(Integer.MIN_VALUE);
528                     } else {
529                         success.incrementAndGet();
530                     }
531                 }) //
532                 .maxPoolSize(3) //
533                 .build();
534 
535         try (Database db = Database.from(pool)) {
536             db.select("select score from person where name=?") //
537                     .parameters("FRED", "JOSEPH") //
538                     .getAs(Integer.class) //
539                     .test() //
540                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
541                     .assertNoErrors() //
542                     .assertValues(21, 34) //
543                     .assertComplete();
544         }
545         assertEquals(1, success.get());
546     }
547 
548     @Test(expected = IllegalArgumentException.class)
549     public void testNonBlockingPoolWithTrampolineSchedulerThrows() {
550         Pools.nonBlocking().scheduler(Schedulers.trampoline());
551     }
552 
553     @Test(timeout = 40000)
554     public void testSelectUsingNonBlockingBuilderConcurrencyTest()
555             throws InterruptedException, TimeoutException {
556         info();
557         try {
558             try (Database db = db(3)) {
559                 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(50));
560                 int n = 10000;
561                 CountDownLatch latch = new CountDownLatch(n);
562                 AtomicInteger count = new AtomicInteger();
563                 for (int i = 0; i < n; i++) {
564                     db.select("select score from person where name=?") //
565                             .parameters("FRED", "JOSEPH") //
566                             .getAs(Integer.class) //
567                             .subscribeOn(scheduler) //
568                             .toList() //
569                             .doOnSuccess(x -> {
570                                 if (!x.equals(Lists.newArrayList(21, 34))) {
571                                     throw new RuntimeException("run broken");
572                                 } else {
573                                     // log.debug(iCopy + " succeeded");
574                                 }
575                             }) //
576                             .doOnSuccess(x -> {
577                                 count.incrementAndGet();
578                                 latch.countDown();
579                             }) //
580                             .doOnError(x -> latch.countDown()) //
581                             .subscribe();
582                 }
583                 if (!latch.await(20, TimeUnit.SECONDS)) {
584                     throw new TimeoutException("timeout");
585                 }
586                 assertEquals(n, count.get());
587             }
588         } finally {
589             debug();
590         }
591     }
592 
593     @Test(timeout = 5000)
594     public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
595         debug();
596         try {
597             try (Database db = db(1)) {
598                 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
599                 int n = 2;
600                 CountDownLatch latch = new CountDownLatch(n);
601                 AtomicInteger count = new AtomicInteger();
602                 for (int i = 0; i < n; i++) {
603                     db.select("select score from person where name=?") //
604                             .parameters("FRED", "JOSEPH") //
605                             .getAs(Integer.class) //
606                             .subscribeOn(scheduler) //
607                             .toList() //
608                             .doOnSuccess(x -> {
609                                 if (!x.equals(Lists.newArrayList(21, 34))) {
610                                     throw new RuntimeException("run broken");
611                                 }
612                             }) //
613                             .doOnSuccess(x -> {
614                                 count.incrementAndGet();
615                                 latch.countDown();
616                             }) //
617                             .doOnError(x -> latch.countDown()) //
618                             .subscribe();
619                     log.info("submitted " + i);
620                 }
621                 if (!latch.await(5000, TimeUnit.SECONDS)) {
622                     throw new TimeoutException("timeout");
623                 }
624                 assertEquals(n, count.get());
625             }
626         } finally {
627             debug();
628         }
629     }
630 
631     @Test
632     public void testDatabaseClose() {
633         try (Database db = db()) {
634             db.select("select score from person where name=?") //
635                     .parameters("FRED", "JOSEPH") //
636                     .getAs(Integer.class) //
637                     .test() //
638                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
639                     .assertNoErrors() //
640                     .assertValues(21, 34) //
641                     .assertComplete();
642         }
643     }
644 
645     @Test
646     public void testSelectUsingName() {
647         try (Database db = db()) {
648             db //
649                     .select("select score from person where name=:name") //
650                     .parameter("name", "FRED") //
651                     .parameter("name", "JOSEPH") //
652                     .getAs(Integer.class) //
653                     .test() //
654                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
655                     .assertValues(21, 34) //
656                     .assertComplete();
657         }
658     }
659 
660     @Test
661     public void testSelectUsingNameNotGiven() {
662         try (Database db = db()) {
663             db //
664                     .select("select score from person where name=:name and name<>:name2") //
665                     .parameter("name", "FRED") //
666                     .parameter("name", "JOSEPH") //
667                     .getAs(Integer.class) //
668                     .test() //
669                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
670                     .assertError(NamedParameterMissingException.class) //
671                     .assertNoValues();
672         }
673     }
674 
675     @Test
676     public void testSelectUsingParameterNameNullNameWhenSqlHasNoNames() {
677         db() //
678                 .select("select score from person where name=?") //
679                 .parameter(Parameter.create("name", "FRED")) //
680                 .getAs(Integer.class) //
681                 .test() //
682                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
683                 .assertError(NamedParameterFoundButSqlDoesNotHaveNamesException.class) //
684                 .assertNoValues();
685     }
686 
687     @Test
688     public void testUpdateWithNullNamedParameter() {
689         try (Database db = db()) {
690             db //
691                     .update("update person set date_of_birth = :dob") //
692                     .parameter(Parameter.create("dob", null)) //
693                     .counts() //
694                     .test() //
695                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
696                     .assertValue(3) //
697                     .assertComplete();
698         }
699     }
700 
701     @Test
702     public void testUpdateWithNullParameter() {
703         try (Database db = db()) {
704             db //
705                     .update("update person set date_of_birth = ?") //
706                     .parameter(null) //
707                     .counts() //
708                     .test() //
709                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
710                     .assertValue(3) //
711                     .assertComplete();
712         }
713     }
714 
715     @Test
716     public void testUpdateWithNullStreamParameter() {
717         try (Database db = db()) {
718             db //
719                     .update("update person set date_of_birth = ?") //
720                     .parameterStream(Flowable.just(Parameter.NULL)) //
721                     .counts() //
722                     .test() //
723                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
724                     .assertValue(3) //
725                     .assertComplete();
726         }
727     }
728 
729     @Test
730     public void testUpdateWithTestDatabaseForReadme() {
731         try (Database db = db()) {
732             db //
733                     .update("update person set date_of_birth = ?") //
734                     .parameterStream(Flowable.just(Parameter.NULL)) //
735                     .counts() //
736                     .test() //
737                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
738                     .assertValue(3) //
739                     .assertComplete();
740         }
741     }
742 
743     @Test
744     public void testUpdateClobWithNull() {
745         try (Database db = db()) {
746             insertNullClob(db);
747             db //
748                     .update("update person_clob set document = :doc") //
749                     .parameter("doc", Database.NULL_CLOB) //
750                     .counts() //
751                     .test() //
752                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
753                     .assertValue(1) //
754                     .assertComplete();
755         }
756     }
757 
758     @Test
759     public void testUpdateClobWithClob() throws SQLException {
760         try (Database db = db()) {
761             Clob clob = new JDBCClobFile(new File("src/test/resources/big.txt"));
762             insertNullClob(db);
763             db //
764                     .update("update person_clob set document = :doc") //
765                     .parameter("doc", clob) //
766                     .counts() //
767                     .test() //
768                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
769                     .assertValue(1) //
770                     .assertComplete();
771         }
772     }
773 
774     @Test
775     public void testUpdateClobWithReader() throws FileNotFoundException {
776         try (Database db = db()) {
777             Reader reader = new FileReader(new File("src/test/resources/big.txt"));
778             insertNullClob(db);
779             db //
780                     .update("update person_clob set document = :doc") //
781                     .parameter("doc", reader) //
782                     .counts() //
783                     .test() //
784                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
785                     .assertValue(1) //
786                     .assertComplete();
787         }
788     }
789 
790     @Test
791     public void testUpdateBlobWithBlob() throws SQLException {
792         try (Database db = db()) {
793             Blob blob = new JDBCBlobFile(new File("src/test/resources/big.txt"));
794             insertPersonBlob(db);
795             db //
796                     .update("update person_blob set document = :doc") //
797                     .parameter("doc", blob) //
798                     .counts() //
799                     .test() //
800                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
801                     .assertValue(1) //
802                     .assertComplete();
803         }
804     }
805 
806     @Test
807     public void testUpdateBlobWithNull() {
808         try (Database db = db()) {
809             insertPersonBlob(db);
810             db //
811                     .update("update person_blob set document = :doc") //
812                     .parameter("doc", Database.NULL_BLOB) //
813                     .counts() //
814                     .test() //
815                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
816                     .assertValue(1) //
817                     .assertComplete();
818         }
819     }
820 
821     @Test(expected = NullPointerException.class)
822     public void testSelectUsingNullNameInParameter() {
823         try (Database db = db()) {
824             db //
825                     .select("select score from person where name=:name") //
826                     .parameter(null, "FRED"); //
827         }
828     }
829 
830     @Test(expected = IllegalArgumentException.class)
831     public void testSelectUsingNameDoesNotExist() {
832         try (Database db = db()) {
833             db //
834                     .select("select score from person where name=:name") //
835                     .parameters("nam", "FRED");
836         }
837     }
838 
839     @Test(expected = IllegalArgumentException.class)
840     public void testSelectUsingNameWithoutSpecifyingNameThrowsImmediately() {
841         try (Database db = db()) {
842             db //
843                     .select("select score from person where name=:name") //
844                     .parameters("FRED", "JOSEPH");
845         }
846     }
847 
848     @Test
849     public void testSelectTransacted() {
850         try (Database db = db()) {
851             db //
852                     .select("select score from person where name=?") //
853                     .parameters("FRED", "JOSEPH") //
854                     .transacted() //
855                     .getAs(Integer.class) //
856                     .doOnNext(tx -> log
857                             .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value()))) //
858                     .test() //
859                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
860                     .assertValueCount(3) //
861                     .assertComplete();
862         }
863     }
864 
865     @Test
866     public void testSelectAutomappedAnnotatedTransacted() {
867         try (Database db = db()) {
868             db //
869                     .select(Person10.class) //
870                     .transacted() //
871                     .valuesOnly() //
872                     .get().test() //
873                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
874                     .assertValueCount(3) //
875                     .assertComplete();
876         }
877     }
878 
879     @Test
880     public void testSelectAutomappedTransactedValuesOnly() {
881         try (Database db = db()) {
882             db //
883                     .select("select name, score from person") //
884                     .transacted() //
885                     .valuesOnly() //
886                     .autoMap(Person2.class) //
887                     .test() //
888                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
889                     .assertValueCount(3) //
890                     .assertComplete();
891         }
892     }
893 
894     @Test
895     public void testSelectAutomappedTransacted() {
896         try (Database db = db()) {
897             db //
898                     .select("select name, score from person") //
899                     .transacted() //
900                     .autoMap(Person2.class) //
901                     .test() //
902                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
903                     .assertValueCount(4) //
904                     .assertComplete();
905         }
906     }
907 
908     @Test
909     public void testSelectTransactedTuple2() {
910         try (Database db = db()) {
911             Tx<Tuple2<String, Integer>> t = db //
912                     .select("select name, score from person where name=?") //
913                     .parameters("FRED") //
914                     .transacted() //
915                     .getAs(String.class, Integer.class) //
916                     .test() //
917                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
918                     .assertValueCount(2) //
919                     .assertComplete() //
920                     .values().get(0);
921             assertEquals("FRED", t.value()._1());
922             assertEquals(21, (int) t.value()._2());
923         }
924     }
925 
926     @Test
927     public void testSelectTransactedTuple3() {
928         try (Database db = db()) {
929             Tx<Tuple3<String, Integer, String>> t = db //
930                     .select("select name, score, name from person where name=?") //
931                     .parameters("FRED") //
932                     .transacted() //
933                     .getAs(String.class, Integer.class, String.class) //
934                     .test() //
935                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
936                     .assertValueCount(2) //
937                     .assertComplete() //
938                     .values().get(0);
939             assertEquals("FRED", t.value()._1());
940             assertEquals(21, (int) t.value()._2());
941             assertEquals("FRED", t.value()._3());
942         }
943     }
944 
945     @Test
946     public void testSelectTransactedTuple4() {
947         try (Database db = db()) {
948             Tx<Tuple4<String, Integer, String, Integer>> t = db //
949                     .select("select name, score, name, score from person where name=?") //
950                     .parameters("FRED") //
951                     .transacted() //
952                     .getAs(String.class, Integer.class, String.class, Integer.class) //
953                     .test() //
954                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
955                     .assertValueCount(2) //
956                     .assertComplete() //
957                     .values().get(0);
958             assertEquals("FRED", t.value()._1());
959             assertEquals(21, (int) t.value()._2());
960             assertEquals("FRED", t.value()._3());
961             assertEquals(21, (int) t.value()._4());
962         }
963     }
964 
965     @Test
966     public void testSelectTransactedTuple5() {
967         try (Database db = db()) {
968             Tx<Tuple5<String, Integer, String, Integer, String>> t = db //
969                     .select("select name, score, name, score, name from person where name=?") //
970                     .parameters("FRED") //
971                     .transacted() //
972                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class) //
973                     .test() //
974                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
975                     .assertValueCount(2) //
976                     .assertComplete() //
977                     .values().get(0);
978             assertEquals("FRED", t.value()._1());
979             assertEquals(21, (int) t.value()._2());
980             assertEquals("FRED", t.value()._3());
981             assertEquals(21, (int) t.value()._4());
982             assertEquals("FRED", t.value()._5());
983         }
984     }
985 
986     @Test
987     public void testSelectTransactedTuple6() {
988         try (Database db = db()) {
989             Tx<Tuple6<String, Integer, String, Integer, String, Integer>> t = db //
990                     .select("select name, score, name, score, name, score from person where name=?") //
991                     .parameters("FRED") //
992                     .transacted() //
993                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
994                             Integer.class) //
995                     .test() //
996                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
997                     .assertValueCount(2) //
998                     .assertComplete() //
999                     .values().get(0);
1000             assertEquals("FRED", t.value()._1());
1001             assertEquals(21, (int) t.value()._2());
1002             assertEquals("FRED", t.value()._3());
1003             assertEquals(21, (int) t.value()._4());
1004             assertEquals("FRED", t.value()._5());
1005             assertEquals(21, (int) t.value()._6());
1006         }
1007     }
1008 
1009     @Test
1010     public void testSelectTransactedTuple7() {
1011         try (Database db = db()) {
1012             Tx<Tuple7<String, Integer, String, Integer, String, Integer, String>> t = db //
1013                     .select("select name, score, name, score, name, score, name from person where name=?") //
1014                     .parameters("FRED") //
1015                     .transacted() //
1016                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1017                             Integer.class, String.class) //
1018                     .test() //
1019                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1020                     .assertValueCount(2) //
1021                     .assertComplete() //
1022                     .values().get(0);
1023             assertEquals("FRED", t.value()._1());
1024             assertEquals(21, (int) t.value()._2());
1025             assertEquals("FRED", t.value()._3());
1026             assertEquals(21, (int) t.value()._4());
1027             assertEquals("FRED", t.value()._5());
1028             assertEquals(21, (int) t.value()._6());
1029             assertEquals("FRED", t.value()._7());
1030         }
1031     }
1032 
1033     @Test
1034     public void testSelectTransactedTupleN() {
1035         try (Database db = db()) {
1036             List<Tx<TupleN<Object>>> list = db //
1037                     .select("select name, score from person where name=?") //
1038                     .parameters("FRED") //
1039                     .transacted() //
1040                     .getTupleN() //
1041                     .test() //
1042                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1043                     .assertValueCount(2) //
1044                     .assertComplete() //
1045                     .values();
1046             assertEquals("FRED", list.get(0).value().values().get(0));
1047             assertEquals(21, (int) list.get(0).value().values().get(1));
1048             assertTrue(list.get(1).isComplete());
1049             assertEquals(2, list.size());
1050         }
1051     }
1052 
1053     @Test
1054     public void testSelectTransactedCount() {
1055         try (Database db = db()) {
1056             db //
1057                     .select("select name, score, name, score, name, score, name from person where name=?") //
1058                     .parameters("FRED") //
1059                     .transacted() //
1060                     .count() //
1061                     .test() //
1062                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1063                     .assertValueCount(1) //
1064                     .assertComplete();
1065         }
1066     }
1067 
1068     @Test
1069     public void testSelectTransactedGetAs() {
1070         try (Database db = db()) {
1071             db //
1072                     .select("select name from person") //
1073                     .transacted() //
1074                     .getAs(String.class) //
1075                     .test() //
1076                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1077                     .assertValueCount(4) //
1078                     .assertComplete();
1079         }
1080     }
1081 
1082     @Test
1083     public void testSelectTransactedGetAsOptional() {
1084         try (Database db = db()) {
1085             List<Tx<Optional<String>>> list = db //
1086                     .select("select name from person where name=?") //
1087                     .parameters("FRED") //
1088                     .transacted() //
1089                     .getAsOptional(String.class) //
1090                     .test() //
1091                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1092                     .assertValueCount(2) //
1093                     .assertComplete() //
1094                     .values();
1095             assertTrue(list.get(0).isValue());
1096             assertEquals("FRED", list.get(0).value().get());
1097             assertTrue(list.get(1).isComplete());
1098         }
1099     }
1100 
1101     @Test
1102     public void testDatabaseFrom() {
1103         Database.from(DatabaseCreator.nextUrl(), 3) //
1104                 .select("select name from person") //
1105                 .getAs(String.class) //
1106                 .count() //
1107                 .test() //
1108                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1109                 .assertError(JdbcSQLException.class);
1110     }
1111 
1112     @Test
1113     public void testSelectTransactedChained() throws Exception {
1114         try (Database db = db()) {
1115             db //
1116                     .select("select score from person where name=?") //
1117                     .parameters("FRED", "JOSEPH") //
1118                     .transacted() //
1119                     .transactedValuesOnly() //
1120                     .getAs(Integer.class) //
1121                     .doOnNext(tx -> log
1122                             .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))//
1123                     .flatMap(tx -> tx //
1124                             .select("select name from person where score = ?") //
1125                             .parameter(tx.value()) //
1126                             .valuesOnly() //
1127                             .getAs(String.class)) //
1128                     .test() //
1129                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1130                     .assertNoErrors() //
1131                     .assertValues("FRED", "JOSEPH") //
1132                     .assertComplete();
1133         }
1134     }
1135 
1136     @Test
1137     public void databaseIsAutoCloseable() {
1138         try (Database db = db()) {
1139             log.debug(db.toString());
1140         }
1141     }
1142 
1143     private static void println(Object o) {
1144         log.debug("{}", o);
1145     }
1146 
1147     @Test
1148     public void testSelectChained() {
1149         try (Database db = db(1)) {
1150             // we can do this with 1 connection only!
1151             db.select("select score from person where name=?") //
1152                     .parameters("FRED", "JOSEPH") //
1153                     .getAs(Integer.class) //
1154                     .doOnNext(DatabaseTest::println) //
1155                     .concatMap(score -> {
1156                         log.info("score={}", score);
1157                         return db //
1158                                 .select("select name from person where score = ?") //
1159                                 .parameter(score) //
1160                                 .getAs(String.class) //
1161                                 .doOnComplete(
1162                                         () -> log.info("completed select where score=" + score));
1163                     }) //
1164                     .test() //
1165                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1166                     .assertNoErrors() //
1167                     .assertValues("FRED", "JOSEPH") //
1168                     .assertComplete(); //
1169         }
1170     }
1171 
1172     @Test
1173     @SuppressFBWarnings
1174     public void testReadMeFragment1() {
1175         try (Database db = db()) {
1176             db.select("select name from person") //
1177                     .getAs(String.class) //
1178                     .forEach(DatabaseTest::println);
1179         }
1180     }
1181 
1182     @Test
1183     public void testReadMeFragmentColumnDoesNotExistEmitsSqlSyntaxErrorException() {
1184         try (Database db = Database.test()) {
1185             db.select("select nam from person") //
1186                     .getAs(String.class) //
1187                     .test() //
1188                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1189                     .assertNoValues() //
1190                     .assertError(SQLSyntaxErrorException.class);
1191         }
1192     }
1193 
1194     @Test
1195     public void testReadMeFragmentDerbyHealthCheck() {
1196         try (Database db = Database.test()) {
1197             db.select("select 'a' from sysibm.sysdummy1") //
1198                     .getAs(String.class) //
1199                     .test() //
1200                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1201                     .assertValue("a") //
1202                     .assertComplete();
1203         }
1204     }
1205 
1206     @Test
1207     @SuppressFBWarnings
1208     public void testTupleSupport() {
1209         try (Database db = db()) {
1210             db.select("select name, score from person") //
1211                     .getAs(String.class, Integer.class) //
1212                     .forEach(DatabaseTest::println);
1213         }
1214     }
1215 
1216     @Test
1217     public void testDelayedCallsAreNonBlocking() throws InterruptedException {
1218         List<String> list = new CopyOnWriteArrayList<String>();
1219         try (Database db = db(1)) { //
1220             db.select("select score from person where name=?") //
1221                     .parameter("FRED") //
1222                     .getAs(Integer.class) //
1223                     .doOnNext(x -> Thread.sleep(1000)) //
1224                     .subscribeOn(Schedulers.io()) //
1225                     .subscribe();
1226             Thread.sleep(100);
1227             CountDownLatch latch = new CountDownLatch(1);
1228             db.select("select score from person where name=?") //
1229                     .parameter("FRED") //
1230                     .getAs(Integer.class) //
1231                     .doOnNext(x -> list.add("emitted")) //
1232                     .doOnNext(x -> log.debug("emitted on " + Thread.currentThread().getName())) //
1233                     .doOnNext(x -> latch.countDown()) //
1234                     .subscribe();
1235             list.add("subscribed");
1236             assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1237             assertEquals(Arrays.asList("subscribed", "emitted"), list);
1238         }
1239     }
1240 
1241     @Test
1242     public void testAutoMapToInterface() {
1243         try (Database db = db()) {
1244             db //
1245                     .select("select name from person") //
1246                     .autoMap(Person.class) //
1247                     .map(p -> p.name()) //
1248                     .test() //
1249                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1250                     .assertValueCount(3) //
1251                     .assertComplete();
1252         }
1253     }
1254 
1255     @Test
1256     public void testAutoMapToInterfaceWithoutAnnotationstsError() {
1257         try (Database db = db()) {
1258             db //
1259                     .select("select name from person") //
1260                     .autoMap(PersonNoAnnotation.class) //
1261                     .map(p -> p.name()) //
1262                     .test() //
1263                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1264                     .assertNoValues() //
1265                     .assertError(AnnotationsNotFoundException.class);
1266         }
1267     }
1268 
1269     @Test
1270     public void testAutoMapToInterfaceWithTwoMethods() {
1271         try (Database db = db()) {
1272             db //
1273                     .select("select name, score from person order by name") //
1274                     .autoMap(Person2.class) //
1275                     .firstOrError() //
1276                     .map(Person2::score) //
1277                     .test() //
1278                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1279                     .assertValue(21) //
1280                     .assertComplete();
1281         }
1282     }
1283 
1284     @Test
1285     public void testAutoMapToInterfaceWithExplicitColumnName() {
1286         try (Database db = db()) {
1287             db //
1288                     .select("select name, score from person order by name") //
1289                     .autoMap(Person3.class) //
1290                     .firstOrError() //
1291                     .map(Person3::examScore) //
1292                     .test() //
1293                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1294                     .assertValue(21) //
1295                     .assertComplete();
1296         }
1297     }
1298 
1299     @Test
1300     public void testAutoMapToInterfaceWithExplicitColumnNameThatDoesNotExist() {
1301         try (Database db = db()) {
1302             db //
1303                     .select("select name, score from person order by name") //
1304                     .autoMap(Person4.class) //
1305                     .firstOrError() //
1306                     .map(Person4::examScore) //
1307                     .test() //
1308                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1309                     .assertNoValues() //
1310                     .assertError(ColumnNotFoundException.class);
1311         }
1312     }
1313 
1314     @Test
1315     public void testAutoMapToInterfaceWithIndex() {
1316         try (Database db = db()) {
1317             db //
1318                     .select("select name, score from person order by name") //
1319                     .autoMap(Person5.class) //
1320                     .firstOrError() //
1321                     .map(Person5::examScore) //
1322                     .test() //
1323                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1324                     .assertValue(21) //
1325                     .assertComplete();
1326         }
1327     }
1328 
1329     @Test
1330     public void testAutoMapToInterfaceWithIndexTooLarge() {
1331         try (Database db = db()) {
1332             db //
1333                     .select("select name, score from person order by name") //
1334                     .autoMap(Person6.class) //
1335                     .firstOrError() //
1336                     .map(Person6::examScore) //
1337                     .test() //
1338                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1339                     .assertNoValues() //
1340                     .assertError(ColumnIndexOutOfRangeException.class);
1341         }
1342     }
1343 
1344     @Test
1345     public void testAutoMapToInterfaceWithIndexTooSmall() {
1346         try (Database db = db()) {
1347             db //
1348                     .select("select name, score from person order by name") //
1349                     .autoMap(Person7.class) //
1350                     .firstOrError() //
1351                     .map(Person7::examScore) //
1352                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1353                     .assertNoValues() //
1354                     .assertError(ColumnIndexOutOfRangeException.class);
1355         }
1356     }
1357 
1358     @Test
1359     public void testAutoMapWithUnmappableColumnType() {
1360         try (Database db = db()) {
1361             db //
1362                     .select("select name from person order by name") //
1363                     .autoMap(Person8.class) //
1364                     .map(p -> p.name()) //
1365                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1366                     .assertNoValues() //
1367                     .assertError(ClassCastException.class);
1368         }
1369     }
1370 
1371     @Test
1372     public void testAutoMapWithMixIndexAndName() {
1373         try (Database db = db()) {
1374             db //
1375                     .select("select name, score from person order by name") //
1376                     .autoMap(Person9.class) //
1377                     .firstOrError() //
1378                     .map(Person9::score) //
1379                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1380                     .assertValue(21) //
1381                     .assertComplete();
1382         }
1383     }
1384 
1385     @Test
1386     public void testAutoMapWithQueryInAnnotation() {
1387         try (Database db = db()) {
1388             db.select(Person10.class) //
1389                     .get() //
1390                     .firstOrError() //
1391                     .map(Person10::score) //
1392                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1393                     .assertValue(21) //
1394                     .assertComplete();
1395         }
1396     }
1397 
1398     @Test
1399     @Ignore
1400     public void testAutoMapForReadMe() {
1401         try (Database db = Database.test()) {
1402             db.select(Person10.class) //
1403                     .get(Person10::name) //
1404                     .blockingForEach(DatabaseTest::println);
1405         }
1406     }
1407 
1408     @Test(expected = QueryAnnotationMissingException.class)
1409     public void testAutoMapWithoutQueryInAnnotation() {
1410         try (Database db = db()) {
1411             db.select(Person.class);
1412         }
1413     }
1414 
1415     @Test
1416     public void testSelectWithoutWhereClause() {
1417         try (Database db = db()) {
1418             Assert.assertEquals(3, (long) db.select("select name from person") //
1419                     .count() //
1420                     .blockingGet());
1421         }
1422     }
1423 
1424     @Test
1425     public void testTuple3() {
1426         try (Database db = db()) {
1427             db //
1428                     .select("select name, score, name from person order by name") //
1429                     .getAs(String.class, Integer.class, String.class) //
1430                     .firstOrError() //
1431                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1432                     .assertComplete() //
1433                     .assertValue(Tuple3.create("FRED", 21, "FRED")); //
1434         }
1435     }
1436 
1437     @Test
1438     public void testTuple4() {
1439         try (Database db = db()) {
1440             db //
1441                     .select("select name, score, name, score from person order by name") //
1442                     .getAs(String.class, Integer.class, String.class, Integer.class) //
1443                     .firstOrError() //
1444                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1445                     .assertComplete() //
1446                     .assertValue(Tuple4.create("FRED", 21, "FRED", 21)); //
1447         }
1448     }
1449 
1450     @Test
1451     public void testTuple5() {
1452         try (Database db = db()) {
1453             db //
1454                     .select("select name, score, name, score, name from person order by name") //
1455                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class) //
1456                     .firstOrError() //
1457                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1458                     .assertComplete().assertValue(Tuple5.create("FRED", 21, "FRED", 21, "FRED")); //
1459         }
1460     }
1461 
1462     @Test
1463     public void testTuple6() {
1464         try (Database db = db()) {
1465             db //
1466                     .select("select name, score, name, score, name, score from person order by name") //
1467                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1468                             Integer.class) //
1469                     .firstOrError() //
1470                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1471                     .assertComplete()
1472                     .assertValue(Tuple6.create("FRED", 21, "FRED", 21, "FRED", 21)); //
1473         }
1474     }
1475 
1476     @Test
1477     public void testTuple7() {
1478         try (Database db = db()) {
1479             db //
1480                     .select("select name, score, name, score, name, score, name from person order by name") //
1481                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1482                             Integer.class, String.class) //
1483                     .firstOrError() //
1484                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1485                     .assertComplete()
1486                     .assertValue(Tuple7.create("FRED", 21, "FRED", 21, "FRED", 21, "FRED")); //
1487         }
1488     }
1489 
1490     @Test
1491     public void testTupleN() {
1492         try (Database db = db()) {
1493             db //
1494                     .select("select name, score, name from person order by name") //
1495                     .getTupleN() //
1496                     .firstOrError().test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1497                     .assertComplete() //
1498                     .assertValue(TupleN.create("FRED", 21, "FRED")); //
1499         }
1500     }
1501 
1502     @Test
1503     public void testTupleNWithClass() {
1504         try (Database db = db()) {
1505             db //
1506                     .select("select score a, score b from person order by name") //
1507                     .getTupleN(Integer.class) //
1508                     .firstOrError() //
1509                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1510                     .assertComplete() //
1511                     .assertValue(TupleN.create(21, 21)); //
1512         }
1513     }
1514 
1515     @Test
1516     public void testTupleNWithClassInTransaction() {
1517         try (Database db = db()) {
1518             db //
1519                     .select("select score a, score b from person order by name") //
1520                     .transactedValuesOnly() //
1521                     .getTupleN(Integer.class) //
1522                     .map(x -> x.value()) //
1523                     .firstOrError() //
1524                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1525                     .assertComplete() //
1526                     .assertValue(TupleN.create(21, 21)); //
1527         }
1528     }
1529 
1530     @Test
1531     public void testHealthCheck() throws InterruptedException {
1532         AtomicBoolean once = new AtomicBoolean(true);
1533         testHealthCheck(c -> {
1534             log.debug("doing health check");
1535             return !once.compareAndSet(true, false);
1536         });
1537     }
1538 
1539     @Test
1540     public void testHealthCheckThatThrows() throws InterruptedException {
1541         AtomicBoolean once = new AtomicBoolean(true);
1542         testHealthCheck(c -> {
1543             log.debug("doing health check");
1544             if (!once.compareAndSet(true, false))
1545                 return true;
1546             else
1547                 throw new RuntimeException("health check failed");
1548         });
1549     }
1550 
1551     @Test
1552     public void testUpdateOneRow() {
1553         try (Database db = db()) {
1554             db.update("update person set score=20 where name='FRED'") //
1555                     .counts() //
1556                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1557                     .assertValue(1) //
1558                     .assertComplete();
1559         }
1560     }
1561 
1562     @Test
1563     public void testUpdateThreeRows() {
1564         try (Database db = db()) {
1565             db.update("update person set score=20") //
1566                     .counts() //
1567                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1568                     .assertValue(3) //
1569                     .assertComplete();
1570         }
1571     }
1572 
1573     @Test
1574     public void testUpdateWithParameter() {
1575         try (Database db = db()) {
1576             db.update("update person set score=20 where name=?") //
1577                     .parameter("FRED").counts() //
1578                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1579                     .assertValue(1) //
1580                     .assertComplete();
1581         }
1582     }
1583 
1584     @Test
1585     public void testUpdateWithParameterTwoRuns() {
1586         try (Database db = db()) {
1587             db.update("update person set score=20 where name=?") //
1588                     .parameters("FRED", "JOSEPH").counts() //
1589                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1590                     .assertValues(1, 1) //
1591                     .assertComplete();
1592         }
1593     }
1594 
1595     @Test
1596     public void testUpdateAllWithParameterFourRuns() {
1597         try (Database db = db()) {
1598             db.update("update person set score=?") //
1599                     .parameters(1, 2, 3, 4) //
1600                     .counts() //
1601                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1602                     .assertValues(3, 3, 3, 3) //
1603                     .assertComplete();
1604         }
1605     }
1606 
1607     @Test
1608     public void testUpdateWithBatchSize2() {
1609         try (Database db = db()) {
1610             db.update("update person set score=?") //
1611                     .batchSize(2) //
1612                     .parameters(1, 2, 3, 4) //
1613                     .counts() //
1614                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1615                     .assertValues(3, 3, 3, 3) //
1616                     .assertComplete();
1617         }
1618     }
1619 
1620     @Test
1621     public void testUpdateWithBatchSize3GreaterThanNumRecords() {
1622         try (Database db = db()) {
1623             db.update("update person set score=?") //
1624                     .batchSize(3) //
1625                     .parameters(1, 2, 3, 4) //
1626                     .counts() //
1627                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1628                     .assertValues(3, 3, 3, 3) //
1629                     .assertComplete();
1630         }
1631     }
1632 
1633     @Test
1634     public void testInsert() {
1635         try (Database db = db()) {
1636             db.update("insert into person(name, score) values(?,?)") //
1637                     .parameters("DAVE", 12, "ANNE", 18) //
1638                     .counts() //
1639                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1640                     .assertValues(1, 1) //
1641                     .assertComplete();
1642             List<Tuple2<String, Integer>> list = db.select("select name, score from person") //
1643                     .getAs(String.class, Integer.class) //
1644                     .toList() //
1645                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1646                     .blockingGet();
1647             assertTrue(list.contains(Tuple2.create("DAVE", 12)));
1648             assertTrue(list.contains(Tuple2.create("ANNE", 18)));
1649         }
1650     }
1651 
1652     @Test
1653     public void testReturnGeneratedKeys() {
1654         try (Database db = db()) {
1655             // note is a table with auto increment
1656             db.update("insert into note(text) values(?)") //
1657                     .parameters("HI", "THERE") //
1658                     .returnGeneratedKeys() //
1659                     .getAs(Integer.class)//
1660                     .test() //
1661                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1662                     .assertValues(1, 2) //
1663                     .assertComplete();
1664 
1665             db.update("insert into note(text) values(?)") //
1666                     .parameters("ME", "TOO") //
1667                     .returnGeneratedKeys() //
1668                     .getAs(Integer.class)//
1669                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1670                     .assertValues(3, 4) //
1671                     .assertComplete();
1672         }
1673     }
1674 
1675     @Test
1676     public void testReturnGeneratedKeysDerby() {
1677         Database db = DatabaseCreator.createDerby(1);
1678 
1679         // note is a table with auto increment
1680         db.update("insert into note2(text) values(?)") //
1681                 .parameters("HI", "THERE") //
1682                 .returnGeneratedKeys() //
1683                 .getAs(Integer.class)//
1684                 .test() //
1685                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1686                 .assertNoErrors().assertValues(1, 3) //
1687                 .assertComplete();
1688 
1689         db.update("insert into note2(text) values(?)") //
1690                 .parameters("ME", "TOO") //
1691                 .returnGeneratedKeys() //
1692                 .getAs(Integer.class)//
1693                 .test() //
1694                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1695                 .assertValues(5, 7) //
1696                 .assertComplete();
1697     }
1698 
1699     @Test(expected = IllegalArgumentException.class)
1700     public void testReturnGeneratedKeysWithBatchSizeShouldThrow() {
1701         try (Database db = db()) {
1702             // note is a table with auto increment
1703             db.update("insert into note(text) values(?)") //
1704                     .parameters("HI", "THERE") //
1705                     .batchSize(2) //
1706                     .returnGeneratedKeys();
1707         }
1708     }
1709 
1710     @Test
1711     public void testTransactedReturnGeneratedKeys() {
1712         try (Database db = db()) {
1713             // note is a table with auto increment
1714             db.update("insert into note(text) values(?)") //
1715                     .parameters("HI", "THERE") //
1716                     .transacted() //
1717                     .returnGeneratedKeys() //
1718                     .valuesOnly() //
1719                     .getAs(Integer.class)//
1720                     .test() //
1721                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1722                     .assertValues(1, 2) //
1723                     .assertComplete();
1724 
1725             db.update("insert into note(text) values(?)") //
1726                     .parameters("ME", "TOO") //
1727                     .transacted() //
1728                     .returnGeneratedKeys() //
1729                     .valuesOnly() //
1730                     .getAs(Integer.class)//
1731                     .test() //
1732                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1733                     .assertValues(3, 4) //
1734                     .assertComplete();
1735         }
1736     }
1737 
1738     @Test
1739     public void testTransactedReturnGeneratedKeys2() {
1740         try (Database db = db()) {
1741             // note is a table with auto increment
1742             Flowable<Integer> a = db.update("insert into note(text) values(?)") //
1743                     .parameters("HI", "THERE") //
1744                     .transacted() //
1745                     .returnGeneratedKeys() //
1746                     .valuesOnly() //
1747                     .getAs(Integer.class);
1748 
1749             db.update("insert into note(text) values(?)") //
1750                     .parameters("ME", "TOO") //
1751                     .transacted() //
1752                     .returnGeneratedKeys() //
1753                     .valuesOnly() //
1754                     .getAs(Integer.class)//
1755                     .startWith(a) //
1756                     .test() //
1757                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1758                     .assertValues(1, 2, 3, 4) //
1759                     .assertComplete();
1760         }
1761     }
1762 
1763     @Test
1764     public void testUpdateWithinTransaction() {
1765         try (Database db = db()) {
1766             db //
1767                     .select("select name from person") //
1768                     .transactedValuesOnly() //
1769                     .getAs(String.class) //
1770                     .doOnNext(DatabaseTest::println) //
1771                     .flatMap(tx -> tx//
1772                             .update("update person set score=-1 where name=:name") //
1773                             .batchSize(1) //
1774                             .parameter("name", tx.value()) //
1775                             .valuesOnly() //
1776                             .counts()) //
1777                     .test() //
1778                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1779                     .assertValues(1, 1, 1) //
1780                     .assertComplete();
1781         }
1782     }
1783 
1784     @Test
1785     public void testSelectDependsOnFlowable() {
1786         try (Database db = db()) {
1787             Flowable<Integer> a = db.update("update person set score=100 where name=?") //
1788                     .parameter("FRED") //
1789                     .counts();
1790             db.select("select score from person where name=?") //
1791                     .parameter("FRED") //
1792                     .dependsOn(a) //
1793                     .getAs(Integer.class)//
1794                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1795                     .assertValues(100) //
1796                     .assertComplete();
1797         }
1798     }
1799 
1800     @Test
1801     public void testSelectDependsOnObservable() {
1802         try (Database db = db()) {
1803             Observable<Integer> a = db.update("update person set score=100 where name=?") //
1804                     .parameter("FRED") //
1805                     .counts().toObservable();
1806             db.select("select score from person where name=?") //
1807                     .parameter("FRED") //
1808                     .dependsOn(a) //
1809                     .getAs(Integer.class)//
1810                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1811                     .assertValues(100) //
1812                     .assertComplete();
1813         }
1814     }
1815 
1816     @Test
1817     public void testSelectDependsOnOnSingle() {
1818         try (Database db = db()) {
1819             Single<Long> a = db.update("update person set score=100 where name=?") //
1820                     .parameter("FRED") //
1821                     .counts().count();
1822             db.select("select score from person where name=?") //
1823                     .parameter("FRED") //
1824                     .dependsOn(a) //
1825                     .getAs(Integer.class)//
1826                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1827                     .assertValues(100) //
1828                     .assertComplete();
1829         }
1830     }
1831 
1832     @Test
1833     public void testSelectDependsOnCompletable() {
1834         try (Database db = db()) {
1835             Completable a = db.update("update person set score=100 where name=?") //
1836                     .parameter("FRED") //
1837                     .counts().ignoreElements();
1838             db.select("select score from person where name=?") //
1839                     .parameter("FRED") //
1840                     .dependsOn(a) //
1841                     .getAs(Integer.class)//
1842                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1843                     .assertValues(100) //
1844                     .assertComplete();
1845         }
1846     }
1847 
1848     @Test
1849     public void testUpdateWithinTransactionBatchSize0() {
1850         try (Database db = db()) {
1851             db //
1852                     .select("select name from person") //
1853                     .transactedValuesOnly() //
1854                     .getAs(String.class) //
1855                     .doOnNext(DatabaseTest::println) //
1856                     .flatMap(tx -> tx//
1857                             .update("update person set score=-1 where name=:name") //
1858                             .batchSize(0) //
1859                             .parameter("name", tx.value()) //
1860                             .valuesOnly() //
1861                             .counts()) //
1862                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1863                     .assertValues(1, 1, 1) //
1864                     .assertComplete();
1865         }
1866     }
1867 
1868     private static void info() {
1869         LogManager.getRootLogger().setLevel(Level.INFO);
1870     }
1871 
1872     private static void debug() {
1873         LogManager.getRootLogger().setLevel(Level.INFO);
1874     }
1875 
1876     @Test
1877     public void testCreateBig() {
1878         info();
1879         big(5).select("select count(*) from person") //
1880                 .getAs(Integer.class) //
1881                 .test().awaitDone(20, TimeUnit.SECONDS) //
1882                 .assertValue(5163) //
1883                 .assertComplete();
1884         debug();
1885     }
1886 
1887     @Test
1888     public void testTxWithBig() {
1889         info();
1890         big(1) //
1891                 .select("select name from person") //
1892                 .transactedValuesOnly() //
1893                 .getAs(String.class) //
1894                 .flatMap(tx -> tx//
1895                         .update("update person set score=-1 where name=:name") //
1896                         .batchSize(1) //
1897                         .parameter("name", tx.value()) //
1898                         .valuesOnly() //
1899                         .counts()) //
1900                 .count() //
1901                 .test().awaitDone(20, TimeUnit.SECONDS) //
1902                 .assertValue((long) NAMES_COUNT_BIG) //
1903                 .assertComplete();
1904         debug();
1905     }
1906 
1907     @Test
1908     public void testTxWithBigInputBatchSize2000() {
1909         info();
1910         big(1) //
1911                 .select("select name from person") //
1912                 .transactedValuesOnly() //
1913                 .getAs(String.class) //
1914                 .flatMap(tx -> tx//
1915                         .update("update person set score=-1 where name=:name") //
1916                         .batchSize(2000) //
1917                         .parameter("name", tx.value()) //
1918                         .valuesOnly() //
1919                         .counts()) //
1920                 .count() //
1921                 .test().awaitDone(20, TimeUnit.SECONDS) //
1922                 .assertValue((long) NAMES_COUNT_BIG) //
1923                 .assertComplete();
1924         debug();
1925     }
1926 
1927     @Test
1928     public void testInsertNullClobAndReadClobAsString() {
1929         try (Database db = db()) {
1930             insertNullClob(db);
1931             db.select("select document from person_clob where name='FRED'") //
1932                     .getAsOptional(String.class) //
1933                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1934                     .assertValue(Optional.<String>empty()) //
1935                     .assertComplete();
1936         }
1937     }
1938     
1939     @Test
1940     public void testAutomapClobIssue32() {
1941         try (Database db = db()) {
1942             db.update("insert into person_clob(name, document) values(?, ? )") //
1943                     .parameters("fred", "hello there") //
1944                     .complete() //
1945                     .blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
1946             db.select("select * from person_clob") //
1947                     .autoMap(PersonClob.class) //
1948                     .map(pc -> {
1949                         System.out.println(pc);
1950                         return pc.document();
1951                     }) //
1952                     .test() //
1953                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1954                     .assertValueCount(1) //
1955                     .assertComplete();
1956         }
1957     }
1958     
1959     private static interface PersonClob {
1960         @Column
1961         String name();
1962         @Column
1963         String document();
1964     }
1965 
1966     private static void insertNullClob(Database db) {
1967         db.update("insert into person_clob(name,document) values(?,?)") //
1968                 .parameters("FRED", Database.NULL_CLOB) //
1969                 .counts() //
1970                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1971                 .assertValue(1) //
1972                 .assertComplete();
1973     }
1974 
1975     @Test
1976     public void testClobMethod() {
1977         assertEquals(Database.NULL_CLOB, Database.clob(null));
1978     }
1979 
1980     @Test
1981     public void testBlobMethod() {
1982         assertEquals(Database.NULL_BLOB, Database.blob(null));
1983     }
1984 
1985     @Test
1986     public void testClobMethodPresent() {
1987         assertEquals("a", Database.clob("a"));
1988     }
1989 
1990     @Test
1991     public void testBlobMethodPresent() {
1992         byte[] b = new byte[1];
1993         assertEquals(b, Database.blob(b));
1994     }
1995 
1996     @Test
1997     public void testDateOfBirthNullableForReadMe() {
1998         Database.test() //
1999                 .select("select date_of_birth from person where name='FRED'") //
2000                 .getAsOptional(Instant.class) //
2001                 .blockingForEach(DatabaseTest::println);
2002     }
2003 
2004     @Test
2005     public void testInsertNullClobAndReadClobAsTuple2() {
2006         try (Database db = db()) {
2007             insertNullClob(db);
2008             db.select("select document, document from person_clob where name='FRED'") //
2009                     .getAs(String.class, String.class) //
2010                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2011                     .assertValue(Tuple2.create(null, null)) //
2012                     .assertComplete();
2013         }
2014     }
2015 
2016     @Test
2017     public void testInsertClobAndReadClobAsString() {
2018         try (Database db = db()) {
2019             db.update("insert into person_clob(name,document) values(?,?)") //
2020                     .parameters("FRED", "some text here") //
2021                     .counts() //
2022                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2023                     .assertValue(1) //
2024                     .assertComplete();
2025             db.select("select document from person_clob where name='FRED'") //
2026                     .getAs(String.class) //
2027                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) // .assertValue("some
2028                                                                          // text
2029                                                                          // here")
2030                                                                          // //
2031                     .assertComplete();
2032         }
2033     }
2034 
2035     @Test
2036     public void testInsertClobAndReadClobUsingReader() {
2037         try (Database db = db()) {
2038             db.update("insert into person_clob(name,document) values(?,?)") //
2039                     .parameters("FRED", "some text here") //
2040                     .counts() //
2041                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2042                     .assertValue(1) //
2043                     .assertComplete();
2044             db.select("select document from person_clob where name='FRED'") //
2045                     .getAs(Reader.class) //
2046                     .map(r -> read(r)).test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2047                     .assertValue("some text here") //
2048                     .assertComplete();
2049         }
2050     }
2051 
2052     @Test
2053     public void testInsertBlobAndReadBlobAsByteArray() {
2054         try (Database db = db()) {
2055             insertPersonBlob(db);
2056             db.select("select document from person_blob where name='FRED'") //
2057                     .getAs(byte[].class) //
2058                     .map(b -> new String(b)) //
2059                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2060                     .assertValue("some text here") //
2061                     .assertComplete();
2062         }
2063     }
2064 
2065     private static void insertPersonBlob(Database db) {
2066         byte[] bytes = "some text here".getBytes();
2067         db.update("insert into person_blob(name,document) values(?,?)") //
2068                 .parameters("FRED", bytes) //
2069                 .counts() //
2070                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2071                 .assertValue(1) //
2072                 .assertComplete();
2073     }
2074 
2075     @Test
2076     public void testInsertBlobAndReadBlobAsInputStream() {
2077         try (Database db = db()) {
2078             byte[] bytes = "some text here".getBytes();
2079             db.update("insert into person_blob(name,document) values(?,?)") //
2080                     .parameters("FRED", new ByteArrayInputStream(bytes)) //
2081                     .counts() //
2082                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2083                     .assertValue(1) //
2084                     .assertComplete();
2085             db.select("select document from person_blob where name='FRED'") //
2086                     .getAs(InputStream.class) //
2087                     .map(is -> read(is)) //
2088                     .map(b -> new String(b)) //
2089                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2090                     .assertValue("some text here") //
2091                     .assertComplete();
2092         }
2093     }
2094 
2095     private static String read(Reader reader) throws IOException {
2096         StringBuffer s = new StringBuffer();
2097         char[] ch = new char[128];
2098         int n = 0;
2099         while ((n = reader.read(ch)) != -1) {
2100             s.append(ch, 0, n);
2101         }
2102         reader.close();
2103         return s.toString();
2104     }
2105 
2106     private static byte[] read(InputStream is) throws IOException {
2107         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
2108         byte[] b = new byte[128];
2109         int n = 0;
2110         while ((n = is.read(b)) != -1) {
2111             bytes.write(b, 0, n);
2112         }
2113         is.close();
2114         return bytes.toByteArray();
2115     }
2116 
2117     private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
2118         TestScheduler scheduler = new TestScheduler();
2119 
2120         NonBlockingConnectionPool pool = Pools //
2121                 .nonBlocking() //
2122                 .connectionProvider(DatabaseCreator.connectionProvider()) //
2123                 .maxIdleTime(10, TimeUnit.MINUTES) //
2124                 .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES) //
2125                 .healthCheck(healthy) //
2126                 .scheduler(scheduler) //
2127                 .maxPoolSize(1) //
2128                 .build();
2129 
2130         try (Database db = Database.from(pool)) {
2131             TestSubscriber<Integer> ts0 = db.select( //
2132                     "select score from person where name=?") //
2133                     .parameter("FRED") //
2134                     .getAs(Integer.class) //
2135                     .test();
2136             ts0.assertValueCount(0) //
2137                     .assertNotComplete();
2138             scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2139             ts0.assertValueCount(1) //
2140                     .assertComplete();
2141             TestSubscriber<Integer> ts = db.select( //
2142                     "select score from person where name=?") //
2143                     .parameter("FRED") //
2144                     .getAs(Integer.class) //
2145                     .test() //
2146                     .assertValueCount(0);
2147             log.debug("done2");
2148             scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2149             Thread.sleep(200);
2150             ts.assertValueCount(1);
2151             Thread.sleep(200);
2152             ts.assertValue(21) //
2153                     .assertComplete();
2154         }
2155     }
2156 
2157     @Test
2158     public void testShutdownBeforeUse() {
2159         NonBlockingConnectionPool pool = Pools //
2160                 .nonBlocking() //
2161                 .connectionProvider(DatabaseCreator.connectionProvider()) //
2162                 .scheduler(Schedulers.io()) //
2163                 .maxPoolSize(1) //
2164                 .build();
2165         pool.close();
2166         Database.from(pool) //
2167                 .select("select score from person where name=?") //
2168                 .parameter("FRED") //
2169                 .getAs(Integer.class) //
2170                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2171                 .assertNoValues() //
2172                 .assertError(PoolClosedException.class);
2173     }
2174 
2175     @Test
2176     public void testFewerColumnsMappedThanAvailable() {
2177         try (Database db = db()) {
2178             db.select("select name, score from person where name='FRED'") //
2179                     .getAs(String.class) //
2180                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2181                     .assertValues("FRED") //
2182                     .assertComplete();
2183         }
2184     }
2185 
2186     @Test
2187     public void testMoreColumnsMappedThanAvailable() {
2188         try (Database db = db()) {
2189             db //
2190                     .select("select name, score from person where name='FRED'") //
2191                     .getAs(String.class, Integer.class, String.class) //
2192                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2193                     .assertNoValues() //
2194                     .assertError(MoreColumnsRequestedThanExistException.class);
2195         }
2196     }
2197 
2198     @Test
2199     public void testSelectTimestamp() {
2200         try (Database db = db()) {
2201             db //
2202                     .select("select registered from person where name='FRED'") //
2203                     .getAs(Long.class) //
2204                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2205                     .assertValue(FRED_REGISTERED_TIME) //
2206                     .assertComplete();
2207         }
2208     }
2209 
2210     @Test
2211     public void testSelectTimestampAsDate() {
2212         try (Database db = db()) {
2213             db //
2214                     .select("select registered from person where name='FRED'") //
2215                     .getAs(Date.class) //
2216                     .map(d -> d.getTime()) //
2217                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2218                     .assertValue(FRED_REGISTERED_TIME) //
2219                     .assertComplete();
2220         }
2221     }
2222 
2223     @Test
2224     public void testSelectTimestampAsInstant() {
2225         try (Database db = db()) {
2226             db //
2227                     .select("select registered from person where name='FRED'") //
2228                     .getAs(Instant.class) //
2229                     .map(d -> d.toEpochMilli()) //
2230                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2231                     .assertValue(FRED_REGISTERED_TIME) //
2232                     .assertComplete();
2233         }
2234     }
2235 
2236     @Test
2237     public void testUpdateCalendarParameter() {
2238         Calendar c = GregorianCalendar.from(ZonedDateTime.parse("2017-03-25T15:37Z"));
2239         try (Database db = db()) {
2240             db.update("update person set registered=?") //
2241                     .parameter(c) //
2242                     .counts() //
2243                     .test() //
2244                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2245                     .assertValue(3) //
2246                     .assertComplete();
2247             db.select("select registered from person") //
2248                     .getAs(Long.class) //
2249                     .firstOrError() //
2250                     .test() //
2251                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2252                     .assertValue(c.getTimeInMillis()) //
2253                     .assertComplete();
2254         }
2255     }
2256 
2257     @Test
2258     public void testUpdateTimeParameter() {
2259         try (Database db = db()) {
2260             Time t = new Time(1234);
2261             db.update("update person set registered=?") //
2262                     .parameter(t) //
2263                     .counts() //
2264                     .test() //
2265                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2266                     .assertValue(3) //
2267                     .assertComplete();
2268             db.select("select registered from person") //
2269                     .getAs(Long.class) //
2270                     .firstOrError() //
2271                     .test() //
2272                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2273                     .assertValue(1234L) //
2274                     .assertComplete();
2275         }
2276     }
2277 
2278     @Test
2279     public void testUpdateTimestampParameter() {
2280         try (Database db = db()) {
2281             Timestamp t = new Timestamp(1234);
2282             db.update("update person set registered=?") //
2283                     .parameter(t) //
2284                     .counts() //
2285                     .test() //
2286                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2287                     .assertValue(3) //
2288                     .assertComplete();
2289             db.select("select registered from person") //
2290                     .getAs(Long.class) //
2291                     .firstOrError() //
2292                     .test() //
2293                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2294                     .assertValue(1234L) //
2295                     .assertComplete();
2296         }
2297     }
2298 
2299     @Test
2300     public void testUpdateSqlDateParameter() {
2301         try (Database db = db()) {
2302             java.sql.Date t = new java.sql.Date(1234);
2303 
2304             db.update("update person set registered=?") //
2305                     .parameter(t) //
2306                     .counts() //
2307                     .test() //
2308                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2309                     .assertValue(3) //
2310                     .assertComplete();
2311             db.select("select registered from person") //
2312                     .getAs(Long.class) //
2313                     .firstOrError() //
2314                     .test() //
2315                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2316                     // TODO make a more accurate comparison using the current TZ
2317                     .assertValue(x -> Math.abs(x - 1234) <= TimeUnit.HOURS.toMillis(24)) //
2318                     .assertComplete();
2319         }
2320     }
2321 
2322     @Test
2323     public void testUpdateUtilDateParameter() {
2324         try (Database db = db()) {
2325             Date d = new Date(1234);
2326             db.update("update person set registered=?") //
2327                     .parameter(d) //
2328                     .counts() //
2329                     .test() //
2330                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2331                     .assertValue(3) //
2332                     .assertComplete();
2333             db.select("select registered from person") //
2334                     .getAs(Long.class) //
2335                     .firstOrError() //
2336                     .test() //
2337                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2338                     .assertValue(1234L) //
2339                     .assertComplete();
2340         }
2341     }
2342 
2343     @Test
2344     public void testUpdateTimestampAsInstant() {
2345         try (Database db = db()) {
2346             db.update("update person set registered=? where name='FRED'") //
2347                     .parameter(Instant.ofEpochMilli(FRED_REGISTERED_TIME)) //
2348                     .counts() //
2349                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2350                     .assertValue(1) //
2351                     .assertComplete();
2352             db.select("select registered from person where name='FRED'") //
2353                     .getAs(Long.class) //
2354                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2355                     .assertValue(FRED_REGISTERED_TIME) //
2356                     .assertComplete();
2357         }
2358     }
2359 
2360     @Test
2361     public void testUpdateTimestampAsZonedDateTime() {
2362         try (Database db = db()) {
2363             db.update("update person set registered=? where name='FRED'") //
2364                     .parameter(ZonedDateTime.ofInstant(Instant.ofEpochMilli(FRED_REGISTERED_TIME),
2365                             ZoneOffset.UTC.normalized())) //
2366                     .counts() //
2367                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2368                     .assertValue(1) //
2369                     .assertComplete();
2370             db.select("select registered from person where name='FRED'") //
2371                     .getAs(Long.class) //
2372                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2373                     .assertValue(FRED_REGISTERED_TIME) //
2374                     .assertComplete();
2375         }
2376     }
2377 
2378     @Test
2379     public void testCompleteCompletes() {
2380         try (Database db = db(1)) {
2381             db //
2382                     .update("update person set score=-3 where name='FRED'") //
2383                     .complete() //
2384                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2385                     .blockingAwait();
2386 
2387             int score = db.select("select score from person where name='FRED'") //
2388                     .getAs(Integer.class) //
2389                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2390                     .blockingFirst();
2391             assertEquals(-3, score);
2392         }
2393     }
2394 
2395     @Test
2396     public void testComplete() throws InterruptedException {
2397         try (Database db = db(1)) {
2398             Completable a = db //
2399                     .update("update person set score=-3 where name='FRED'") //
2400                     .complete();
2401             db.update("update person set score=-4 where score = -3") //
2402                     .dependsOn(a) //
2403                     .counts() //
2404                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2405                     .assertValue(1) //
2406                     .assertComplete();
2407         }
2408     }
2409 
2410     @Test
2411     public void testCountsOnlyInTransaction() {
2412         try (Database db = db()) {
2413             db.update("update person set score = -3") //
2414                     .transacted() //
2415                     .countsOnly() //
2416                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2417                     .assertValues(3) //
2418                     .assertComplete();
2419         }
2420     }
2421 
2422     @Test
2423     public void testCountsInTransaction() {
2424         try (Database db = db()) {
2425             db.update("update person set score = -3") //
2426                     .transacted() //
2427                     .counts() //
2428                     .doOnNext(DatabaseTest::println) //
2429                     .toList() //
2430                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2431                     .assertValue(list -> list.get(0).isValue() && list.get(0).value() == 3
2432                             && list.get(1).isComplete() && list.size() == 2) //
2433                     .assertComplete();
2434         }
2435     }
2436 
2437     @Test
2438     public void testTx() throws InterruptedException {
2439         Database db = db(3);
2440         Flowable<Tx<?>> transaction = db //
2441                 .update("update person set score=-3 where name='FRED'") //
2442                 .transaction();
2443 
2444         transaction //
2445                 .doOnCancel(() -> log.debug("disposing")) //
2446                 .doOnNext(DatabaseTest::println) //
2447                 .flatMap(tx -> {
2448                     log.debug("flatmapping");
2449                     return tx //
2450                             .update("update person set score = -4 where score = -3") //
2451                             .countsOnly() //
2452                             .doOnSubscribe(s -> log.debug("subscribed")) //
2453                             .doOnNext(num -> log.debug("num=" + num));
2454                 }) //
2455                 .test() //
2456                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2457                 .assertNoErrors() //
2458                 .assertValue(1) //
2459                 .assertComplete();
2460     }
2461 
2462     @Test
2463     public void testTxAfterSelect() {
2464         Database db = db(3);
2465         Single<Tx<Integer>> transaction = db //
2466                 .select("select score from person where name='FRED'") //
2467                 .transactedValuesOnly() //
2468                 .getAs(Integer.class) //
2469                 .firstOrError();
2470 
2471         transaction //
2472                 .doOnDispose(() -> log.debug("disposing")) //
2473                 .doOnSuccess(DatabaseTest::println) //
2474                 .flatMapPublisher(tx -> {
2475                     log.debug("flatmapping");
2476                     return tx //
2477                             .update("update person set score = -4 where score = ?") //
2478                             .parameter(tx.value()) //
2479                             .countsOnly() //
2480                             .doOnSubscribe(s -> log.debug("subscribed")) //
2481                             .doOnNext(num -> log.debug("num=" + num));
2482                 }) //
2483                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2484                 .assertNoErrors() //
2485                 .assertValue(1) //
2486                 .assertComplete();
2487     }
2488 
2489     @Test
2490     public void testUseTxOnComplete() {
2491         db(1) //
2492                 .select(Person10.class) //
2493                 .transacted() //
2494                 .get() //
2495                 .lastOrError() //
2496                 .map(tx -> tx.select("select count(*) from person") //
2497                         .count().blockingGet()) //
2498                 .test() //
2499                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2500                 .assertError(CannotForkTransactedConnection.class) //
2501                 .assertValueCount(0);
2502     }
2503 
2504     @Test
2505     public void testSingleFlatMap() {
2506         Single.just(1).flatMapPublisher(n -> Flowable.just(1)).test(1).assertValue(1)
2507                 .assertComplete();
2508     }
2509 
2510     @Test
2511     public void testAutomappedInstanceHasMeaningfulToStringMethod() {
2512         info();
2513         String s = Database.test() //
2514                 .select("select name, score from person where name=?") //
2515                 .parameterStream(Flowable.just("FRED")) //
2516                 .autoMap(Person2.class) //
2517                 .map(x -> x.toString()) //
2518                 .blockingSingle();
2519         assertEquals("Person2[name=FRED, score=21]", s);
2520     }
2521 
2522     @Test
2523     public void testAutomappedEquals() {
2524         boolean b = Database.test() //
2525                 .select("select name, score from person where name=?") //
2526                 .parameterStream(Flowable.just("FRED")) //
2527                 .autoMap(Person2.class) //
2528                 .map(x -> x.equals(x)) //
2529                 .blockingSingle();
2530         assertTrue(b);
2531     }
2532 
2533     @Test
2534     public void testAutomappedDoesNotEqualNull() {
2535         boolean b = Database.test() //
2536                 .select("select name, score from person where name=?") //
2537                 .parameterStream(Flowable.just("FRED")) //
2538                 .autoMap(Person2.class) //
2539                 .map(x -> x.equals(null)) //
2540                 .blockingSingle();
2541         assertFalse(b);
2542     }
2543 
2544     @Test
2545     public void testAutomappedDoesNotEqual() {
2546         boolean b = Database.test() //
2547                 .select("select name, score from person where name=?") //
2548                 .parameterStream(Flowable.just("FRED")) //
2549                 .autoMap(Person2.class) //
2550                 .map(x -> x.equals(new Object())) //
2551                 .blockingSingle();
2552         assertFalse(b);
2553     }
2554 
2555     @Test
2556     public void testAutomappedHashCode() {
2557         Person2 p = Database.test() //
2558                 .select("select name, score from person where name=?") //
2559                 .parameterStream(Flowable.just("FRED")) //
2560                 .autoMap(Person2.class) //
2561                 .blockingSingle();
2562         assertTrue(p.hashCode() != 0);
2563     }
2564 
2565     @Test
2566     public void testAutomappedWithParametersThatProvokesMoreThanOneQuery() {
2567         Database.test() //
2568                 .select("select name, score from person where name=?") //
2569                 .parameters("FRED", "FRED") //
2570                 .autoMap(Person2.class) //
2571                 .doOnNext(DatabaseTest::println) //
2572                 .test() //
2573                 .awaitDone(2000, TimeUnit.SECONDS) //
2574                 .assertNoErrors() //
2575                 .assertValueCount(2) //
2576                 .assertComplete();
2577     }
2578 
2579     @Test
2580     public void testAutomappedObjectsEqualsAndHashCodeIsDistinctOnValues() {
2581         Database.test() //
2582                 .select("select name, score from person where name=?") //
2583                 .parameters("FRED", "FRED") //
2584                 .autoMap(Person2.class) //
2585                 .distinct() //
2586                 .doOnNext(DatabaseTest::println) //
2587                 .test() //
2588                 .awaitDone(2000, TimeUnit.SECONDS) //
2589                 .assertNoErrors() //
2590                 .assertValueCount(1) //
2591                 .assertComplete();
2592     }
2593 
2594     @Test
2595     public void testAutomappedObjectsEqualsDifferentiatesDifferentInterfacesWithSameMethodNamesAndValues() {
2596         PersonDistinct1 p1 = Database.test() //
2597                 .select("select name, score from person where name=?") //
2598                 .parameters("FRED") //
2599                 .autoMap(PersonDistinct1.class) //
2600                 .blockingFirst();
2601 
2602         PersonDistinct2 p2 = Database.test() //
2603                 .select("select name, score from person where name=?") //
2604                 .parameters("FRED") //
2605                 .autoMap(PersonDistinct2.class) //
2606                 .blockingFirst();
2607 
2608         assertNotEquals(p1, p2);
2609     }
2610 
2611     @Test
2612     public void testAutomappedObjectsWhenDefaultMethodInvoked() {
2613         // only run test if java 8
2614         if (System.getProperty("java.version").startsWith("1.8.")) {
2615             PersonWithDefaultMethod p = Database.test() //
2616                     .select("select name, score from person where name=?") //
2617                     .parameters("FRED") //
2618                     .autoMap(PersonWithDefaultMethod.class) //
2619                     .blockingFirst();
2620             assertEquals("fred", p.nameLower());
2621         }
2622     }
2623 
2624     @Test(expected = AutomappedInterfaceInaccessibleException.class)
2625     public void testAutomappedObjectsWhenDefaultMethodInvokedAndIsNonPublicThrows() {
2626         PersonWithDefaultMethodNonPublic p = Database.test() //
2627                 .select("select name, score from person where name=?") //
2628                 .parameters("FRED") //
2629                 .autoMap(PersonWithDefaultMethodNonPublic.class) //
2630                 .blockingFirst();
2631         p.nameLower();
2632     }
2633 
2634     @Test
2635     public void testBlockingDatabase() {
2636         Database db = blocking();
2637         db.select("select score from person where name=?") //
2638                 .parameters("FRED", "JOSEPH") //
2639                 .getAs(Integer.class) //
2640                 .test() //
2641                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2642                 .assertNoErrors() //
2643                 .assertValues(21, 34) //
2644                 .assertComplete();
2645     }
2646 
2647     @Test
2648     public void testBlockingDatabaseTransacted() {
2649         Database db = blocking();
2650         db.select("select score from person where name=?") //
2651                 .parameters("FRED", "JOSEPH") //
2652                 .transactedValuesOnly() //
2653                 .getAs(Integer.class) //
2654                 .map(x -> x.value()) //
2655                 .test() //
2656                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2657                 .assertNoErrors() //
2658                 .assertValues(21, 34) //
2659                 .assertComplete();
2660     }
2661 
2662     @Test
2663     public void testBlockingDatabaseTransactedNested() {
2664         Database db = blocking();
2665         db.select("select score from person where name=?") //
2666                 .parameters("FRED", "JOSEPH") //
2667                 .transactedValuesOnly() //
2668                 .getAs(Integer.class) //
2669                 .flatMap(tx -> tx.select("select name from person where score=?") //
2670                         .parameter(tx.value()) //
2671                         .valuesOnly() //
2672                         .getAs(String.class))
2673                 .test() //
2674                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2675                 .assertNoErrors() //
2676                 .assertValues("FRED", "JOSEPH") //
2677                 .assertComplete();
2678     }
2679 
2680     @Test
2681     public void testUsingNormalJDBCApi() {
2682         Database db = db(1);
2683         db.apply(con -> {
2684             try (PreparedStatement stmt = con
2685                     .prepareStatement("select count(*) from person where name='FRED'");
2686                     ResultSet rs = stmt.executeQuery()) {
2687                 rs.next();
2688                 return rs.getInt(1);
2689             }
2690         }) //
2691                 .test() //
2692                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2693                 .assertValue(1) //
2694                 .assertComplete();
2695 
2696         // now check that the connection was returned to the pool
2697         db.select("select count(*) from person where name='FRED'") //
2698                 .getAs(Integer.class) //
2699                 .test() //
2700                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2701                 .assertValue(1) //
2702                 .assertComplete();
2703     }
2704 
2705     @Test
2706     public void testUsingNormalJDBCApiCompletable() {
2707         Database db = db(1);
2708         db.apply(con -> {
2709             try (PreparedStatement stmt = con
2710                     .prepareStatement("select count(*) from person where name='FRED'");
2711                     ResultSet rs = stmt.executeQuery()) {
2712                 rs.next();
2713             }
2714         }) //
2715                 .test() //
2716                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2717                 .assertComplete();
2718 
2719         // now check that the connection was returned to the pool
2720         db.select("select count(*) from person where name='FRED'") //
2721                 .getAs(Integer.class) //
2722                 .test() //
2723                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2724                 .assertValue(1) //
2725                 .assertComplete();
2726     }
2727 
2728     @Test
2729     public void testCallableStatement() {
2730         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2731         db.apply(con -> {
2732             try (Statement stmt = con.createStatement()) {
2733                 CallableStatement st = con.prepareCall("call getPersonCount(?, ?)");
2734                 st.setInt(1, 0);
2735                 st.registerOutParameter(2, Types.INTEGER);
2736                 st.execute();
2737                 assertEquals(2, st.getInt(2));
2738             }
2739         }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2740     }
2741 
2742     @Test
2743     public void testCallableStatementReturningResultSets() {
2744         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2745         db.apply(con -> {
2746             try (Statement stmt = con.createStatement()) {
2747                 CallableStatement st = con.prepareCall("call in1out0rs2(?)");
2748                 st.setInt(1, 0);
2749                 st.execute();
2750                 ResultSet rs1 = st.getResultSet();
2751                 st.getMoreResults(Statement.KEEP_CURRENT_RESULT);
2752                 ResultSet rs2 = st.getResultSet();
2753                 rs1.next();
2754                 assertEquals("FRED", rs1.getString(1));
2755                 rs2.next();
2756                 assertEquals("SARAH", rs2.getString(1));
2757             }
2758         }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2759     }
2760 
2761     @Test
2762     public void testCallableApiNoParameters() {
2763         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2764         db //
2765                 .call("call zero()") //
2766                 .once() //
2767                 .test() //
2768                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2769                 .assertComplete();
2770     }
2771 
2772     @Test
2773     public void testCallableApiNoParametersTransacted() {
2774         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2775         db //
2776                 .call("call zero()") //
2777                 .transacted() //
2778                 .once() //
2779                 .test() //
2780                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2781                 .assertValueCount(1) //
2782                 .assertValue(x -> x.isComplete()) //
2783                 .assertComplete();
2784     }
2785 
2786     @Test
2787     public void testCallableApiOneInOutParameter() {
2788         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2789         db //
2790                 .call("call inout1(?)") //
2791                 .inOut(Type.INTEGER, Integer.class) //
2792                 .input(4) //
2793                 .test() //
2794                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2795                 .assertValue(5) //
2796                 .assertComplete();
2797     }
2798 
2799     @Test
2800     public void testCallableApiOneInOutParameterTransacted() {
2801         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2802         db //
2803                 .call("call inout1(?)") //
2804                 .transacted() //
2805                 .inOut(Type.INTEGER, Integer.class) //
2806                 .input(4) //
2807                 .flatMap(Tx.flattenToValuesOnly()) //
2808                 .test() //
2809                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2810                 .assertValue(5) //
2811                 .assertComplete();
2812     }
2813 
2814     @Test
2815     public void testCallableApiTwoInOutParameters() {
2816         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2817         db //
2818                 .call("call inout2(?, ?)") //
2819                 .inOut(Type.INTEGER, Integer.class) //
2820                 .inOut(Type.INTEGER, Integer.class) //
2821                 .input(4, 10) //
2822                 .test() //
2823                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2824                 .assertValue(x -> x._1() == 5 && x._2() == 12) //
2825                 .assertComplete();
2826     }
2827 
2828     @Test
2829     public void testCallableApiTwoInOutParametersTransacted() {
2830         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2831         db //
2832                 .call("call inout2(?, ?)") //
2833                 .transacted() //
2834                 .inOut(Type.INTEGER, Integer.class) //
2835                 .inOut(Type.INTEGER, Integer.class) //
2836                 .input(4, 10) //
2837                 .flatMap(Tx.flattenToValuesOnly()) //
2838                 .test() //
2839                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2840                 .assertValue(x -> x._1() == 5 && x._2() == 12) //
2841                 .assertComplete();
2842     }
2843 
2844     @Test
2845     public void testCallableApiThreeInOutParameters() {
2846         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2847         db //
2848                 .call("call inout3(?, ?, ?)") //
2849                 .inOut(Type.INTEGER, Integer.class) //
2850                 .inOut(Type.INTEGER, Integer.class) //
2851                 .inOut(Type.INTEGER, Integer.class) //
2852                 .input(4, 10, 13) //
2853                 .test() //
2854                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2855                 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16) //
2856                 .assertComplete();
2857     }
2858 
2859     @Test
2860     public void testCallableApiThreeInOutParametersTransacted() {
2861         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2862         db //
2863                 .call("call inout3(?, ?, ?)") //
2864                 .transacted() //
2865                 .inOut(Type.INTEGER, Integer.class) //
2866                 .inOut(Type.INTEGER, Integer.class) //
2867                 .inOut(Type.INTEGER, Integer.class) //
2868                 .input(4, 10, 13) //
2869                 .flatMap(Tx.flattenToValuesOnly()) //
2870                 .test() //
2871                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2872                 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16) //
2873                 .assertComplete();
2874     }
2875 
2876     @Test
2877     public void testCallableApiReturningOneOutParameter() throws InterruptedException {
2878         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2879         db //
2880                 .call("call in1out1(?,?)") //
2881                 .in() //
2882                 .out(Type.INTEGER, Integer.class) //
2883                 .input(0, 10, 20) //
2884                 .test() //
2885                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2886                 .assertValues(0, 10, 20) //
2887                 .assertComplete();
2888     }
2889 
2890     @Test
2891     public void testCallableApiReturningOneOutParameterTransacted() throws InterruptedException {
2892         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2893         db //
2894                 .call("call in1out1(?,?)") //
2895                 .transacted() //
2896                 .in() //
2897                 .out(Type.INTEGER, Integer.class) //
2898                 .input(0, 10, 20) //
2899                 .flatMap(Tx.flattenToValuesOnly()) //
2900                 .test() //
2901                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2902                 .assertValues(0, 10, 20) //
2903                 .assertComplete();
2904     }
2905 
2906     @Test
2907     public void testCallableApiReturningTwoOutParameters() throws InterruptedException {
2908         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2909         db //
2910                 .call("call in1out2(?,?,?)") //
2911                 .in() //
2912                 .out(Type.INTEGER, Integer.class) //
2913                 .out(Type.INTEGER, Integer.class) //
2914                 .input(0, 10, 20) //
2915                 .test() //
2916                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2917                 .assertValueCount(3) //
2918                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1) //
2919                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11) //
2920                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21) //
2921                 .assertComplete();
2922 
2923         db.call("call in1out2(?,?,?)").in().out(Type.INTEGER, Integer.class)
2924                 .out(Type.INTEGER, Integer.class).input(0, 10, 20)
2925                 .blockingForEach(System.out::println);
2926     }
2927 
2928     @Test
2929     public void testCallableApiReturningTwoOutParametersTransacted() throws InterruptedException {
2930         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2931         db //
2932                 .call("call in1out2(?,?,?)") //
2933                 .transacted() //
2934                 .in() //
2935                 .out(Type.INTEGER, Integer.class) //
2936                 .out(Type.INTEGER, Integer.class) //
2937                 .input(0, 10, 20) //
2938                 .flatMap(Tx.flattenToValuesOnly()) //
2939                 .test() //
2940                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2941                 .assertValueCount(3) //
2942                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1) //
2943                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11) //
2944                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21) //
2945                 .assertComplete();
2946 
2947         db.call("call in1out2(?,?,?)") //
2948                 .in() //
2949                 .out(Type.INTEGER, Integer.class) //
2950                 .out(Type.INTEGER, Integer.class) //
2951                 .input(0, 10, 20) //
2952                 .blockingForEach(System.out::println);
2953     }
2954 
2955     @Test
2956     public void testCallableApiReturningThreeOutParameters() throws InterruptedException {
2957         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2958         db //
2959                 .call("call in1out3(?,?,?,?)") //
2960                 .in() //
2961                 .out(Type.INTEGER, Integer.class) //
2962                 .out(Type.INTEGER, Integer.class) //
2963                 .out(Type.INTEGER, Integer.class) //
2964                 .input(0, 10, 20) //
2965                 .test() //
2966                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2967                 .assertValueCount(3) //
2968                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2) //
2969                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12) //
2970                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22) //
2971                 .assertComplete();
2972     }
2973 
2974     @Test
2975     public void testCallableApiReturningThreeOutParametersTransacted() throws InterruptedException {
2976         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2977         db //
2978                 .call("call in1out3(?,?,?,?)") //
2979                 .transacted() //
2980                 .in() //
2981                 .out(Type.INTEGER, Integer.class) //
2982                 .out(Type.INTEGER, Integer.class) //
2983                 .out(Type.INTEGER, Integer.class) //
2984                 .input(0, 10, 20) //
2985                 .flatMap(Tx.flattenToValuesOnly()) //
2986                 .test() //
2987                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2988                 .assertValueCount(3) //
2989                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2) //
2990                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12) //
2991                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22) //
2992                 .assertComplete();
2993     }
2994 
2995     @Test
2996     public void testCallableApiReturningOneResultSet() throws InterruptedException {
2997         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2998         db //
2999                 .call("call in0out0rs1()") //
3000                 .autoMap(Person2.class) //
3001                 .input(0, 10, 20) //
3002                 .doOnNext(x -> {
3003                     assertTrue(x.outs().isEmpty());
3004                 }) //
3005                 .flatMap(x -> x.results()) //
3006                 .test() //
3007                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3008                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3009                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3010                 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3011                 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3012                 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3013                 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3014                 .assertValueCount(6) //
3015                 .assertComplete();
3016     }
3017 
3018     @Test
3019     public void testCallableApiReturningOneResultSetTransacted() throws InterruptedException {
3020         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3021         db //
3022                 .call("call in0out0rs1()") //
3023                 .transacted() //
3024                 .autoMap(Person2.class) //
3025                 .input(0, 10, 20) //
3026                 .flatMap(Tx.flattenToValuesOnly()).doOnNext(x -> {
3027                     assertTrue(x.outs().isEmpty());
3028                 }) //
3029                 .flatMap(x -> x.results()) //
3030                 .test() //
3031                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3032                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3033                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3034                 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3035                 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3036                 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3037                 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3038                 .assertValueCount(6) //
3039                 .assertComplete();
3040     }
3041 
3042     @Test
3043     public void testCallableApiReturningTwoResultSetsWithAutoMap() throws InterruptedException {
3044         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3045         db //
3046                 .call("call in1out0rs2(?)") //
3047                 .in() //
3048                 .autoMap(Person2.class) //
3049                 .autoMap(Person2.class) //
3050                 .input(0, 10, 20) //
3051                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3052                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3053                 .test() //
3054                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3055                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3056                         "SARAHFRED") //
3057                 .assertComplete();
3058     }
3059 
3060     @Test
3061     public void testCallableApiReturningTwoResultSetsWithAutoMapTransacted()
3062             throws InterruptedException {
3063         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3064         db //
3065                 .call("call in1out0rs2(?)") //
3066                 .transacted() //
3067                 .in() //
3068                 .autoMap(Person2.class) //
3069                 .autoMap(Person2.class) //
3070                 .input(0, 10, 20) //
3071                 .flatMap(Tx.flattenToValuesOnly()) //
3072                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3073                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3074                 .test() //
3075                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3076                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3077                         "SARAHFRED") //
3078                 .assertComplete();
3079     }
3080 
3081     @Test
3082     public void testCallableApiReturningTwoResultSetsWithGet() throws InterruptedException {
3083         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3084         db //
3085                 .call("call in1out0rs2(?)") //
3086                 .in() //
3087                 .getAs(String.class, Integer.class) //
3088                 .getAs(String.class, Integer.class)//
3089                 .input(0, 10, 20) //
3090                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1())) //
3091                 .test() //
3092                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3093                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3094                         "SARAHFRED") //
3095                 .assertComplete();
3096     }
3097 
3098     @Test
3099     public void testCallableApiReturningTwoResultSetsWithGetTransacted()
3100             throws InterruptedException {
3101         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3102         db //
3103                 .call("call in1out0rs2(?)") //
3104                 .transacted() //
3105                 .in() //
3106                 .getAs(String.class, Integer.class) //
3107                 .getAs(String.class, Integer.class)//
3108                 .input(0, 10, 20) //
3109                 .flatMap(Tx.flattenToValuesOnly()) //
3110                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1())) //
3111                 .test() //
3112                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3113                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3114                         "SARAHFRED") //
3115                 .assertComplete();
3116     }
3117 
3118     @Test
3119     public void testCallableApiReturningTwoResultSetsSwitchOrder1() throws InterruptedException {
3120         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3121         db //
3122                 .call("call in1out0rs2(?)") //
3123                 .autoMap(Person2.class) //
3124                 .in() //
3125                 .autoMap(Person2.class) //
3126                 .input(0, 10, 20) //
3127                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3128                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3129                 .test() //
3130                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3131                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3132                         "SARAHFRED") //
3133                 .assertComplete();
3134     }
3135 
3136     @Test
3137     public void testCallableApiReturningTwoResultSetsSwitchOrder1Transacted()
3138             throws InterruptedException {
3139         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3140         db //
3141                 .call("call in1out0rs2(?)") //
3142                 .transacted() //
3143                 .autoMap(Person2.class) //
3144                 .in() //
3145                 .autoMap(Person2.class) //
3146                 .input(0, 10, 20) //
3147                 .flatMap(Tx.flattenToValuesOnly()) //
3148                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3149                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3150                 .test() //
3151                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3152                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3153                         "SARAHFRED") //
3154                 .assertComplete();
3155     }
3156 
3157     @Test
3158     public void testCallableApiReturningTwoResultSetsSwitchOrder2() throws InterruptedException {
3159         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3160         db //
3161                 .call("call in1out0rs2(?)") //
3162                 .autoMap(Person2.class) //
3163                 .autoMap(Person2.class) //
3164                 .in() //
3165                 .input(0, 10, 20) //
3166                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3167                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3168                 .test() //
3169                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3170                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3171                         "SARAHFRED") //
3172                 .assertComplete();
3173     }
3174 
3175     @Test
3176     public void testCallableApiReturningTwoResultSetsSwitchOrder2Transacted()
3177             throws InterruptedException {
3178         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3179         db //
3180                 .call("call in1out0rs2(?)") //
3181                 .transacted() //
3182                 .autoMap(Person2.class) //
3183                 .autoMap(Person2.class) //
3184                 .in() //
3185                 .input(0, 10, 20) //
3186                 .flatMap(Tx.flattenToValuesOnly()) //
3187                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3188                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3189                 .test() //
3190                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3191                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3192                         "SARAHFRED") //
3193                 .assertComplete();
3194     }
3195 
3196     @Test
3197     public void testCallableApiReturningTwoOutputThreeResultSets() throws InterruptedException {
3198         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3199         db //
3200                 .call("call in0out2rs3(?, ?)") //
3201                 .out(Type.INTEGER, Integer.class) //
3202                 .out(Type.INTEGER, Integer.class) //
3203                 .autoMap(Person2.class) //
3204                 .autoMap(Person2.class) //
3205                 .autoMap(Person2.class) //
3206                 .input(0, 10, 20) //
3207                 .doOnNext(x -> {
3208                     assertEquals(2, x.outs().size());
3209                     assertEquals(1, x.outs().get(0));
3210                     assertEquals(2, x.outs().get(1));
3211                 }) //
3212                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3213                         .zipWith(x.results3(), (y, z) -> y + z.name())) //
3214                 .test() //
3215                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3216                 .assertNoErrors() //
3217                 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3218                         "FREDSARAHFRED", "SARAHFREDSARAH") //
3219                 .assertComplete();
3220     }
3221 
3222     @Test
3223     public void testCallableApiReturningTwoOutputThreeResultSetsTransacted()
3224             throws InterruptedException {
3225         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3226         db //
3227                 .call("call in0out2rs3(?, ?)") //
3228                 .transacted() //
3229                 .out(Type.INTEGER, Integer.class) //
3230                 .out(Type.INTEGER, Integer.class) //
3231                 .autoMap(Person2.class) //
3232                 .autoMap(Person2.class) //
3233                 .autoMap(Person2.class) //
3234                 .input(0, 10, 20) //
3235                 .flatMap(Tx.flattenToValuesOnly()) //
3236                 .doOnNext(x -> {
3237                     assertEquals(2, x.outs().size());
3238                     assertEquals(1, x.outs().get(0));
3239                     assertEquals(2, x.outs().get(1));
3240                 }) //
3241                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3242                         .zipWith(x.results3(), (y, z) -> y + z.name())) //
3243                 .test() //
3244                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3245                 .assertNoErrors() //
3246                 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3247                         "FREDSARAHFRED", "SARAHFREDSARAH") //
3248                 .assertComplete();
3249     }
3250 
3251     @Test
3252     public void testCallableApiReturningTenOutParameters() {
3253         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3254         db //
3255                 .call("call out10(?,?,?,?,?,?,?,?,?,?)") //
3256                 .out(Type.INTEGER, Integer.class) //
3257                 .out(Type.INTEGER, Integer.class) //
3258                 .out(Type.INTEGER, Integer.class) //
3259                 .out(Type.INTEGER, Integer.class) //
3260                 .out(Type.INTEGER, Integer.class) //
3261                 .out(Type.INTEGER, Integer.class) //
3262                 .out(Type.INTEGER, Integer.class) //
3263                 .out(Type.INTEGER, Integer.class) //
3264                 .out(Type.INTEGER, Integer.class) //
3265                 .out(Type.INTEGER, Integer.class) //
3266                 .input(0, 10) //
3267                 .test() //
3268                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3269                 .assertNoErrors() //
3270                 .assertValueCount(2) //
3271                 .assertComplete();
3272     }
3273 
3274     @Test
3275     public void testCallableApiReturningTenOutParametersTransacted() {
3276         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3277         db //
3278                 .call("call out10(?,?,?,?,?,?,?,?,?,?)") //
3279                 .transacted() //
3280                 .out(Type.INTEGER, Integer.class) //
3281                 .out(Type.INTEGER, Integer.class) //
3282                 .out(Type.INTEGER, Integer.class) //
3283                 .out(Type.INTEGER, Integer.class) //
3284                 .out(Type.INTEGER, Integer.class) //
3285                 .out(Type.INTEGER, Integer.class) //
3286                 .out(Type.INTEGER, Integer.class) //
3287                 .out(Type.INTEGER, Integer.class) //
3288                 .out(Type.INTEGER, Integer.class) //
3289                 .out(Type.INTEGER, Integer.class) //
3290                 .input(0, 10) //
3291                 .flatMap(Tx.flattenToValuesOnly()) //
3292                 .test() //
3293                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3294                 .assertNoErrors() //
3295                 .assertValueCount(2) //
3296                 .assertComplete();
3297     }
3298 
3299     @Test
3300     public void testCallableApiReturningTenResultSets() {
3301         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3302         db //
3303                 .call("call rs10()") //
3304                 .autoMap(Person2.class) //
3305                 .autoMap(Person2.class) //
3306                 .autoMap(Person2.class) //
3307                 .autoMap(Person2.class) //
3308                 .autoMap(Person2.class) //
3309                 .autoMap(Person2.class) //
3310                 .autoMap(Person2.class) //
3311                 .autoMap(Person2.class) //
3312                 .autoMap(Person2.class) //
3313                 .autoMap(Person2.class) //
3314                 .input(0, 10) //
3315                 .doOnNext(x -> {
3316                     assertEquals(0, x.outs().size());
3317                     assertEquals(10, x.results().size());
3318                 }) //
3319                    // just zip the first and last result sets
3320                 .flatMap(x -> x.results(0) //
3321                         .zipWith(x.results(9), //
3322                                 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3323                 .test() //
3324                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3325                 .assertNoErrors() //
3326                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED") //
3327                 .assertComplete();
3328     }
3329 
3330     @Test
3331     public void testCallableApiReturningTenResultSetsTransacted() {
3332         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3333         db //
3334                 .call("call rs10()") //
3335                 .transacted() //
3336                 .autoMap(Person2.class) //
3337                 .autoMap(Person2.class) //
3338                 .autoMap(Person2.class) //
3339                 .autoMap(Person2.class) //
3340                 .autoMap(Person2.class) //
3341                 .autoMap(Person2.class) //
3342                 .autoMap(Person2.class) //
3343                 .autoMap(Person2.class) //
3344                 .autoMap(Person2.class) //
3345                 .autoMap(Person2.class) //
3346                 .input(0, 10) //
3347                 .flatMap(Tx.flattenToValuesOnly()) //
3348                 .doOnNext(x -> {
3349                     assertEquals(0, x.outs().size());
3350                     assertEquals(10, x.results().size());
3351                 }) //
3352                    // just zip the first and last result sets
3353                 .flatMap(x -> x.results(0) //
3354                         .zipWith(x.results(9), //
3355                                 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3356                 .test() //
3357                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3358                 .assertNoErrors() //
3359                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED") //
3360                 .assertComplete();
3361     }
3362 
3363     @Test
3364     public void testCallableApiReturningOneResultSetGetAs() throws InterruptedException {
3365         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3366         db //
3367                 .call("call in0out0rs1()") //
3368                 .getAs(String.class, Integer.class) //
3369                 .input(1) //
3370                 .flatMap(x -> x.results()) //
3371                 .test() //
3372                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3373                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p._1()) && p._2() == 24) //
3374                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p._1()) && p._2() == 26) //
3375                 .assertComplete();
3376     }
3377 
3378     @Test
3379     public void testH2InClauseWithoutSetArray() {
3380         db().apply(con -> {
3381             try (PreparedStatement ps = con
3382                     .prepareStatement("select count(*) from person where name in (?, ?)")) {
3383                 ps.setString(1, "FRED");
3384                 ps.setString(2, "JOSEPH");
3385                 ResultSet rs = ps.executeQuery();
3386                 rs.next();
3387                 return rs.getInt(1);
3388             }
3389         }).test() //
3390                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete() // ;
3391                 .assertValue(2); //
3392     }
3393 
3394     @Test
3395     @Ignore
3396     public void testH2InClauseWithSetArray() {
3397         db().apply(con -> {
3398             try (PreparedStatement ps = con
3399                     .prepareStatement("select count(*) from person where name in (?)")) {
3400                 ps.setArray(1, con.createArrayOf("VARCHAR", new String[] { "FRED", "JOSEPH" }));
3401                 ResultSet rs = ps.executeQuery();
3402                 rs.next();
3403                 return rs.getInt(1);
3404             }
3405         }).test() //
3406                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete() // ;
3407                 .assertValue(2); //
3408     }
3409 
3410     @Test
3411     public void testUpdateTxPerformed() {
3412         Database db = db(1);
3413         db.update("update person set score = 1000") //
3414                 .transacted() //
3415                 .counts() //
3416                 .test() //
3417                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3418                 .assertComplete() //
3419                 .assertValueCount(2); // value and complete
3420 
3421         db.select("select count(*) from person where score=1000") //
3422                 .getAs(Integer.class) //
3423                 .test() //
3424                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3425                 .assertComplete() //
3426                 .assertValue(3);
3427 
3428     }
3429 
3430     @Test
3431     public void testIssue20AutoCommitEnabledAndConnectionThrowsOnCommit() {
3432         ConnectionProvider cp = DatabaseCreator.connectionProvider();
3433         Database db = Database.fromBlocking(new ConnectionProvider() {
3434 
3435             @Override
3436             public Connection get() {
3437                 Connection c = cp.get();
3438                 try {
3439                     c.setAutoCommit(true);
3440                 } catch (SQLException e) {
3441                     throw new SQLRuntimeException(e);
3442                 }
3443                 return new DelegatedConnection() {
3444 
3445                     @Override
3446                     public Connection con() {
3447                         return c;
3448                     }
3449 
3450                     @Override
3451                     public void commit() throws SQLException {
3452                         System.out.println("COMMITTING");
3453                         if (this.getAutoCommit()) {
3454                             throw new SQLException("cannot commit when autoCommit is true");
3455                         } else {
3456                             con().commit();
3457                         }
3458                     }
3459 
3460                 };
3461             }
3462 
3463             @Override
3464             public void close() {
3465                 // do nothing
3466             }
3467         });
3468         db.update("insert into note(text) values(?)") //
3469                 .parameters("HI", "THERE") //
3470                 .returnGeneratedKeys() //
3471                 .getAs(Integer.class)//
3472                 .test() //
3473                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3474                 .assertValues(1, 2) //
3475                 .assertComplete();
3476     }
3477     
3478     @Test
3479     public void testMapInTransactionIssue35() {
3480         Database.test() //
3481                 .select(Person10.class) //
3482                 .transacted() //
3483                 .valuesOnly() //
3484                 .get() //
3485                 .map(p -> p.name()) //
3486                 .blockingForEach(System.out::println);
3487     }
3488 
3489     private static final class Plugins {
3490 
3491         private static final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
3492 
3493         static void reset() {
3494             list.clear();
3495             RxJavaPlugins.setErrorHandler(e -> list.add(e));
3496         }
3497 
3498         static Throwable getSingleError() {
3499             assertEquals(1, list.size());
3500             RxJavaPlugins.reset();
3501             return list.get(0);
3502         }
3503     }
3504 
3505     @Test
3506     public void testIssue27ConnectionErrorReportedToRxJavaPlugins() {
3507         try (Database db = Database.nonBlocking()
3508                 .url("jdbc:driverdoesnotexist://doesnotexist:1527/notThere").build()) {
3509             Plugins.reset();
3510             db.select("select count(*) from person") //
3511                     .getAs(Long.class) //
3512                     .test() //
3513                     .awaitDone(TIMEOUT_SECONDS / 5, TimeUnit.SECONDS) //
3514                     .assertNoValues() //
3515                     .assertNotTerminated() //
3516                     .cancel();
3517             Throwable e = Plugins.getSingleError();
3518             assertTrue(e instanceof UndeliverableException);
3519             assertTrue(e.getMessage().toLowerCase().contains("no suitable driver"));
3520         }
3521     }
3522     
3523     @Test
3524     public void testAutoMapInTransactionIssue35() {
3525         try (Database db = Database.test()) {
3526             db.select(Person10.class) //
3527                     .transacted() //
3528                     .valuesOnly() //
3529                     .get(x -> x.name()) //
3530                     .sorted() //
3531                     .test() //
3532                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3533                     .assertValues("FRED", "JOSEPH", "MARMADUKE") //
3534                     .assertComplete();
3535         }
3536     }
3537 
3538     public interface PersonWithDefaultMethod {
3539         @Column
3540         String name();
3541 
3542         @Column
3543         int score();
3544 
3545         public default String nameLower() {
3546             return name().toLowerCase();
3547         }
3548     }
3549 
3550     interface PersonWithDefaultMethodNonPublic {
3551         @Column
3552         String name();
3553 
3554         @Column
3555         int score();
3556 
3557         default String nameLower() {
3558             return name().toLowerCase();
3559         }
3560     }
3561 
3562     interface PersonDistinct1 {
3563         @Column
3564         String name();
3565 
3566         @Column
3567         int score();
3568 
3569     }
3570 
3571     interface PersonDistinct2 {
3572         @Column
3573         String name();
3574 
3575         @Column
3576         int score();
3577 
3578     }
3579 
3580     interface Score {
3581         @Column
3582         int score();
3583     }
3584 
3585     interface Person {
3586         @Column
3587         String name();
3588     }
3589 
3590     interface Person2 {
3591         @Column
3592         String name();
3593 
3594         @Column
3595         int score();
3596     }
3597 
3598     interface Person3 {
3599         @Column("name")
3600         String fullName();
3601 
3602         @Column("score")
3603         int examScore();
3604     }
3605 
3606     interface Person4 {
3607         @Column("namez")
3608         String fullName();
3609 
3610         @Column("score")
3611         int examScore();
3612     }
3613 
3614     interface Person5 {
3615         @Index(1)
3616         String fullName();
3617 
3618         @Index(2)
3619         int examScore();
3620     }
3621 
3622     interface Person6 {
3623         @Index(1)
3624         String fullName();
3625 
3626         @Index(3)
3627         int examScore();
3628     }
3629 
3630     interface Person7 {
3631         @Index(1)
3632         String fullName();
3633 
3634         @Index(0)
3635         int examScore();
3636     }
3637 
3638     interface Person8 {
3639         @Column
3640         int name();
3641     }
3642 
3643     interface Person9 {
3644         @Column
3645         String name();
3646 
3647         @Index(2)
3648         int score();
3649     }
3650 
/** Interface with a {@code name()} accessor that carries no mapping annotation. */
interface PersonNoAnnotation {

    String name();
}
3654 
3655     @Query("select name, score from person order by name")
3656     interface Person10 {
3657 
3658         @Column
3659         String name();
3660 
3661         @Column
3662         int score();
3663     }
3664 
3665 }