View Javadoc
1   package org.davidmoten.rxjava3.jdbc;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertNotEquals;
6   import static org.junit.Assert.assertTrue;
7   
8   import java.io.ByteArrayInputStream;
9   import java.io.ByteArrayOutputStream;
10  import java.io.File;
11  import java.io.FileNotFoundException;
12  import java.io.FileReader;
13  import java.io.IOException;
14  import java.io.InputStream;
15  import java.io.Reader;
16  import java.sql.Blob;
17  import java.sql.CallableStatement;
18  import java.sql.Clob;
19  import java.sql.Connection;
20  import java.sql.PreparedStatement;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.sql.SQLSyntaxErrorException;
24  import java.sql.Statement;
25  import java.sql.Timestamp;
26  import java.sql.Types;
27  import java.time.Instant;
28  import java.time.ZoneOffset;
29  import java.time.ZonedDateTime;
30  import java.util.Arrays;
31  import java.util.Calendar;
32  import java.util.Date;
33  import java.util.GregorianCalendar;
34  import java.util.HashSet;
35  import java.util.List;
36  import java.util.Optional;
37  import java.util.Set;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.davidmoten.rxjava3.jdbc.annotations.Column;
47  import org.davidmoten.rxjava3.jdbc.annotations.Index;
48  import org.davidmoten.rxjava3.jdbc.annotations.Query;
49  import org.davidmoten.rxjava3.jdbc.exceptions.AnnotationsNotFoundException;
50  import org.davidmoten.rxjava3.jdbc.exceptions.AutomappedInterfaceInaccessibleException;
51  import org.davidmoten.rxjava3.jdbc.exceptions.CannotForkTransactedConnection;
52  import org.davidmoten.rxjava3.jdbc.exceptions.ColumnIndexOutOfRangeException;
53  import org.davidmoten.rxjava3.jdbc.exceptions.ColumnNotFoundException;
54  import org.davidmoten.rxjava3.jdbc.exceptions.MoreColumnsRequestedThanExistException;
55  import org.davidmoten.rxjava3.jdbc.exceptions.NamedParameterFoundButSqlDoesNotHaveNamesException;
56  import org.davidmoten.rxjava3.jdbc.exceptions.NamedParameterMissingException;
57  import org.davidmoten.rxjava3.jdbc.exceptions.QueryAnnotationMissingException;
58  import org.davidmoten.rxjava3.jdbc.exceptions.SQLRuntimeException;
59  import org.davidmoten.rxjava3.jdbc.internal.DelegatedConnection;
60  import org.davidmoten.rxjava3.jdbc.pool.DatabaseCreator;
61  import org.davidmoten.rxjava3.jdbc.pool.DatabaseType;
62  import org.davidmoten.rxjava3.jdbc.pool.NonBlockingConnectionPool;
63  import org.davidmoten.rxjava3.jdbc.pool.Pools;
64  import org.davidmoten.rxjava3.jdbc.tuple.Tuple2;
65  import org.davidmoten.rxjava3.jdbc.tuple.Tuple3;
66  import org.davidmoten.rxjava3.jdbc.tuple.Tuple4;
67  import org.davidmoten.rxjava3.jdbc.tuple.Tuple5;
68  import org.davidmoten.rxjava3.jdbc.tuple.Tuple6;
69  import org.davidmoten.rxjava3.jdbc.tuple.Tuple7;
70  import org.davidmoten.rxjava3.jdbc.tuple.TupleN;
71  import org.davidmoten.rxjava3.pool.PoolClosedException;
72  import org.h2.jdbc.JdbcSQLSyntaxErrorException;
73  import org.hsqldb.jdbc.JDBCBlobFile;
74  import org.hsqldb.jdbc.JDBCClobFile;
75  import org.junit.Assert;
76  import org.junit.FixMethodOrder;
77  import org.junit.Ignore;
78  import org.junit.Test;
79  import org.junit.runners.MethodSorters;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  import com.github.davidmoten.guavamini.Lists;
84  import com.github.davidmoten.guavamini.Sets;
85  
86  import io.reactivex.rxjava3.core.Completable;
87  import io.reactivex.rxjava3.core.Flowable;
88  import io.reactivex.rxjava3.core.Observable;
89  import io.reactivex.rxjava3.core.Scheduler;
90  import io.reactivex.rxjava3.core.Single;
91  import io.reactivex.rxjava3.exceptions.UndeliverableException;
92  import io.reactivex.rxjava3.functions.Predicate;
93  import io.reactivex.rxjava3.plugins.RxJavaPlugins;
94  import io.reactivex.rxjava3.schedulers.Schedulers;
95  import io.reactivex.rxjava3.schedulers.TestScheduler;
96  import io.reactivex.rxjava3.subscribers.TestSubscriber;
97  
98  @FixMethodOrder(MethodSorters.NAME_ASCENDING)
99  public class DatabaseTest {
100 
101     private static final long FRED_REGISTERED_TIME = 1442515672690L;
102     private static final int NAMES_COUNT_BIG = 5163;
103     private static final Logger log = LoggerFactory.getLogger(DatabaseTest.class);
104     private static final int TIMEOUT_SECONDS = 10;
105 
106     private static Database db() {
107         return DatabaseCreator.create(1);
108     }
109 
110     private static Database blocking() {
111         return DatabaseCreator.createBlocking();
112     }
113 
114     private static Database db(int poolSize) {
115         return DatabaseCreator.create(poolSize);
116     }
117 
118     private static Database big(int poolSize) {
119         return DatabaseCreator.create(poolSize, true, Schedulers.computation());
120     }
121 
122     @Test
123     public void testSelectUsingQuestionMark() {
124         try (Database db = db()) {
125             db.select("select score from person where name=?") //
126                     .parameters("FRED", "JOSEPH") //
127                     .getAs(Integer.class) //
128                     .test() //
129                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
130                     .assertNoErrors() //
131                     .assertValues(21, 34) //
132                     .assertComplete();
133         }
134     }
135 
136     @Test
137     public void testDemonstrateDbClosureOnTerminate() {
138         Database db = db();
139         db.select("select score from person where name=?") //
140                 .parameters("FRED", "JOSEPH") //
141                 .getAs(Integer.class) //
142                 .doOnTerminate(() -> db.close()) //
143                 .test() //
144                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
145                 .assertNoErrors() //
146                 .assertValues(21, 34) //
147                 .assertComplete();
148     }
149 
150     @Test
151     public void testFromDataSource() {
152         ConnectionProvider cp = DatabaseCreator.connectionProvider();
153         AtomicInteger count = new AtomicInteger();
154         Set<Connection> connections = new HashSet<>();
155         AtomicInteger closed = new AtomicInteger();
156         Database db = Database.fromBlocking(new ConnectionProvider() {
157 
158             @Override
159             public Connection get() {
160                 count.incrementAndGet();
161                 Connection c = cp.get();
162                 connections.add(c);
163                 return new DelegatedConnection() {
164 
165                     @Override
166                     public Connection con() {
167                         return c;
168                     }
169 
170                     @Override
171                     public void close() {
172                         closed.incrementAndGet();
173                     }
174                 };
175             }
176 
177             @Override
178             public void close() {
179                 // do nothing
180             }
181         });
182         db.select("select count(*) from person") //
183                 .getAs(Integer.class) //
184                 .blockingSubscribe();
185         db.select("select count(*) from person") //
186                 .getAs(Integer.class) //
187                 .blockingSubscribe();
188         assertEquals(2, count.get());
189         assertEquals(2, connections.size());
190         assertEquals(2, closed.get());
191     }
192 
193     @Test
194     public void testBlockingForEachWhenError() {
195         try {
196             Flowable //
197                     .error(new RuntimeException("boo")) //
198                     .blockingForEach(x -> log.debug(x.toString()));
199         } catch (RuntimeException e) {
200             assertEquals("boo", e.getMessage());
201         }
202     }
203 
204     @Test
205     public void testSelectUsingQuestionMarkAndInClauseIssue10() {
206         Database.test() //
207                 .select("select score from person where name in (?) order by score") //
208                 .parameters("FRED", "JOSEPH") //
209                 .getAs(Integer.class) //
210                 .test() //
211                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
212                 .assertNoErrors() //
213                 .assertValues(21, 34) //
214                 .assertComplete();
215     }
216 
217     @Test
218     public void testSelectUsingQuestionMarkAndInClauseWithSetParameter() {
219         Database.test() //
220                 .select("select score from person where name in (?) order by score") //
221                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
222                 .getAs(Integer.class) //
223                 .test() //
224                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
225                 .assertNoErrors() //
226                 .assertValues(21, 34) //
227                 .assertComplete();
228     }
229 
230     @Test
231     public void testUpdateWithInClauseBatchSize0() {
232         Database.test() //
233                 .update("update person set score=50 where name in (?)") //
234                 .batchSize(0) //
235                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
236                 .counts() //
237                 .test() //
238                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS) //
239                 .assertComplete() //
240                 .assertValues(2);
241     }
242 
243     @Test
244     public void testUpdateWithInClauseBatchSize10() {
245         Database.test() //
246                 .update("update person set score=50 where name in (?)") //
247                 .batchSize(10) //
248                 .parameter(Sets.newHashSet("FRED", "JOSEPH")) //
249                 .counts() //
250                 .test() //
251                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS) //
252                 .assertComplete() //
253                 .assertValues(2);
254     }
255 
256     @Test
257     public void testSelectUsingQuestionMarkAndInClauseWithSetParameterUsingParametersMethod() {
258         Database.test() //
259                 .select("select score from person where name in (?) order by score") //
260                 .parameters(Sets.newHashSet("FRED", "JOSEPH")) //
261                 .getAs(Integer.class) //
262                 .test() //
263                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
264                 .assertNoErrors() //
265                 .assertValues(21, 34) //
266                 .assertComplete();
267     }
268 
269     @Test
270     public void testSelectUsingInClauseWithListParameter() {
271         Database.test() //
272                 .select("select score from person where score > ? and name in (?) order by score") //
273                 .parameters(0, Lists.newArrayList("FRED", "JOSEPH")) //
274                 .getAs(Integer.class) //
275                 .test() //
276                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
277                 .assertNoErrors() //
278                 .assertValues(21, 34) //
279                 .assertComplete();
280     }
281 
282     @Test
283     public void testSelectUsingInClauseWithNamedListParameter() {
284         Database.test() //
285                 .select("select score from person where score > :score and name in (:names) order by score") //
286                 .parameter("score", 0) //
287                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
288                 .getAs(Integer.class) //
289                 .test() //
290                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
291                 .assertNoErrors() //
292                 .assertValues(21, 34) //
293                 .assertComplete();
294     }
295 
296     @Test
297     public void testSelectUsingInClauseWithRepeatedNamedListParameter() {
298         Database.test() //
299                 .select("select score from person where name in (:names) and name in (:names) order by score") //
300                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
301                 .getAs(Integer.class) //
302                 .test() //
303                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
304                 .assertNoErrors() //
305                 .assertValues(21, 34) //
306                 .assertComplete();
307     }
308 
309     @Test
310     public void testSelectUsingInClauseWithRepeatedNamedListParameterAndRepeatedNonListParameter() {
311         Database.test() //
312                 .select("select score from person where name in (:names) and score >:score and name in (:names) and score >:score order by score") //
313                 .parameter("names", Lists.newArrayList("FRED", "JOSEPH")) //
314                 .parameter("score", 0) //
315                 .getAs(Integer.class) //
316                 .test() //
317                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
318                 .assertNoErrors() //
319                 .assertValues(21, 34) //
320                 .assertComplete();
321     }
322 
323     @Test
324     public void testSelectUsingNamedParameterList() {
325         try (Database db = db()) {
326             db.select("select score from person where name=:name") //
327                     .parameters(Parameter.named("name", "FRED").value("JOSEPH").list()) //
328                     .getAs(Integer.class) //
329                     .test() //
330                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
331                     .assertNoErrors() //
332                     .assertValues(21, 34) //
333                     .assertComplete();
334         }
335     }
336 
337     @Test
338     public void testSelectUsingQuestionMarkFlowableParameters() {
339         try (Database db = db()) {
340             db.select("select score from person where name=?") //
341                     .parameterStream(Flowable.just("FRED", "JOSEPH")) //
342                     .getAs(Integer.class) //
343                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
344                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
345                     .assertNoErrors() //
346                     .assertValues(21, 34) //
347                     .assertComplete();
348         }
349     }
350 
351     @Test
352     public void testSelectUsingQuestionMarkFlowableParametersInLists() {
353         try (Database db = db()) {
354             db.select("select score from person where name=?") //
355                     .parameterListStream(
356                             Flowable.just(Arrays.asList("FRED"), Arrays.asList("JOSEPH"))) //
357                     .getAs(Integer.class) //
358                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
359                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
360                     .assertNoErrors() //
361                     .assertValues(21, 34) //
362                     .assertComplete();
363         }
364     }
365 
366     @Test
367     public void testDrivingSelectWithoutParametersUsingParameterStream() {
368         try (Database db = db()) {
369             db.select("select count(*) from person") //
370                     .parameters(1, 2, 3) //
371                     .getAs(Integer.class) //
372                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
373                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
374                     .assertValues(3, 3, 3) //
375                     .assertComplete();
376         }
377     }
378 
379     @Test
380     public void testSelectUsingQuestionMarkFlowableParametersTwoParametersPerQuery() {
381         try (Database db = db()) {
382             db.select("select score from person where name=? and score = ?") //
383                     .parameterStream(Flowable.just("FRED", 21, "JOSEPH", 34)) //
384                     .getAs(Integer.class) //
385                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
386                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
387                     .assertNoErrors() //
388                     .assertValues(21, 34) //
389                     .assertComplete();
390         }
391     }
392 
393     @Test
394     public void testSelectUsingQuestionMarkFlowableParameterListsTwoParametersPerQuery() {
395         try (Database db = db()) {
396             db.select("select score from person where name=? and score = ?") //
397                     .parameterListStream(
398                             Flowable.just(Arrays.asList("FRED", 21), Arrays.asList("JOSEPH", 34))) //
399                     .getAs(Integer.class) //
400                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
401                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
402                     .assertNoErrors() //
403                     .assertValues(21, 34) //
404                     .assertComplete();
405         }
406     }
407 
408     @Test
409     public void testSelectUsingQuestionMarkWithPublicTestingDatabase() {
410         try (Database db = Database.test()) {
411             db //
412                     .select("select score from person where name=?") //
413                     .parameters("FRED", "JOSEPH") //
414                     .getAs(Integer.class) //
415                     .test() //
416                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
417                     .assertNoErrors() //
418                     .assertValues(21, 34) //
419                     .assertComplete();
420         }
421     }
422 
423     @Test
424     public void testSelectWithFetchSize() {
425         try (Database db = db()) {
426             db.select("select score from person order by name") //
427                     .fetchSize(2) //
428                     .getAs(Integer.class) //
429                     .test() //
430                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
431                     .assertNoErrors() //
432                     .assertValues(21, 34, 25) //
433                     .assertComplete();
434         }
435     }
436 
437     @Test
438     public void testSelectWithFetchSizeZero() {
439         try (Database db = db()) {
440             db.select("select score from person order by name") //
441                     .fetchSize(0) //
442                     .getAs(Integer.class) //
443                     .test() //
444                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
445                     .assertNoErrors() //
446                     .assertValues(21, 34, 25) //
447                     .assertComplete();
448         }
449     }
450 
451     @Test(expected = IllegalArgumentException.class)
452     public void testSelectWithFetchSizeNegative() {
453         try (Database db = db()) {
454             db.select("select score from person order by name") //
455                     .fetchSize(-1);
456         }
457     }
458 
459     @Test
460     public void testSelectUsingNonBlockingBuilder() {
461         NonBlockingConnectionPool pool = Pools //
462                 .nonBlocking() //
463                 .connectionProvider(DatabaseCreator.connectionProvider()) //
464                 .maxIdleTime(1, TimeUnit.MINUTES) //
465                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
466                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
467                 .healthCheck(c -> c.prepareStatement("select 1").execute()) //
468                 .maxPoolSize(3) //
469                 .build();
470 
471         try (Database db = Database.from(pool)) {
472             db.select("select score from person where name=?") //
473                     .parameters("FRED", "JOSEPH") //
474                     .getAs(Integer.class) //
475                     .test() //
476                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
477                     .assertNoErrors() //
478                     .assertValues(21, 34) //
479                     .assertComplete();
480         }
481     }
482 
483     @Test
484     public void testSelectSpecifyingHealthCheck() {
485         try (Database db = Database//
486                 .nonBlocking() //
487                 .connectionProvider(DatabaseCreator.connectionProvider()) //
488                 .maxIdleTime(1, TimeUnit.MINUTES) //
489                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
490                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
491                 .healthCheck(DatabaseType.H2) //
492                 .maxPoolSize(3) //
493                 .build()) {
494             db.select("select score from person where name=?") //
495                     .parameters("FRED", "JOSEPH") //
496                     .getAs(Integer.class) //
497                     .test() //
498                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
499                     .assertNoErrors() //
500                     .assertValues(21, 34) //
501                     .assertComplete();
502         }
503     }
504 
505     @Test(expected = IllegalArgumentException.class)
506     public void testCreateTestDatabaseWithZeroSizePoolThrows() {
507         Database.test(0);
508     }
509 
510     @Test
511     public void testSelectSpecifyingHealthCheckAsSql() {
512         final AtomicInteger success = new AtomicInteger();
513         NonBlockingConnectionPool pool = Pools //
514                 .nonBlocking() //
515                 .connectionProvider(DatabaseCreator.connectionProvider()) //
516                 .maxIdleTime(1, TimeUnit.MINUTES) //
517                 .healthCheck(DatabaseType.H2) //
518                 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES) //
519                 .connectionRetryInterval(1, TimeUnit.SECONDS) //
520                 .healthCheck("select 1") //
521                 .connectionListener(error -> {
522                     if (error.isPresent()) {
523                         success.set(Integer.MIN_VALUE);
524                     } else {
525                         success.incrementAndGet();
526                     }
527                 }) //
528                 .maxPoolSize(3) //
529                 .build();
530 
531         try (Database db = Database.from(pool)) {
532             db.select("select score from person where name=?") //
533                     .parameters("FRED", "JOSEPH") //
534                     .getAs(Integer.class) //
535                     .test() //
536                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
537                     .assertNoErrors() //
538                     .assertValues(21, 34) //
539                     .assertComplete();
540         }
541         assertEquals(1, success.get());
542     }
543 
544     @Test(expected = IllegalArgumentException.class)
545     public void testNonBlockingPoolWithTrampolineSchedulerThrows() {
546         Pools.nonBlocking().scheduler(Schedulers.trampoline());
547     }
548 
549     @Test(timeout = 40000)
550     public void testSelectUsingNonBlockingBuilderConcurrencyTest() throws InterruptedException, TimeoutException {
551         try (Database db = db(3)) {
552             Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(50));
553             int n = 10000;
554             CountDownLatch latch = new CountDownLatch(n);
555             AtomicInteger count = new AtomicInteger();
556             for (int i = 0; i < n; i++) {
557                 db.select("select score from person where name=?") //
558                         .parameters("FRED", "JOSEPH") //
559                         .getAs(Integer.class) //
560                         .subscribeOn(scheduler) //
561                         .toList() //
562                         .doOnSuccess(x -> {
563                             if (!x.equals(Lists.newArrayList(21, 34))) {
564                                 throw new RuntimeException("run broken");
565                             } else {
566                                 // log.debug(iCopy + " succeeded");
567                             }
568                         }) //
569                         .doOnSuccess(x -> {
570                             count.incrementAndGet();
571                             latch.countDown();
572                         }) //
573                         .doOnError(x -> latch.countDown()) //
574                         .subscribe();
575             }
576             if (!latch.await(20, TimeUnit.SECONDS)) {
577                 throw new TimeoutException("timeout");
578             }
579             assertEquals(n, count.get());
580         }
581     }
582 
583     @Test(timeout = 5000)
584     public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
585         try (Database db = db(1)) {
586             Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
587             int n = 2;
588             CountDownLatch latch = new CountDownLatch(n);
589             AtomicInteger count = new AtomicInteger();
590             for (int i = 0; i < n; i++) {
591                 db.select("select score from person where name=?") //
592                         .parameters("FRED", "JOSEPH") //
593                         .getAs(Integer.class) //
594                         .subscribeOn(scheduler) //
595                         .toList() //
596                         .doOnSuccess(x -> {
597                             if (!x.equals(Lists.newArrayList(21, 34))) {
598                                 throw new RuntimeException("run broken");
599                             }
600                         }) //
601                         .doOnSuccess(x -> {
602                             count.incrementAndGet();
603                             latch.countDown();
604                         }) //
605                         .doOnError(x -> latch.countDown()) //
606                         .subscribe();
607                 log.debug("submitted " + i);
608             }
609             if (!latch.await(5000, TimeUnit.SECONDS)) {
610                 throw new TimeoutException("timeout");
611             }
612             assertEquals(n, count.get());
613         }
614     }
615 
616     @Test
617     public void testDatabaseClose() {
618         try (Database db = db()) {
619             db.select("select score from person where name=?") //
620                     .parameters("FRED", "JOSEPH") //
621                     .getAs(Integer.class) //
622                     .test() //
623                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
624                     .assertNoErrors() //
625                     .assertValues(21, 34) //
626                     .assertComplete();
627         }
628     }
629 
630     @Test
631     public void testSelectUsingName() {
632         try (Database db = db()) {
633             db //
634                     .select("select score from person where name=:name") //
635                     .parameter("name", "FRED") //
636                     .parameter("name", "JOSEPH") //
637                     .getAs(Integer.class) //
638                     .test() //
639                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
640                     .assertValues(21, 34) //
641                     .assertComplete();
642         }
643     }
644 
645     @Test
646     public void testSelectUsingNameNotGiven() {
647         try (Database db = db()) {
648             db //
649                     .select("select score from person where name=:name and name<>:name2") //
650                     .parameter("name", "FRED") //
651                     .parameter("name", "JOSEPH") //
652                     .getAs(Integer.class) //
653                     .test() //
654                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
655                     .assertError(NamedParameterMissingException.class) //
656                     .assertNoValues();
657         }
658     }
659 
660     @Test
661     public void testSelectUsingParameterNameNullNameWhenSqlHasNoNames() {
662         db() //
663                 .select("select score from person where name=?") //
664                 .parameter(Parameter.create("name", "FRED")) //
665                 .getAs(Integer.class) //
666                 .test() //
667                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
668                 .assertError(NamedParameterFoundButSqlDoesNotHaveNamesException.class) //
669                 .assertNoValues();
670     }
671 
672     @Test
673     public void testUpdateWithNullNamedParameter() {
674         try (Database db = db()) {
675             db //
676                     .update("update person set date_of_birth = :dob") //
677                     .parameter(Parameter.create("dob", null)) //
678                     .counts() //
679                     .test() //
680                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
681                     .assertValue(3) //
682                     .assertComplete();
683         }
684     }
685 
686     @Test
687     public void testUpdateWithNullParameter() {
688         try (Database db = db()) {
689             db //
690                     .update("update person set date_of_birth = ?") //
691                     .parameter(null) //
692                     .counts() //
693                     .test() //
694                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
695                     .assertValue(3) //
696                     .assertComplete();
697         }
698     }
699 
700     @Test
701     public void testUpdateWithNullStreamParameter() {
702         try (Database db = db()) {
703             db //
704                     .update("update person set date_of_birth = ?") //
705                     .parameterStream(Flowable.just(Parameter.NULL)) //
706                     .counts() //
707                     .test() //
708                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
709                     .assertValue(3) //
710                     .assertComplete();
711         }
712     }
713 
714     @Test
715     public void testUpdateWithTestDatabaseForReadme() {
716         try (Database db = db()) {
717             db //
718                     .update("update person set date_of_birth = ?") //
719                     .parameterStream(Flowable.just(Parameter.NULL)) //
720                     .counts() //
721                     .test() //
722                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
723                     .assertValue(3) //
724                     .assertComplete();
725         }
726     }
727 
728     @Test
729     public void testUpdateClobWithNull() {
730         try (Database db = db()) {
731             insertNullClob(db);
732             db //
733                     .update("update person_clob set document = :doc") //
734                     .parameter("doc", Database.NULL_CLOB) //
735                     .counts() //
736                     .test() //
737                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
738                     .assertValue(1) //
739                     .assertComplete();
740         }
741     }
742 
743     @Test
744     public void testUpdateClobWithClob() throws SQLException {
745         try (Database db = db()) {
746             Clob clob = new JDBCClobFile(new File("src/test/resources/big.txt"));
747             insertNullClob(db);
748             db //
749                     .update("update person_clob set document = :doc") //
750                     .parameter("doc", clob) //
751                     .counts() //
752                     .test() //
753                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
754                     .assertValue(1) //
755                     .assertComplete();
756         }
757     }
758 
759     @Test
760     public void testUpdateClobWithReader() throws FileNotFoundException {
761         try (Database db = db()) {
762             Reader reader = new FileReader(new File("src/test/resources/big.txt"));
763             insertNullClob(db);
764             db //
765                     .update("update person_clob set document = :doc") //
766                     .parameter("doc", reader) //
767                     .counts() //
768                     .test() //
769                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
770                     .assertValue(1) //
771                     .assertComplete();
772         }
773     }
774 
775     @Test
776     public void testUpdateBlobWithBlob() throws SQLException {
777         try (Database db = db()) {
778             Blob blob = new JDBCBlobFile(new File("src/test/resources/big.txt"));
779             insertPersonBlob(db);
780             db //
781                     .update("update person_blob set document = :doc") //
782                     .parameter("doc", blob) //
783                     .counts() //
784                     .test() //
785                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
786                     .assertValue(1) //
787                     .assertComplete();
788         }
789     }
790 
791     @Test
792     public void testUpdateBlobWithNull() {
793         try (Database db = db()) {
794             insertPersonBlob(db);
795             db //
796                     .update("update person_blob set document = :doc") //
797                     .parameter("doc", Database.NULL_BLOB) //
798                     .counts() //
799                     .test() //
800                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
801                     .assertValue(1) //
802                     .assertComplete();
803         }
804     }
805 
806     @Test(expected = NullPointerException.class)
807     public void testSelectUsingNullNameInParameter() {
808         try (Database db = db()) {
809             db //
810                     .select("select score from person where name=:name") //
811                     .parameter(null, "FRED"); //
812         }
813     }
814 
815     @Test(expected = IllegalArgumentException.class)
816     public void testSelectUsingNameDoesNotExist() {
817         try (Database db = db()) {
818             db //
819                     .select("select score from person where name=:name") //
820                     .parameters("nam", "FRED");
821         }
822     }
823 
824     @Test(expected = IllegalArgumentException.class)
825     public void testSelectUsingNameWithoutSpecifyingNameThrowsImmediately() {
826         try (Database db = db()) {
827             db //
828                     .select("select score from person where name=:name") //
829                     .parameters("FRED", "JOSEPH");
830         }
831     }
832 
833     @Test
834     public void testSelectTransacted() {
835         try (Database db = db()) {
836             db //
837                     .select("select score from person where name=?") //
838                     .parameters("FRED", "JOSEPH") //
839                     .transacted() //
840                     .getAs(Integer.class) //
841                     .doOnNext(tx -> log
842                             .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value()))) //
843                     .test() //
844                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
845                     .assertValueCount(3) //
846                     .assertComplete();
847         }
848     }
849 
850     @Test
851     public void testSelectAutomappedAnnotatedTransacted() {
852         try (Database db = db()) {
853             db //
854                     .select(Person10.class) //
855                     .transacted() //
856                     .valuesOnly() //
857                     .get().test() //
858                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
859                     .assertValueCount(3) //
860                     .assertComplete();
861         }
862     }
863 
864     @Test
865     public void testSelectAutomappedTransactedValuesOnly() {
866         try (Database db = db()) {
867             db //
868                     .select("select name, score from person") //
869                     .transacted() //
870                     .valuesOnly() //
871                     .autoMap(Person2.class) //
872                     .test() //
873                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
874                     .assertValueCount(3) //
875                     .assertComplete();
876         }
877     }
878 
879     @Test
880     public void testSelectAutomappedTransacted() {
881         try (Database db = db()) {
882             db //
883                     .select("select name, score from person") //
884                     .transacted() //
885                     .autoMap(Person2.class) //
886                     .test() //
887                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
888                     .assertValueCount(4) //
889                     .assertComplete();
890         }
891     }
892 
893     @Test
894     public void testSelectTransactedTuple2() {
895         try (Database db = db()) {
896             Tx<Tuple2<String, Integer>> t = db //
897                     .select("select name, score from person where name=?") //
898                     .parameters("FRED") //
899                     .transacted() //
900                     .getAs(String.class, Integer.class) //
901                     .test() //
902                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
903                     .assertValueCount(2) //
904                     .assertComplete() //
905                     .values().get(0);
906             assertEquals("FRED", t.value()._1());
907             assertEquals(21, (int) t.value()._2());
908         }
909     }
910 
911     @Test
912     public void testSelectTransactedTuple3() {
913         try (Database db = db()) {
914             Tx<Tuple3<String, Integer, String>> t = db //
915                     .select("select name, score, name from person where name=?") //
916                     .parameters("FRED") //
917                     .transacted() //
918                     .getAs(String.class, Integer.class, String.class) //
919                     .test() //
920                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
921                     .assertValueCount(2) //
922                     .assertComplete() //
923                     .values().get(0);
924             assertEquals("FRED", t.value()._1());
925             assertEquals(21, (int) t.value()._2());
926             assertEquals("FRED", t.value()._3());
927         }
928     }
929 
930     @Test
931     public void testSelectTransactedTuple4() {
932         try (Database db = db()) {
933             Tx<Tuple4<String, Integer, String, Integer>> t = db //
934                     .select("select name, score, name, score from person where name=?") //
935                     .parameters("FRED") //
936                     .transacted() //
937                     .getAs(String.class, Integer.class, String.class, Integer.class) //
938                     .test() //
939                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
940                     .assertValueCount(2) //
941                     .assertComplete() //
942                     .values().get(0);
943             assertEquals("FRED", t.value()._1());
944             assertEquals(21, (int) t.value()._2());
945             assertEquals("FRED", t.value()._3());
946             assertEquals(21, (int) t.value()._4());
947         }
948     }
949 
950     @Test
951     public void testSelectTransactedTuple5() {
952         try (Database db = db()) {
953             Tx<Tuple5<String, Integer, String, Integer, String>> t = db //
954                     .select("select name, score, name, score, name from person where name=?") //
955                     .parameters("FRED") //
956                     .transacted() //
957                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class) //
958                     .test() //
959                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
960                     .assertValueCount(2) //
961                     .assertComplete() //
962                     .values().get(0);
963             assertEquals("FRED", t.value()._1());
964             assertEquals(21, (int) t.value()._2());
965             assertEquals("FRED", t.value()._3());
966             assertEquals(21, (int) t.value()._4());
967             assertEquals("FRED", t.value()._5());
968         }
969     }
970 
971     @Test
972     public void testSelectTransactedTuple6() {
973         try (Database db = db()) {
974             Tx<Tuple6<String, Integer, String, Integer, String, Integer>> t = db //
975                     .select("select name, score, name, score, name, score from person where name=?") //
976                     .parameters("FRED") //
977                     .transacted() //
978                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
979                             Integer.class) //
980                     .test() //
981                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
982                     .assertValueCount(2) //
983                     .assertComplete() //
984                     .values().get(0);
985             assertEquals("FRED", t.value()._1());
986             assertEquals(21, (int) t.value()._2());
987             assertEquals("FRED", t.value()._3());
988             assertEquals(21, (int) t.value()._4());
989             assertEquals("FRED", t.value()._5());
990             assertEquals(21, (int) t.value()._6());
991         }
992     }
993 
994     @Test
995     public void testSelectTransactedTuple7() {
996         try (Database db = db()) {
997             Tx<Tuple7<String, Integer, String, Integer, String, Integer, String>> t = db //
998                     .select("select name, score, name, score, name, score, name from person where name=?") //
999                     .parameters("FRED") //
1000                     .transacted() //
1001                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1002                             Integer.class, String.class) //
1003                     .test() //
1004                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1005                     .assertValueCount(2) //
1006                     .assertComplete() //
1007                     .values().get(0);
1008             assertEquals("FRED", t.value()._1());
1009             assertEquals(21, (int) t.value()._2());
1010             assertEquals("FRED", t.value()._3());
1011             assertEquals(21, (int) t.value()._4());
1012             assertEquals("FRED", t.value()._5());
1013             assertEquals(21, (int) t.value()._6());
1014             assertEquals("FRED", t.value()._7());
1015         }
1016     }
1017 
1018     @Test
1019     public void testSelectTransactedTupleN() {
1020         try (Database db = db()) {
1021             List<Tx<TupleN<Object>>> list = db //
1022                     .select("select name, score from person where name=?") //
1023                     .parameters("FRED") //
1024                     .transacted() //
1025                     .getTupleN() //
1026                     .test() //
1027                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1028                     .assertValueCount(2) //
1029                     .assertComplete() //
1030                     .values();
1031             assertEquals("FRED", list.get(0).value().values().get(0));
1032             assertEquals(21, (int) list.get(0).value().values().get(1));
1033             assertTrue(list.get(1).isComplete());
1034             assertEquals(2, list.size());
1035         }
1036     }
1037 
1038     @Test
1039     public void testSelectTransactedCount() {
1040         try (Database db = db()) {
1041             db //
1042                     .select("select name, score, name, score, name, score, name from person where name=?") //
1043                     .parameters("FRED") //
1044                     .transacted() //
1045                     .count() //
1046                     .test() //
1047                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1048                     .assertValueCount(1) //
1049                     .assertComplete();
1050         }
1051     }
1052 
1053     @Test
1054     public void testSelectTransactedGetAs() {
1055         try (Database db = db()) {
1056             db //
1057                     .select("select name from person") //
1058                     .transacted() //
1059                     .getAs(String.class) //
1060                     .test() //
1061                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1062                     .assertValueCount(4) //
1063                     .assertComplete();
1064         }
1065     }
1066 
1067     @Test
1068     public void testSelectTransactedGetAsOptional() {
1069         try (Database db = db()) {
1070             List<Tx<Optional<String>>> list = db //
1071                     .select("select name from person where name=?") //
1072                     .parameters("FRED") //
1073                     .transacted() //
1074                     .getAsOptional(String.class) //
1075                     .test() //
1076                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1077                     .assertValueCount(2) //
1078                     .assertComplete() //
1079                     .values();
1080             assertTrue(list.get(0).isValue());
1081             assertEquals("FRED", list.get(0).value().get());
1082             assertTrue(list.get(1).isComplete());
1083         }
1084     }
1085 
1086     @Test
1087     public void testDatabaseFrom() {
1088         Database.from(DatabaseCreator.nextUrl(), 3) //
1089                 .select("select name from person") //
1090                 .getAs(String.class) //
1091                 .count() //
1092                 .test() //
1093                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1094                 .assertError(JdbcSQLSyntaxErrorException.class);
1095     }
1096 
1097     @Test
1098     public void testSelectTransactedChained() throws Exception {
1099         try (Database db = db()) {
1100             db //
1101                     .select("select score from person where name=?") //
1102                     .parameters("FRED", "JOSEPH") //
1103                     .transacted() //
1104                     .transactedValuesOnly() //
1105                     .getAs(Integer.class) //
1106                     .doOnNext(tx -> log
1107                             .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))//
1108                     .flatMap(tx -> tx //
1109                             .select("select name from person where score = ?") //
1110                             .parameter(tx.value()) //
1111                             .valuesOnly() //
1112                             .getAs(String.class)) //
1113                     .test() //
1114                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1115                     .assertNoErrors() //
1116                     .assertValues("FRED", "JOSEPH") //
1117                     .assertComplete();
1118         }
1119     }
1120 
1121     @Test
1122     public void databaseIsAutoCloseable() {
1123         try (Database db = db()) {
1124             log.debug(db.toString());
1125         }
1126     }
1127 
1128     private static void println(Object o) {
1129         log.debug("{}", o);
1130     }
1131 
1132     @Test
1133     public void testSelectChained() {
1134         try (Database db = db(1)) {
1135             // we can do this with 1 connection only!
1136             db.select("select score from person where name=?") //
1137                     .parameters("FRED", "JOSEPH") //
1138                     .getAs(Integer.class) //
1139                     .doOnNext(DatabaseTest::println) //
1140                     .concatMap(score -> {
1141                         log.debug("score={}", score);
1142                         return db //
1143                                 .select("select name from person where score = ?") //
1144                                 .parameter(score) //
1145                                 .getAs(String.class) //
1146                                 .doOnComplete(
1147                                         () -> log.debug("completed select where score=" + score));
1148                     }) //
1149                     .test() //
1150                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1151                     .assertNoErrors() //
1152                     .assertValues("FRED", "JOSEPH") //
1153                     .assertComplete(); //
1154         }
1155     }
1156 
1157     @Test
1158     public void testReadMeFragment1() {
1159         try (Database db = db()) {
1160             db.select("select name from person") //
1161                     .getAs(String.class) //
1162                     .forEach(DatabaseTest::println);
1163         }
1164     }
1165 
1166     @Test
1167     public void testReadMeFragmentColumnDoesNotExistEmitsSqlSyntaxErrorException() {
1168         try (Database db = Database.test()) {
1169             db.select("select nam from person") //
1170                     .getAs(String.class) //
1171                     .test() //
1172                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1173                     .assertNoValues() //
1174                     .assertError(SQLSyntaxErrorException.class);
1175         }
1176     }
1177 
1178     @Test
1179     public void testReadMeFragmentDerbyHealthCheck() {
1180         try (Database db = Database.test()) {
1181             db.select("select 'a' from sysibm.sysdummy1") //
1182                     .getAs(String.class) //
1183                     .test() //
1184                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1185                     .assertValue("a") //
1186                     .assertComplete();
1187         }
1188     }
1189 
1190     @Test
1191     public void testTupleSupport() {
1192         try (Database db = db()) {
1193             db.select("select name, score from person") //
1194                     .getAs(String.class, Integer.class) //
1195                     .forEach(DatabaseTest::println);
1196         }
1197     }
1198 
1199     @Test
1200     public void testDelayedCallsAreNonBlocking() throws InterruptedException {
1201         List<String> list = new CopyOnWriteArrayList<String>();
1202         try (Database db = db(1)) { //
1203             db.select("select score from person where name=?") //
1204                     .parameter("FRED") //
1205                     .getAs(Integer.class) //
1206                     .doOnNext(x -> Thread.sleep(1000)) //
1207                     .subscribeOn(Schedulers.io()) //
1208                     .subscribe();
1209             Thread.sleep(100);
1210             CountDownLatch latch = new CountDownLatch(1);
1211             db.select("select score from person where name=?") //
1212                     .parameter("FRED") //
1213                     .getAs(Integer.class) //
1214                     .doOnNext(x -> list.add("emitted")) //
1215                     .doOnNext(x -> log.debug("emitted on " + Thread.currentThread().getName())) //
1216                     .doOnNext(x -> latch.countDown()) //
1217                     .subscribe();
1218             list.add("subscribed");
1219             assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1220             assertEquals(Arrays.asList("subscribed", "emitted"), list);
1221         }
1222     }
1223 
1224     @Test
1225     public void testAutoMapToInterface() {
1226         try (Database db = db()) {
1227             db //
1228                     .select("select name from person") //
1229                     .autoMap(Person.class) //
1230                     .map(p -> p.name()) //
1231                     .test() //
1232                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1233                     .assertValueCount(3) //
1234                     .assertComplete();
1235         }
1236     }
1237 
1238     @Test
1239     public void testAutoMapToInterfaceWithEnum() {
1240         try (Database db = db()) {
1241             db //
1242                     .select("select name from person") //
1243                     .autoMap(Person.class) //
1244                     .map(p -> p.name()) //
1245                     .test() //
1246                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1247                     .assertValueCount(3) //
1248                     .assertComplete();
1249         }
1250     }
1251 
1252     @Test
1253     public void testAutoMapToInterfaceWithoutAnnotationstsError() {
1254         try (Database db = db()) {
1255             db //
1256                     .select("select name from person") //
1257                     .autoMap(PersonNoAnnotation.class) //
1258                     .map(p -> p.name()) //
1259                     .test() //
1260                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1261                     .assertNoValues() //
1262                     .assertError(AnnotationsNotFoundException.class);
1263         }
1264     }
1265 
1266     @Test
1267     public void testAutoMapToInterfaceWithTwoMethods() {
1268         try (Database db = db()) {
1269             db //
1270                     .select("select name, score from person order by name") //
1271                     .autoMap(Person2.class) //
1272                     .firstOrError() //
1273                     .map(Person2::score) //
1274                     .test() //
1275                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1276                     .assertValue(21) //
1277                     .assertComplete();
1278         }
1279     }
1280 
1281     @Test
1282     public void testAutoMapToInterfaceWithExplicitColumnName() {
1283         try (Database db = db()) {
1284             db //
1285                     .select("select name, score from person order by name") //
1286                     .autoMap(Person3.class) //
1287                     .firstOrError() //
1288                     .map(Person3::examScore) //
1289                     .test() //
1290                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1291                     .assertValue(21) //
1292                     .assertComplete();
1293         }
1294     }
1295 
1296     @Test
1297     public void testAutoMapToInterfaceWithExplicitColumnNameThatDoesNotExist() {
1298         try (Database db = db()) {
1299             db //
1300                     .select("select name, score from person order by name") //
1301                     .autoMap(Person4.class) //
1302                     .firstOrError() //
1303                     .map(Person4::examScore) //
1304                     .test() //
1305                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1306                     .assertNoValues() //
1307                     .assertError(ColumnNotFoundException.class);
1308         }
1309     }
1310 
1311     @Test
1312     public void testAutoMapToInterfaceWithIndex() {
1313         try (Database db = db()) {
1314             db //
1315                     .select("select name, score from person order by name") //
1316                     .autoMap(Person5.class) //
1317                     .firstOrError() //
1318                     .map(Person5::examScore) //
1319                     .test() //
1320                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1321                     .assertValue(21) //
1322                     .assertComplete();
1323         }
1324     }
1325 
1326     @Test
1327     public void testAutoMapToInterfaceWithIndexTooLarge() {
1328         try (Database db = db()) {
1329             db //
1330                     .select("select name, score from person order by name") //
1331                     .autoMap(Person6.class) //
1332                     .firstOrError() //
1333                     .map(Person6::examScore) //
1334                     .test() //
1335                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1336                     .assertNoValues() //
1337                     .assertError(ColumnIndexOutOfRangeException.class);
1338         }
1339     }
1340 
1341     @Test
1342     public void testAutoMapToInterfaceWithIndexTooSmall() {
1343         try (Database db = db()) {
1344             db //
1345                     .select("select name, score from person order by name") //
1346                     .autoMap(Person7.class) //
1347                     .firstOrError() //
1348                     .map(Person7::examScore) //
1349                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1350                     .assertNoValues() //
1351                     .assertError(ColumnIndexOutOfRangeException.class);
1352         }
1353     }
1354 
1355     @Test
1356     public void testAutoMapWithUnmappableColumnType() {
1357         try (Database db = db()) {
1358             db //
1359                     .select("select name from person order by name") //
1360                     .autoMap(Person8.class) //
1361                     .map(p -> p.name()) //
1362                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1363                     .assertNoValues() //
1364                     .assertError(ClassCastException.class);
1365         }
1366     }
1367 
1368     @Test
1369     public void testAutoMapWithMixIndexAndName() {
1370         try (Database db = db()) {
1371             db //
1372                     .select("select name, score from person order by name") //
1373                     .autoMap(Person9.class) //
1374                     .firstOrError() //
1375                     .map(Person9::score) //
1376                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1377                     .assertValue(21) //
1378                     .assertComplete();
1379         }
1380     }
1381 
1382     @Test
1383     public void testAutoMapWithQueryInAnnotation() {
1384         try (Database db = db()) {
1385             db.select(Person10.class) //
1386                     .get() //
1387                     .firstOrError() //
1388                     .map(Person10::score) //
1389                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1390                     .assertValue(21) //
1391                     .assertComplete();
1392         }
1393     }
1394 
1395     @Test
1396     @Ignore
1397     public void testAutoMapForReadMe() {
1398         try (Database db = Database.test()) {
1399             db.select(Person10.class) //
1400                     .get(Person10::name) //
1401                     .blockingForEach(DatabaseTest::println);
1402         }
1403     }
1404 
1405     @Test(expected = QueryAnnotationMissingException.class)
1406     public void testAutoMapWithoutQueryInAnnotation() {
1407         try (Database db = db()) {
1408             db.select(Person.class);
1409         }
1410     }
1411 
1412     @Test
1413     public void testSelectWithoutWhereClause() {
1414         try (Database db = db()) {
1415             Assert.assertEquals(3, (long) db.select("select name from person") //
1416                     .count() //
1417                     .blockingGet());
1418         }
1419     }
1420 
1421     @Test
1422     public void testTuple3() {
1423         try (Database db = db()) {
1424             db //
1425                     .select("select name, score, name from person order by name") //
1426                     .getAs(String.class, Integer.class, String.class) //
1427                     .firstOrError() //
1428                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1429                     .assertComplete() //
1430                     .assertValue(Tuple3.create("FRED", 21, "FRED")); //
1431         }
1432     }
1433 
1434     @Test
1435     public void testTuple4() {
1436         try (Database db = db()) {
1437             db //
1438                     .select("select name, score, name, score from person order by name") //
1439                     .getAs(String.class, Integer.class, String.class, Integer.class) //
1440                     .firstOrError() //
1441                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1442                     .assertComplete() //
1443                     .assertValue(Tuple4.create("FRED", 21, "FRED", 21)); //
1444         }
1445     }
1446 
1447     @Test
1448     public void testTuple5() {
1449         try (Database db = db()) {
1450             db //
1451                     .select("select name, score, name, score, name from person order by name") //
1452                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class) //
1453                     .firstOrError() //
1454                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1455                     .assertComplete().assertValue(Tuple5.create("FRED", 21, "FRED", 21, "FRED")); //
1456         }
1457     }
1458 
1459     @Test
1460     public void testTuple6() {
1461         try (Database db = db()) {
1462             db //
1463                     .select("select name, score, name, score, name, score from person order by name") //
1464                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1465                             Integer.class) //
1466                     .firstOrError() //
1467                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1468                     .assertComplete()
1469                     .assertValue(Tuple6.create("FRED", 21, "FRED", 21, "FRED", 21)); //
1470         }
1471     }
1472 
1473     @Test
1474     public void testTuple7() {
1475         try (Database db = db()) {
1476             db //
1477                     .select("select name, score, name, score, name, score, name from person order by name") //
1478                     .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1479                             Integer.class, String.class) //
1480                     .firstOrError() //
1481                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1482                     .assertComplete()
1483                     .assertValue(Tuple7.create("FRED", 21, "FRED", 21, "FRED", 21, "FRED")); //
1484         }
1485     }
1486 
1487     @Test
1488     public void testTupleN() {
1489         try (Database db = db()) {
1490             db //
1491                     .select("select name, score, name from person order by name") //
1492                     .getTupleN() //
1493                     .firstOrError().test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1494                     .assertComplete() //
1495                     .assertValue(TupleN.create("FRED", 21, "FRED")); //
1496         }
1497     }
1498 
1499     @Test
1500     public void testTupleNWithClass() {
1501         try (Database db = db()) {
1502             db //
1503                     .select("select score a, score b from person order by name") //
1504                     .getTupleN(Integer.class) //
1505                     .firstOrError() //
1506                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1507                     .assertComplete() //
1508                     .assertValue(TupleN.create(21, 21)); //
1509         }
1510     }
1511 
1512     @Test
1513     public void testTupleNWithClassInTransaction() {
1514         try (Database db = db()) {
1515             db //
1516                     .select("select score a, score b from person order by name") //
1517                     .transactedValuesOnly() //
1518                     .getTupleN(Integer.class) //
1519                     .map(x -> x.value()) //
1520                     .firstOrError() //
1521                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1522                     .assertComplete() //
1523                     .assertValue(TupleN.create(21, 21)); //
1524         }
1525     }
1526 
1527     @Test
1528     public void testHealthCheck() throws InterruptedException {
1529         AtomicBoolean once = new AtomicBoolean(true);
1530         testHealthCheck(c -> {
1531             log.debug("doing health check");
1532             return !once.compareAndSet(true, false);
1533         });
1534     }
1535 
1536     @Test
1537     public void testHealthCheckThatThrows() throws InterruptedException {
1538         AtomicBoolean once = new AtomicBoolean(true);
1539         testHealthCheck(c -> {
1540             log.debug("doing health check");
1541             if (!once.compareAndSet(true, false))
1542                 return true;
1543             else
1544                 throw new RuntimeException("health check failed");
1545         });
1546     }
1547 
1548     @Test
1549     public void testUpdateOneRow() {
1550         try (Database db = db()) {
1551             db.update("update person set score=20 where name='FRED'") //
1552                     .counts() //
1553                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1554                     .assertValue(1) //
1555                     .assertComplete();
1556         }
1557     }
1558 
1559     @Test
1560     public void testUpdateThreeRows() {
1561         try (Database db = db()) {
1562             db.update("update person set score=20") //
1563                     .counts() //
1564                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1565                     .assertValue(3) //
1566                     .assertComplete();
1567         }
1568     }
1569 
1570     @Test
1571     public void testUpdateWithParameter() {
1572         try (Database db = db()) {
1573             db.update("update person set score=20 where name=?") //
1574                     .parameter("FRED").counts() //
1575                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1576                     .assertValue(1) //
1577                     .assertComplete();
1578         }
1579     }
1580 
1581     @Test
1582     public void testUpdateWithParameterTwoRuns() {
1583         try (Database db = db()) {
1584             db.update("update person set score=20 where name=?") //
1585                     .parameters("FRED", "JOSEPH").counts() //
1586                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1587                     .assertValues(1, 1) //
1588                     .assertComplete();
1589         }
1590     }
1591 
1592     @Test
1593     public void testUpdateAllWithParameterFourRuns() {
1594         try (Database db = db()) {
1595             db.update("update person set score=?") //
1596                     .parameters(1, 2, 3, 4) //
1597                     .counts() //
1598                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1599                     .assertValues(3, 3, 3, 3) //
1600                     .assertComplete();
1601         }
1602     }
1603 
1604     @Test
1605     public void testUpdateWithBatchSize2() {
1606         try (Database db = db()) {
1607             db.update("update person set score=?") //
1608                     .batchSize(2) //
1609                     .parameters(1, 2, 3, 4) //
1610                     .counts() //
1611                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1612                     .assertValues(3, 3, 3, 3) //
1613                     .assertComplete();
1614         }
1615     }
1616 
1617     @Test
1618     public void testUpdateWithBatchSize3GreaterThanNumRecords() {
1619         try (Database db = db()) {
1620             db.update("update person set score=?") //
1621                     .batchSize(3) //
1622                     .parameters(1, 2, 3, 4) //
1623                     .counts() //
1624                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1625                     .assertValues(3, 3, 3, 3) //
1626                     .assertComplete();
1627         }
1628     }
1629 
1630     @Test
1631     public void testInsert() {
1632         try (Database db = db()) {
1633             db.update("insert into person(name, score) values(?,?)") //
1634                     .parameters("DAVE", 12, "ANNE", 18) //
1635                     .counts() //
1636                     .test() //
1637                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1638                     .assertValues(1, 1) //
1639                     .assertComplete();
1640             List<Tuple2<String, Integer>> list = db.select("select name, score from person") //
1641                     .getAs(String.class, Integer.class) //
1642                     .toList() //
1643                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1644                     .blockingGet();
1645             assertTrue(list.contains(Tuple2.create("DAVE", 12)));
1646             assertTrue(list.contains(Tuple2.create("ANNE", 18)));
1647         }
1648     }
1649 
1650     @Test
1651     public void testReturnGeneratedKeys() {
1652         try (Database db = db()) {
1653             // note is a table with auto increment
1654             db.update("insert into note(text) values(?)") //
1655                     .parameters("HI", "THERE") //
1656                     .returnGeneratedKeys() //
1657                     .getAs(Integer.class)//
1658                     .test() //
1659                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1660                     .assertValues(1, 2) //
1661                     .assertComplete();
1662 
1663             db.update("insert into note(text) values(?)") //
1664                     .parameters("ME", "TOO") //
1665                     .returnGeneratedKeys() //
1666                     .getAs(Integer.class)//
1667                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1668                     .assertValues(3, 4) //
1669                     .assertComplete();
1670         }
1671     }
1672 
1673     @Test
1674     public void testReturnGeneratedKeysDerby() {
1675         Database db = DatabaseCreator.createDerby(1);
1676 
1677         // note is a table with auto increment
1678         db.update("insert into note2(text) values(?)") //
1679                 .parameters("HI", "THERE") //
1680                 .returnGeneratedKeys() //
1681                 .getAs(Integer.class)//
1682                 .test() //
1683                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1684                 .assertNoErrors().assertValues(1, 3) //
1685                 .assertComplete();
1686 
1687         db.update("insert into note2(text) values(?)") //
1688                 .parameters("ME", "TOO") //
1689                 .returnGeneratedKeys() //
1690                 .getAs(Integer.class)//
1691                 .test() //
1692                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1693                 .assertValues(5, 7) //
1694                 .assertComplete();
1695     }
1696 
1697     @Test(expected = IllegalArgumentException.class)
1698     public void testReturnGeneratedKeysWithBatchSizeShouldThrow() {
1699         try (Database db = db()) {
1700             // note is a table with auto increment
1701             db.update("insert into note(text) values(?)") //
1702                     .parameters("HI", "THERE") //
1703                     .batchSize(2) //
1704                     .returnGeneratedKeys();
1705         }
1706     }
1707 
1708     @Test
1709     public void testTransactedReturnGeneratedKeys() {
1710         try (Database db = db()) {
1711             // note is a table with auto increment
1712             db.update("insert into note(text) values(?)") //
1713                     .parameters("HI", "THERE") //
1714                     .transacted() //
1715                     .returnGeneratedKeys() //
1716                     .valuesOnly() //
1717                     .getAs(Integer.class)//
1718                     .test() //
1719                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1720                     .assertValues(1, 2) //
1721                     .assertComplete();
1722 
1723             db.update("insert into note(text) values(?)") //
1724                     .parameters("ME", "TOO") //
1725                     .transacted() //
1726                     .returnGeneratedKeys() //
1727                     .valuesOnly() //
1728                     .getAs(Integer.class)//
1729                     .test() //
1730                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1731                     .assertValues(3, 4) //
1732                     .assertComplete();
1733         }
1734     }
1735 
1736     @Test
1737     public void testTransactedReturnGeneratedKeys2() {
1738         try (Database db = db()) {
1739             // note is a table with auto increment
1740             Flowable<Integer> a = db.update("insert into note(text) values(?)") //
1741                     .parameters("HI", "THERE") //
1742                     .transacted() //
1743                     .returnGeneratedKeys() //
1744                     .valuesOnly() //
1745                     .getAs(Integer.class);
1746 
1747             db.update("insert into note(text) values(?)") //
1748                     .parameters("ME", "TOO") //
1749                     .transacted() //
1750                     .returnGeneratedKeys() //
1751                     .valuesOnly() //
1752                     .getAs(Integer.class)//
1753                     .startWith(a) //
1754                     .test() //
1755                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1756                     .assertValues(1, 2, 3, 4) //
1757                     .assertComplete();
1758         }
1759     }
1760 
1761     @Test
1762     public void testUpdateWithinTransaction() {
1763         try (Database db = db()) {
1764             db //
1765                     .select("select name from person") //
1766                     .transactedValuesOnly() //
1767                     .getAs(String.class) //
1768                     .doOnNext(DatabaseTest::println) //
1769                     .flatMap(tx -> tx//
1770                             .update("update person set score=-1 where name=:name") //
1771                             .batchSize(1) //
1772                             .parameter("name", tx.value()) //
1773                             .valuesOnly() //
1774                             .counts()) //
1775                     .test() //
1776                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1777                     .assertValues(1, 1, 1) //
1778                     .assertComplete();
1779         }
1780     }
1781 
1782     @Test
1783     public void testSelectDependsOnFlowable() {
1784         try (Database db = db()) {
1785             Flowable<Integer> a = db.update("update person set score=100 where name=?") //
1786                     .parameter("FRED") //
1787                     .counts();
1788             db.select("select score from person where name=?") //
1789                     .parameter("FRED") //
1790                     .dependsOn(a) //
1791                     .getAs(Integer.class)//
1792                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1793                     .assertValues(100) //
1794                     .assertComplete();
1795         }
1796     }
1797 
1798     @Test
1799     public void testSelectDependsOnObservable() {
1800         try (Database db = db()) {
1801             Observable<Integer> a = db.update("update person set score=100 where name=?") //
1802                     .parameter("FRED") //
1803                     .counts().toObservable();
1804             db.select("select score from person where name=?") //
1805                     .parameter("FRED") //
1806                     .dependsOn(a) //
1807                     .getAs(Integer.class)//
1808                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1809                     .assertValues(100) //
1810                     .assertComplete();
1811         }
1812     }
1813 
1814     @Test
1815     public void testSelectDependsOnOnSingle() {
1816         try (Database db = db()) {
1817             Single<Long> a = db.update("update person set score=100 where name=?") //
1818                     .parameter("FRED") //
1819                     .counts().count();
1820             db.select("select score from person where name=?") //
1821                     .parameter("FRED") //
1822                     .dependsOn(a) //
1823                     .getAs(Integer.class)//
1824                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1825                     .assertValues(100) //
1826                     .assertComplete();
1827         }
1828     }
1829 
1830     @Test
1831     public void testSelectDependsOnCompletable() {
1832         try (Database db = db()) {
1833             Completable a = db.update("update person set score=100 where name=?") //
1834                     .parameter("FRED") //
1835                     .counts().ignoreElements();
1836             db.select("select score from person where name=?") //
1837                     .parameter("FRED") //
1838                     .dependsOn(a) //
1839                     .getAs(Integer.class)//
1840                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1841                     .assertValues(100) //
1842                     .assertComplete();
1843         }
1844     }
1845 
1846     @Test
1847     public void testUpdateWithinTransactionBatchSize0() {
1848         try (Database db = db()) {
1849             db //
1850                     .select("select name from person") //
1851                     .transactedValuesOnly() //
1852                     .getAs(String.class) //
1853                     .doOnNext(DatabaseTest::println) //
1854                     .flatMap(tx -> tx//
1855                             .update("update person set score=-1 where name=:name") //
1856                             .batchSize(0) //
1857                             .parameter("name", tx.value()) //
1858                             .valuesOnly() //
1859                             .counts()) //
1860                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1861                     .assertValues(1, 1, 1) //
1862                     .assertComplete();
1863         }
1864     }
1865 
1866     @Test
1867     public void testCreateBig() {
1868         big(5).select("select count(*) from person") //
1869                 .getAs(Integer.class) //
1870                 .test().awaitDone(20, TimeUnit.SECONDS) //
1871                 .assertValue(5163) //
1872                 .assertComplete();
1873     }
1874 
1875     @Test
1876     public void testTxWithBig() {
1877         big(1) //
1878                 .select("select name from person") //
1879                 .transactedValuesOnly() //
1880                 .getAs(String.class) //
1881                 .flatMap(tx -> tx//
1882                         .update("update person set score=-1 where name=:name") //
1883                         .batchSize(1) //
1884                         .parameter("name", tx.value()) //
1885                         .valuesOnly() //
1886                         .counts()) //
1887                 .count() //
1888                 .test().awaitDone(20, TimeUnit.SECONDS) //
1889                 .assertValue((long) NAMES_COUNT_BIG) //
1890                 .assertComplete();
1891     }
1892 
1893     @Test
1894     public void testTxWithBigInputBatchSize2000() {
1895         big(1) //
1896                 .select("select name from person") //
1897                 .transactedValuesOnly() //
1898                 .getAs(String.class) //
1899                 .flatMap(tx -> tx//
1900                         .update("update person set score=-1 where name=:name") //
1901                         .batchSize(2000) //
1902                         .parameter("name", tx.value()) //
1903                         .valuesOnly() //
1904                         .counts()) //
1905                 .count() //
1906                 .test().awaitDone(20, TimeUnit.SECONDS) //
1907                 .assertValue((long) NAMES_COUNT_BIG) //
1908                 .assertComplete();
1909     }
1910 
1911     @Test
1912     public void testInsertNullClobAndReadClobAsString() {
1913         try (Database db = db()) {
1914             insertNullClob(db);
1915             db.select("select document from person_clob where name='FRED'") //
1916                     .getAsOptional(String.class) //
1917                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1918                     .assertValue(Optional.<String>empty()) //
1919                     .assertComplete();
1920         }
1921     }
1922 
1923     @Test
1924     public void testAutomapClobIssue32() {
1925         try (Database db = db()) {
1926             assertTrue(db.update("insert into person_clob(name, document) values(?, ? )") //
1927                     .parameters("fred", "hello there") //
1928                     .complete() //
1929                     .blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1930             db.select("select * from person_clob") //
1931                     .autoMap(PersonClob.class) //
1932                     .map(pc -> {
1933                         log.debug(pc.toString());
1934                         return pc.document();
1935                     }) //
1936                     .test() //
1937                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1938                     .assertValueCount(1) //
1939                     .assertComplete();
1940         }
1941     }
1942 
1943     private static interface PersonClob {
1944         @Column
1945         String name();
1946 
1947         @Column
1948         String document();
1949     }
1950 
1951     private static void insertNullClob(Database db) {
1952         db.update("insert into person_clob(name,document) values(?,?)") //
1953                 .parameters("FRED", Database.NULL_CLOB) //
1954                 .counts() //
1955                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1956                 .assertValue(1) //
1957                 .assertComplete();
1958     }
1959 
1960     @Test
1961     public void testClobMethod() {
1962         assertEquals(Database.NULL_CLOB, Database.clob(null));
1963     }
1964 
1965     @Test
1966     public void testBlobMethod() {
1967         assertEquals(Database.NULL_BLOB, Database.blob(null));
1968     }
1969 
1970     @Test
1971     public void testClobMethodPresent() {
1972         assertEquals("a", Database.clob("a"));
1973     }
1974 
1975     @Test
1976     public void testBlobMethodPresent() {
1977         byte[] b = new byte[1];
1978         assertEquals(b, Database.blob(b));
1979     }
1980 
1981     @Test
1982     public void testDateOfBirthNullableForReadMe() {
1983         Database.test() //
1984                 .select("select date_of_birth from person where name='FRED'") //
1985                 .getAsOptional(Instant.class) //
1986                 .blockingForEach(DatabaseTest::println);
1987     }
1988 
1989     @Test
1990     public void testInsertNullClobAndReadClobAsTuple2() {
1991         try (Database db = db()) {
1992             insertNullClob(db);
1993             db.select("select document, document from person_clob where name='FRED'") //
1994                     .getAs(String.class, String.class) //
1995                     .test() //
1996                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
1997                     .assertValue(Tuple2.create(null, null)) //
1998                     .assertComplete();
1999         }
2000     }
2001 
2002     @Test
2003     public void testInsertClobAndReadClobAsString() {
2004         try (Database db = db()) {
2005             db.update("insert into person_clob(name,document) values(?,?)") //
2006                     .parameters("FRED", "some text here") //
2007                     .counts() //
2008                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2009                     .assertValue(1) //
2010                     .assertComplete();
2011             db.select("select document from person_clob where name='FRED'") //
2012                     .getAs(String.class) //
2013                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) // .assertValue("some
2014                                                                          // text
2015                                                                          // here")
2016                                                                          // //
2017                     .assertComplete();
2018         }
2019     }
2020 
2021     @Test
2022     public void testInsertClobAndReadClobUsingReader() {
2023         try (Database db = db()) {
2024             db.update("insert into person_clob(name,document) values(?,?)") //
2025                     .parameters("FRED", "some text here") //
2026                     .counts() //
2027                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2028                     .assertValue(1) //
2029                     .assertComplete();
2030             db.select("select document from person_clob where name='FRED'") //
2031                     .getAs(Reader.class) //
2032                     .map(r -> read(r)).test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2033                     .assertValue("some text here") //
2034                     .assertComplete();
2035         }
2036     }
2037 
2038     @Test
2039     public void testInsertBlobAndReadBlobAsByteArray() {
2040         try (Database db = db()) {
2041             insertPersonBlob(db);
2042             db.select("select document from person_blob where name='FRED'") //
2043                     .getAs(byte[].class) //
2044                     .map(b -> new String(b)) //
2045                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2046                     .assertValue("some text here") //
2047                     .assertComplete();
2048         }
2049     }
2050 
2051     private static void insertPersonBlob(Database db) {
2052         byte[] bytes = "some text here".getBytes();
2053         db.update("insert into person_blob(name,document) values(?,?)") //
2054                 .parameters("FRED", bytes) //
2055                 .counts() //
2056                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2057                 .assertValue(1) //
2058                 .assertComplete();
2059     }
2060 
2061     @Test
2062     public void testInsertBlobAndReadBlobAsInputStream() {
2063         try (Database db = db()) {
2064             byte[] bytes = "some text here".getBytes();
2065             db.update("insert into person_blob(name,document) values(?,?)") //
2066                     .parameters("FRED", new ByteArrayInputStream(bytes)) //
2067                     .counts() //
2068                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2069                     .assertValue(1) //
2070                     .assertComplete();
2071             db.select("select document from person_blob where name='FRED'") //
2072                     .getAs(InputStream.class) //
2073                     .map(is -> read(is)) //
2074                     .map(b -> new String(b)) //
2075                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2076                     .assertValue("some text here") //
2077                     .assertComplete();
2078         }
2079     }
2080 
2081     private static String read(Reader reader) throws IOException {
2082         StringBuffer s = new StringBuffer();
2083         char[] ch = new char[128];
2084         int n = 0;
2085         while ((n = reader.read(ch)) != -1) {
2086             s.append(ch, 0, n);
2087         }
2088         reader.close();
2089         return s.toString();
2090     }
2091 
2092     private static byte[] read(InputStream is) throws IOException {
2093         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
2094         byte[] b = new byte[128];
2095         int n = 0;
2096         while ((n = is.read(b)) != -1) {
2097             bytes.write(b, 0, n);
2098         }
2099         is.close();
2100         return bytes.toByteArray();
2101     }
2102 
2103     private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
2104         TestScheduler scheduler = new TestScheduler();
2105 
2106         NonBlockingConnectionPool pool = Pools //
2107                 .nonBlocking() //
2108                 .connectionProvider(DatabaseCreator.connectionProvider()) //
2109                 .maxIdleTime(10, TimeUnit.MINUTES) //
2110                 .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES) //
2111                 .healthCheck(healthy) //
2112                 .scheduler(scheduler) //
2113                 .maxPoolSize(1) //
2114                 .build();
2115 
2116         try (Database db = Database.from(pool)) {
2117             TestSubscriber<Integer> ts0 = db.select( //
2118                     "select score from person where name=?") //
2119                     .parameter("FRED") //
2120                     .getAs(Integer.class) //
2121                     .test();
2122             ts0.assertValueCount(0) //
2123                     .assertNotComplete();
2124             scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2125             ts0.assertValueCount(1) //
2126                     .assertComplete();
2127             TestSubscriber<Integer> ts = db.select( //
2128                     "select score from person where name=?") //
2129                     .parameter("FRED") //
2130                     .getAs(Integer.class) //
2131                     .test() //
2132                     .assertValueCount(0);
2133             log.debug("done2");
2134             scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2135             Thread.sleep(200);
2136             ts.assertValueCount(1);
2137             Thread.sleep(200);
2138             ts.assertValue(21) //
2139                     .assertComplete();
2140         }
2141     }
2142 
2143     @Test
2144     public void testShutdownBeforeUse() {
2145         NonBlockingConnectionPool pool = Pools //
2146                 .nonBlocking() //
2147                 .connectionProvider(DatabaseCreator.connectionProvider()) //
2148                 .scheduler(Schedulers.io()) //
2149                 .maxPoolSize(1) //
2150                 .build();
2151         pool.close();
2152         Database.from(pool) //
2153                 .select("select score from person where name=?") //
2154                 .parameter("FRED") //
2155                 .getAs(Integer.class) //
2156                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2157                 .assertNoValues() //
2158                 .assertError(PoolClosedException.class);
2159     }
2160 
2161     @Test
2162     public void testFewerColumnsMappedThanAvailable() {
2163         try (Database db = db()) {
2164             db.select("select name, score from person where name='FRED'") //
2165                     .getAs(String.class) //
2166                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2167                     .assertValues("FRED") //
2168                     .assertComplete();
2169         }
2170     }
2171 
2172     @Test
2173     public void testMoreColumnsMappedThanAvailable() {
2174         try (Database db = db()) {
2175             db //
2176                     .select("select name, score from person where name='FRED'") //
2177                     .getAs(String.class, Integer.class, String.class) //
2178                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2179                     .assertNoValues() //
2180                     .assertError(MoreColumnsRequestedThanExistException.class);
2181         }
2182     }
2183 
2184     @Test
2185     public void testSelectTimestamp() {
2186         try (Database db = db()) {
2187             db //
2188                     .select("select registered from person where name='FRED'") //
2189                     .getAs(Long.class) //
2190                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2191                     .assertValue(FRED_REGISTERED_TIME) //
2192                     .assertComplete();
2193         }
2194     }
2195 
2196     @Test
2197     public void testSelectTimestampAsDate() {
2198         try (Database db = db()) {
2199             db //
2200                     .select("select registered from person where name='FRED'") //
2201                     .getAs(Date.class) //
2202                     .map(d -> d.getTime()) //
2203                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2204                     .assertValue(FRED_REGISTERED_TIME) //
2205                     .assertComplete();
2206         }
2207     }
2208 
2209     @Test
2210     public void testSelectTimestampAsInstant() {
2211         try (Database db = db()) {
2212             db //
2213                     .select("select registered from person where name='FRED'") //
2214                     .getAs(Instant.class) //
2215                     .map(d -> d.toEpochMilli()) //
2216                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2217                     .assertValue(FRED_REGISTERED_TIME) //
2218                     .assertComplete();
2219         }
2220     }
2221 
2222     @Test
2223     public void testUpdateCalendarParameter() {
2224         Calendar c = GregorianCalendar.from(ZonedDateTime.parse("2017-03-25T15:37Z"));
2225         try (Database db = db()) {
2226             db.update("update person set registered=?") //
2227                     .parameter(c) //
2228                     .counts() //
2229                     .test() //
2230                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2231                     .assertValue(3) //
2232                     .assertComplete();
2233             db.select("select registered from person") //
2234                     .getAs(Long.class) //
2235                     .firstOrError() //
2236                     .test() //
2237                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2238                     .assertValue(c.getTimeInMillis()) //
2239                     .assertComplete();
2240         }
2241     }
2242 
2243     @Test
2244     public void testUpdateTimeParameter() throws InterruptedException {
2245         try (Database db = db()) {
2246             Timestamp t = new Timestamp(1234);
2247             db.update("update person set registered=?") //
2248                     .parameter(t) //
2249                     .counts() //
2250                     .test() //
2251                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2252                     .assertValue(3) //
2253                     .assertComplete();
2254             db.select("select registered from person") //
2255                     .getAs(Long.class) //
2256                     .firstOrError() //
2257                     .test() //
2258                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2259                     .assertValue(1234L) //
2260                     .assertComplete();
2261         }
2262     }
2263 
2264     @Test
2265     public void testUpdateTimestampParameter() {
2266         try (Database db = db()) {
2267             Timestamp t = new Timestamp(1234);
2268             db.update("update person set registered=?") //
2269                     .parameter(t) //
2270                     .counts() //
2271                     .test() //
2272                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2273                     .assertValue(3) //
2274                     .assertComplete();
2275             db.select("select registered from person") //
2276                     .getAs(Long.class) //
2277                     .firstOrError() //
2278                     .test() //
2279                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2280                     .assertValue(1234L) //
2281                     .assertComplete();
2282         }
2283     }
2284 
2285     @Test
2286     public void testUpdateSqlDateParameter() {
2287         try (Database db = db()) {
2288             java.sql.Date t = new java.sql.Date(1234);
2289 
2290             db.update("update person set registered=?") //
2291                     .parameter(t) //
2292                     .counts() //
2293                     .test() //
2294                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2295                     .assertValue(3) //
2296                     .assertComplete();
2297             db.select("select registered from person") //
2298                     .getAs(Long.class) //
2299                     .firstOrError() //
2300                     .test() //
2301                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2302                     // TODO make a more accurate comparison using the current TZ
2303                     .assertValue(x -> Math.abs(x - 1234) <= TimeUnit.HOURS.toMillis(24)) //
2304                     .assertComplete();
2305         }
2306     }
2307 
2308     @Test
2309     public void testUpdateUtilDateParameter() {
2310         try (Database db = db()) {
2311             Date d = new Date(1234);
2312             db.update("update person set registered=?") //
2313                     .parameter(d) //
2314                     .counts() //
2315                     .test() //
2316                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2317                     .assertValue(3) //
2318                     .assertComplete();
2319             db.select("select registered from person") //
2320                     .getAs(Long.class) //
2321                     .firstOrError() //
2322                     .test() //
2323                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2324                     .assertValue(1234L) //
2325                     .assertComplete();
2326         }
2327     }
2328 
2329     @Test
2330     public void testUpdateTimestampAsInstant() {
2331         try (Database db = db()) {
2332             db.update("update person set registered=? where name='FRED'") //
2333                     .parameter(Instant.ofEpochMilli(FRED_REGISTERED_TIME)) //
2334                     .counts() //
2335                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2336                     .assertValue(1) //
2337                     .assertComplete();
2338             db.select("select registered from person where name='FRED'") //
2339                     .getAs(Long.class) //
2340                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2341                     .assertValue(FRED_REGISTERED_TIME) //
2342                     .assertComplete();
2343         }
2344     }
2345 
2346     @Test
2347     public void testUpdateTimestampAsZonedDateTime() {
2348         try (Database db = db()) {
2349             db.update("update person set registered=? where name='FRED'") //
2350                     .parameter(ZonedDateTime.ofInstant(Instant.ofEpochMilli(FRED_REGISTERED_TIME),
2351                             ZoneOffset.UTC.normalized())) //
2352                     .counts() //
2353                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2354                     .assertValue(1) //
2355                     .assertComplete();
2356             db.select("select registered from person where name='FRED'") //
2357                     .getAs(Long.class) //
2358                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2359                     .assertValue(FRED_REGISTERED_TIME) //
2360                     .assertComplete();
2361         }
2362     }
2363 
2364     @Test
2365     public void testCompleteCompletes() {
2366         try (Database db = db(1)) {
2367             db //
2368                     .update("update person set score=-3 where name='FRED'") //
2369                     .complete() //
2370                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2371                     .blockingAwait();
2372 
2373             int score = db.select("select score from person where name='FRED'") //
2374                     .getAs(Integer.class) //
2375                     .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2376                     .blockingFirst();
2377             assertEquals(-3, score);
2378         }
2379     }
2380 
2381     @Test
2382     public void testComplete() throws InterruptedException {
2383         try (Database db = db(1)) {
2384             Completable a = db //
2385                     .update("update person set score=-3 where name='FRED'") //
2386                     .complete();
2387             db.update("update person set score=-4 where score = -3") //
2388                     .dependsOn(a) //
2389                     .counts() //
2390                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2391                     .assertValue(1) //
2392                     .assertComplete();
2393         }
2394     }
2395 
2396     @Test
2397     public void testCountsOnlyInTransaction() {
2398         try (Database db = db()) {
2399             db.update("update person set score = -3") //
2400                     .transacted() //
2401                     .countsOnly() //
2402                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2403                     .assertValues(3) //
2404                     .assertComplete();
2405         }
2406     }
2407 
2408     @Test
2409     public void testCountsInTransaction() {
2410         try (Database db = db()) {
2411             db.update("update person set score = -3") //
2412                     .transacted() //
2413                     .counts() //
2414                     .doOnNext(DatabaseTest::println) //
2415                     .toList() //
2416                     .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2417                     .assertValue(list -> list.get(0).isValue() && list.get(0).value() == 3
2418                             && list.get(1).isComplete() && list.size() == 2) //
2419                     .assertComplete();
2420         }
2421     }
2422 
2423     @Test
2424     public void testTx() throws InterruptedException {
2425         Database db = db(3);
2426         Flowable<Tx<?>> transaction = db //
2427                 .update("update person set score=-3 where name='FRED'") //
2428                 .transaction();
2429 
2430         transaction //
2431                 .doOnCancel(() -> log.debug("disposing")) //
2432                 .doOnNext(DatabaseTest::println) //
2433                 .flatMap(tx -> {
2434                     log.debug("flatmapping");
2435                     return tx //
2436                             .update("update person set score = -4 where score = -3") //
2437                             .countsOnly() //
2438                             .doOnSubscribe(s -> log.debug("subscribed")) //
2439                             .doOnNext(num -> log.debug("num=" + num));
2440                 }) //
2441                 .test() //
2442                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2443                 .assertNoErrors() //
2444                 .assertValue(1) //
2445                 .assertComplete();
2446     }
2447 
2448     @Test
2449     public void testTxAfterSelect() {
2450         Database db = db(3);
2451         Single<Tx<Integer>> transaction = db //
2452                 .select("select score from person where name='FRED'") //
2453                 .transactedValuesOnly() //
2454                 .getAs(Integer.class) //
2455                 .firstOrError();
2456 
2457         transaction //
2458                 .doOnDispose(() -> log.debug("disposing")) //
2459                 .doOnSuccess(DatabaseTest::println) //
2460                 .flatMapPublisher(tx -> {
2461                     log.debug("flatmapping");
2462                     return tx //
2463                             .update("update person set score = -4 where score = ?") //
2464                             .parameter(tx.value()) //
2465                             .countsOnly() //
2466                             .doOnSubscribe(s -> log.debug("subscribed")) //
2467                             .doOnNext(num -> log.debug("num=" + num));
2468                 }) //
2469                 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2470                 .assertNoErrors() //
2471                 .assertValue(1) //
2472                 .assertComplete();
2473     }
2474 
2475     @Test
2476     public void testUseTxOnComplete() {
2477         db(1) //
2478                 .select(Person10.class) //
2479                 .transacted() //
2480                 .get() //
2481                 .lastOrError() //
2482                 .map(tx -> tx.select("select count(*) from person") //
2483                         .count().blockingGet()) //
2484                 .test() //
2485                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2486                 .assertError(CannotForkTransactedConnection.class) //
2487                 .assertValueCount(0);
2488     }
2489 
2490     @Test
2491     public void testSingleFlatMap() {
2492         Single.just(1).flatMapPublisher(n -> Flowable.just(1)).test(1).assertValue(1)
2493                 .assertComplete();
2494     }
2495 
2496     @Test
2497     public void testAutomappedInstanceHasMeaningfulToStringMethod() {
2498         String s = Database.test() //
2499                 .select("select name, score from person where name=?") //
2500                 .parameterStream(Flowable.just("FRED")) //
2501                 .autoMap(Person2.class) //
2502                 .map(x -> x.toString()) //
2503                 .blockingSingle();
2504         assertEquals("Person2[name=FRED, score=21]", s);
2505     }
2506 
2507     @Test
2508     public void testAutomappedEquals() {
2509         boolean b = Database.test() //
2510                 .select("select name, score from person where name=?") //
2511                 .parameterStream(Flowable.just("FRED")) //
2512                 .autoMap(Person2.class) //
2513                 .map(x -> x.equals(x)) //
2514                 .blockingSingle();
2515         assertTrue(b);
2516     }
2517 
2518     @Test
2519     public void testAutomappedDoesNotEqualNull() {
2520         boolean b = Database.test() //
2521                 .select("select name, score from person where name=?") //
2522                 .parameterStream(Flowable.just("FRED")) //
2523                 .autoMap(Person2.class) //
2524                 .map(x -> x.equals(null)) //
2525                 .blockingSingle();
2526         assertFalse(b);
2527     }
2528 
2529     @Test
2530     public void testAutomappedDoesNotEqual() {
2531         boolean b = Database.test() //
2532                 .select("select name, score from person where name=?") //
2533                 .parameterStream(Flowable.just("FRED")) //
2534                 .autoMap(Person2.class) //
2535                 .map(x -> x.equals(new Object())) //
2536                 .blockingSingle();
2537         assertFalse(b);
2538     }
2539 
2540     @Test
2541     public void testAutomappedHashCode() {
2542         Person2 p = Database.test() //
2543                 .select("select name, score from person where name=?") //
2544                 .parameterStream(Flowable.just("FRED")) //
2545                 .autoMap(Person2.class) //
2546                 .blockingSingle();
2547         assertTrue(p.hashCode() != 0);
2548     }
2549 
2550     @Test
2551     public void testAutomappedWithParametersThatProvokesMoreThanOneQuery() {
2552         Database.test() //
2553                 .select("select name, score from person where name=?") //
2554                 .parameters("FRED", "FRED") //
2555                 .autoMap(Person2.class) //
2556                 .doOnNext(DatabaseTest::println) //
2557                 .test() //
2558                 .awaitDone(2000, TimeUnit.SECONDS) //
2559                 .assertNoErrors() //
2560                 .assertValueCount(2) //
2561                 .assertComplete();
2562     }
2563 
2564     @Test
2565     public void testAutomappedObjectsEqualsAndHashCodeIsDistinctOnValues() {
2566         Database.test() //
2567                 .select("select name, score from person where name=?") //
2568                 .parameters("FRED", "FRED") //
2569                 .autoMap(Person2.class) //
2570                 .distinct() //
2571                 .doOnNext(DatabaseTest::println) //
2572                 .test() //
2573                 .awaitDone(2000, TimeUnit.SECONDS) //
2574                 .assertNoErrors() //
2575                 .assertValueCount(1) //
2576                 .assertComplete();
2577     }
2578 
2579     @Test
2580     public void testAutomappedObjectsEqualsDifferentiatesDifferentInterfacesWithSameMethodNamesAndValues() {
2581         PersonDistinct1 p1 = Database.test() //
2582                 .select("select name, score from person where name=?") //
2583                 .parameters("FRED") //
2584                 .autoMap(PersonDistinct1.class) //
2585                 .blockingFirst();
2586 
2587         PersonDistinct2 p2 = Database.test() //
2588                 .select("select name, score from person where name=?") //
2589                 .parameters("FRED") //
2590                 .autoMap(PersonDistinct2.class) //
2591                 .blockingFirst();
2592 
2593         assertNotEquals(p1, p2);
2594     }
2595 
2596     @Test
2597     public void testAutomappedObjectsWhenDefaultMethodInvoked() {
2598         // only run test if java 8
2599         if (System.getProperty("java.version").startsWith("1.8.")) {
2600             PersonWithDefaultMethod p = Database.test() //
2601                     .select("select name, score from person where name=?") //
2602                     .parameters("FRED") //
2603                     .autoMap(PersonWithDefaultMethod.class) //
2604                     .blockingFirst();
2605             assertEquals("fred", p.nameLower());
2606         }
2607     }
2608 
2609     @Test(expected = AutomappedInterfaceInaccessibleException.class)
2610     public void testAutomappedObjectsWhenDefaultMethodInvokedAndIsNonPublicThrows() {
2611         PersonWithDefaultMethodNonPublic p = Database.test() //
2612                 .select("select name, score from person where name=?") //
2613                 .parameters("FRED") //
2614                 .autoMap(PersonWithDefaultMethodNonPublic.class) //
2615                 .blockingFirst();
2616         p.nameLower();
2617     }
2618 
2619     @Test
2620     public void testBlockingDatabase() {
2621         Database db = blocking();
2622         db.select("select score from person where name=?") //
2623                 .parameters("FRED", "JOSEPH") //
2624                 .getAs(Integer.class) //
2625                 .test() //
2626                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2627                 .assertNoErrors() //
2628                 .assertValues(21, 34) //
2629                 .assertComplete();
2630     }
2631 
2632     @Test
2633     public void testBlockingDatabaseTransacted() {
2634         Database db = blocking();
2635         db.select("select score from person where name=?") //
2636                 .parameters("FRED", "JOSEPH") //
2637                 .transactedValuesOnly() //
2638                 .getAs(Integer.class) //
2639                 .map(x -> x.value()) //
2640                 .test() //
2641                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2642                 .assertNoErrors() //
2643                 .assertValues(21, 34) //
2644                 .assertComplete();
2645     }
2646 
2647     @Test
2648     public void testBlockingDatabaseTransactedNested() {
2649         Database db = blocking();
2650         db.select("select score from person where name=?") //
2651                 .parameters("FRED", "JOSEPH") //
2652                 .transactedValuesOnly() //
2653                 .getAs(Integer.class) //
2654                 .flatMap(tx -> tx.select("select name from person where score=?") //
2655                         .parameter(tx.value()) //
2656                         .valuesOnly() //
2657                         .getAs(String.class))
2658                 .test() //
2659                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2660                 .assertNoErrors() //
2661                 .assertValues("FRED", "JOSEPH") //
2662                 .assertComplete();
2663     }
2664 
2665     @Test
2666     public void testUsingNormalJDBCApi() {
2667         Database db = db(1);
2668         db.apply(con -> {
2669             try (PreparedStatement stmt = con
2670                     .prepareStatement("select count(*) from person where name='FRED'");
2671                     ResultSet rs = stmt.executeQuery()) {
2672                 rs.next();
2673                 return rs.getInt(1);
2674             }
2675         }) //
2676                 .test() //
2677                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2678                 .assertValue(1) //
2679                 .assertComplete();
2680 
2681         // now check that the connection was returned to the pool
2682         db.select("select count(*) from person where name='FRED'") //
2683                 .getAs(Integer.class) //
2684                 .test() //
2685                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2686                 .assertValue(1) //
2687                 .assertComplete();
2688     }
2689 
2690     @Test
2691     public void testUsingNormalJDBCApiCompletable() {
2692         Database db = db(1);
2693         db.apply(con -> {
2694             try (PreparedStatement stmt = con
2695                     .prepareStatement("select count(*) from person where name='FRED'");
2696                     ResultSet rs = stmt.executeQuery()) {
2697                 rs.next();
2698             }
2699         }) //
2700                 .test() //
2701                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2702                 .assertComplete();
2703 
2704         // now check that the connection was returned to the pool
2705         db.select("select count(*) from person where name='FRED'") //
2706                 .getAs(Integer.class) //
2707                 .test() //
2708                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2709                 .assertValue(1) //
2710                 .assertComplete();
2711     }
2712 
2713     @Test
2714     public void testCallableStatement() {
2715         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2716         assertTrue(db.apply(con -> {
2717             try (Statement stmt = con.createStatement()) {
2718                 CallableStatement st = con.prepareCall("call getPersonCount(?, ?)");
2719                 st.setInt(1, 0);
2720                 st.registerOutParameter(2, Types.INTEGER);
2721                 st.execute();
2722                 assertEquals(2, st.getInt(2));
2723             }
2724         }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS));
2725     }
2726 
2727     @Test
2728     public void testCallableStatementReturningResultSets() {
2729         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2730         db.apply(con -> {
2731             try (Statement stmt = con.createStatement()) {
2732                 CallableStatement st = con.prepareCall("call in1out0rs2(?)");
2733                 st.setInt(1, 0);
2734                 st.execute();
2735                 ResultSet rs1 = st.getResultSet();
2736                 st.getMoreResults(Statement.KEEP_CURRENT_RESULT);
2737                 ResultSet rs2 = st.getResultSet();
2738                 rs1.next();
2739                 assertEquals("FRED", rs1.getString(1));
2740                 rs2.next();
2741                 assertEquals("SARAH", rs2.getString(1));
2742             }
2743         }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2744     }
2745 
2746     @Test
2747     public void testCallableApiNoParameters() {
2748         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2749         db //
2750                 .call("call zero()") //
2751                 .once() //
2752                 .test() //
2753                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2754                 .assertComplete();
2755     }
2756 
2757     @Test
2758     public void testCallableApiNoParametersTransacted() {
2759         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2760         db //
2761                 .call("call zero()") //
2762                 .transacted() //
2763                 .once() //
2764                 .test() //
2765                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2766                 .assertValueCount(1) //
2767                 .assertValue(x -> x.isComplete()) //
2768                 .assertComplete();
2769     }
2770 
2771     @Test
2772     public void testCallableApiOneInOutParameter() {
2773         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2774         db //
2775                 .call("call inout1(?)") //
2776                 .inOut(Type.INTEGER, Integer.class) //
2777                 .input(4) //
2778                 .test() //
2779                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2780                 .assertValue(5) //
2781                 .assertComplete();
2782     }
2783 
2784     @Test
2785     public void testCallableApiOneInOutParameterTransacted() {
2786         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2787         db //
2788                 .call("call inout1(?)") //
2789                 .transacted() //
2790                 .inOut(Type.INTEGER, Integer.class) //
2791                 .input(4) //
2792                 .flatMap(Tx.flattenToValuesOnly()) //
2793                 .test() //
2794                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2795                 .assertValue(5) //
2796                 .assertComplete();
2797     }
2798 
2799     @Test
2800     public void testCallableApiTwoInOutParameters() {
2801         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2802         db //
2803                 .call("call inout2(?, ?)") //
2804                 .inOut(Type.INTEGER, Integer.class) //
2805                 .inOut(Type.INTEGER, Integer.class) //
2806                 .input(4, 10) //
2807                 .test() //
2808                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2809                 .assertValue(x -> x._1() == 5 && x._2() == 12) //
2810                 .assertComplete();
2811     }
2812 
2813     @Test
2814     public void testCallableApiTwoInOutParametersTransacted() {
2815         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2816         db //
2817                 .call("call inout2(?, ?)") //
2818                 .transacted() //
2819                 .inOut(Type.INTEGER, Integer.class) //
2820                 .inOut(Type.INTEGER, Integer.class) //
2821                 .input(4, 10) //
2822                 .flatMap(Tx.flattenToValuesOnly()) //
2823                 .test() //
2824                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2825                 .assertValue(x -> x._1() == 5 && x._2() == 12) //
2826                 .assertComplete();
2827     }
2828 
2829     @Test
2830     public void testCallableApiThreeInOutParameters() {
2831         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2832         db //
2833                 .call("call inout3(?, ?, ?)") //
2834                 .inOut(Type.INTEGER, Integer.class) //
2835                 .inOut(Type.INTEGER, Integer.class) //
2836                 .inOut(Type.INTEGER, Integer.class) //
2837                 .input(4, 10, 13) //
2838                 .test() //
2839                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2840                 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16) //
2841                 .assertComplete();
2842     }
2843 
2844     @Test
2845     public void testCallableApiThreeInOutParametersTransacted() {
2846         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2847         db //
2848                 .call("call inout3(?, ?, ?)") //
2849                 .transacted() //
2850                 .inOut(Type.INTEGER, Integer.class) //
2851                 .inOut(Type.INTEGER, Integer.class) //
2852                 .inOut(Type.INTEGER, Integer.class) //
2853                 .input(4, 10, 13) //
2854                 .flatMap(Tx.flattenToValuesOnly()) //
2855                 .test() //
2856                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2857                 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16) //
2858                 .assertComplete();
2859     }
2860 
2861     @Test
2862     public void testCallableApiReturningOneOutParameter() throws InterruptedException {
2863         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2864         db //
2865                 .call("call in1out1(?,?)") //
2866                 .in() //
2867                 .out(Type.INTEGER, Integer.class) //
2868                 .input(0, 10, 20) //
2869                 .test() //
2870                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2871                 .assertValues(0, 10, 20) //
2872                 .assertComplete();
2873     }
2874 
2875     @Test
2876     public void testCallableApiReturningOneOutParameterTransacted() throws InterruptedException {
2877         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2878         db //
2879                 .call("call in1out1(?,?)") //
2880                 .transacted() //
2881                 .in() //
2882                 .out(Type.INTEGER, Integer.class) //
2883                 .input(0, 10, 20) //
2884                 .flatMap(Tx.flattenToValuesOnly()) //
2885                 .test() //
2886                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2887                 .assertValues(0, 10, 20) //
2888                 .assertComplete();
2889     }
2890 
2891     @Test
2892     public void testCallableApiReturningTwoOutParameters() throws InterruptedException {
2893         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2894         db //
2895                 .call("call in1out2(?,?,?)") //
2896                 .in() //
2897                 .out(Type.INTEGER, Integer.class) //
2898                 .out(Type.INTEGER, Integer.class) //
2899                 .input(0, 10, 20) //
2900                 .test() //
2901                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2902                 .assertValueCount(3) //
2903                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1) //
2904                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11) //
2905                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21) //
2906                 .assertComplete();
2907 
2908         db.call("call in1out2(?,?,?)").in().out(Type.INTEGER, Integer.class)
2909                 .out(Type.INTEGER, Integer.class).input(0, 10, 20)
2910                 .blockingForEach(DatabaseTest::println);
2911     }
2912 
2913     @Test
2914     public void testCallableApiReturningTwoOutParametersTransacted() throws InterruptedException {
2915         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2916         db //
2917                 .call("call in1out2(?,?,?)") //
2918                 .transacted() //
2919                 .in() //
2920                 .out(Type.INTEGER, Integer.class) //
2921                 .out(Type.INTEGER, Integer.class) //
2922                 .input(0, 10, 20) //
2923                 .flatMap(Tx.flattenToValuesOnly()) //
2924                 .test() //
2925                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2926                 .assertValueCount(3) //
2927                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1) //
2928                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11) //
2929                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21) //
2930                 .assertComplete();
2931 
2932         db.call("call in1out2(?,?,?)") //
2933                 .in() //
2934                 .out(Type.INTEGER, Integer.class) //
2935                 .out(Type.INTEGER, Integer.class) //
2936                 .input(0, 10, 20) //
2937                 .blockingForEach(DatabaseTest::println);
2938     }
2939 
2940     @Test
2941     public void testCallableApiReturningThreeOutParameters() throws InterruptedException {
2942         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2943         db //
2944                 .call("call in1out3(?,?,?,?)") //
2945                 .in() //
2946                 .out(Type.INTEGER, Integer.class) //
2947                 .out(Type.INTEGER, Integer.class) //
2948                 .out(Type.INTEGER, Integer.class) //
2949                 .input(0, 10, 20) //
2950                 .test() //
2951                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2952                 .assertValueCount(3) //
2953                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2) //
2954                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12) //
2955                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22) //
2956                 .assertComplete();
2957     }
2958 
2959     @Test
2960     public void testCallableApiReturningThreeOutParametersTransacted() throws InterruptedException {
2961         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2962         db //
2963                 .call("call in1out3(?,?,?,?)") //
2964                 .transacted() //
2965                 .in() //
2966                 .out(Type.INTEGER, Integer.class) //
2967                 .out(Type.INTEGER, Integer.class) //
2968                 .out(Type.INTEGER, Integer.class) //
2969                 .input(0, 10, 20) //
2970                 .flatMap(Tx.flattenToValuesOnly()) //
2971                 .test() //
2972                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2973                 .assertValueCount(3) //
2974                 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2) //
2975                 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12) //
2976                 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22) //
2977                 .assertComplete();
2978     }
2979 
2980     @Test
2981     public void testCallableApiReturningOneResultSet() throws InterruptedException {
2982         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2983         db //
2984                 .call("call in0out0rs1()") //
2985                 .autoMap(Person2.class) //
2986                 .input(0, 10, 20) //
2987                 .doOnNext(x -> {
2988                     assertTrue(x.outs().isEmpty());
2989                 }) //
2990                 .flatMap(x -> x.results()) //
2991                 .test() //
2992                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
2993                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2994                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2995                 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2996                 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2997                 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2998                 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2999                 .assertValueCount(6) //
3000                 .assertComplete();
3001     }
3002 
3003     @Test
3004     public void testCallableApiReturningOneResultSetTransacted() throws InterruptedException {
3005         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3006         db //
3007                 .call("call in0out0rs1()") //
3008                 .transacted() //
3009                 .autoMap(Person2.class) //
3010                 .input(0, 10, 20) //
3011                 .flatMap(Tx.flattenToValuesOnly()).doOnNext(x -> {
3012                     assertTrue(x.outs().isEmpty());
3013                 }) //
3014                 .flatMap(x -> x.results()) //
3015                 .test() //
3016                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3017                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3018                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3019                 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3020                 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3021                 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3022                 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3023                 .assertValueCount(6) //
3024                 .assertComplete();
3025     }
3026 
3027     @Test
3028     public void testCallableApiReturningTwoResultSetsWithAutoMap() throws InterruptedException {
3029         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3030         db //
3031                 .call("call in1out0rs2(?)") //
3032                 .in() //
3033                 .autoMap(Person2.class) //
3034                 .autoMap(Person2.class) //
3035                 .input(0, 10, 20) //
3036                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3037                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3038                 .test() //
3039                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3040                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3041                         "SARAHFRED") //
3042                 .assertComplete();
3043     }
3044 
3045     @Test
3046     public void testCallableApiReturningTwoResultSetsWithAutoMapTransacted()
3047             throws InterruptedException {
3048         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3049         db //
3050                 .call("call in1out0rs2(?)") //
3051                 .transacted() //
3052                 .in() //
3053                 .autoMap(Person2.class) //
3054                 .autoMap(Person2.class) //
3055                 .input(0, 10, 20) //
3056                 .flatMap(Tx.flattenToValuesOnly()) //
3057                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3058                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3059                 .test() //
3060                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3061                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3062                         "SARAHFRED") //
3063                 .assertComplete();
3064     }
3065 
3066     @Test
3067     public void testCallableApiReturningTwoResultSetsWithGet() throws InterruptedException {
3068         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3069         db //
3070                 .call("call in1out0rs2(?)") //
3071                 .in() //
3072                 .getAs(String.class, Integer.class) //
3073                 .getAs(String.class, Integer.class)//
3074                 .input(0, 10, 20) //
3075                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1())) //
3076                 .test() //
3077                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3078                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3079                         "SARAHFRED") //
3080                 .assertComplete();
3081     }
3082 
3083     @Test
3084     public void testCallableApiReturningTwoResultSetsWithGetTransacted()
3085             throws InterruptedException {
3086         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3087         db //
3088                 .call("call in1out0rs2(?)") //
3089                 .transacted() //
3090                 .in() //
3091                 .getAs(String.class, Integer.class) //
3092                 .getAs(String.class, Integer.class)//
3093                 .input(0, 10, 20) //
3094                 .flatMap(Tx.flattenToValuesOnly()) //
3095                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1())) //
3096                 .test() //
3097                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3098                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3099                         "SARAHFRED") //
3100                 .assertComplete();
3101     }
3102 
3103     @Test
3104     public void testCallableApiReturningTwoResultSetsSwitchOrder1() throws InterruptedException {
3105         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3106         db //
3107                 .call("call in1out0rs2(?)") //
3108                 .autoMap(Person2.class) //
3109                 .in() //
3110                 .autoMap(Person2.class) //
3111                 .input(0, 10, 20) //
3112                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3113                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3114                 .test() //
3115                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3116                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3117                         "SARAHFRED") //
3118                 .assertComplete();
3119     }
3120 
3121     @Test
3122     public void testCallableApiReturningTwoResultSetsSwitchOrder1Transacted()
3123             throws InterruptedException {
3124         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3125         db //
3126                 .call("call in1out0rs2(?)") //
3127                 .transacted() //
3128                 .autoMap(Person2.class) //
3129                 .in() //
3130                 .autoMap(Person2.class) //
3131                 .input(0, 10, 20) //
3132                 .flatMap(Tx.flattenToValuesOnly()) //
3133                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3134                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3135                 .test() //
3136                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3137                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3138                         "SARAHFRED") //
3139                 .assertComplete();
3140     }
3141 
3142     @Test
3143     public void testCallableApiReturningTwoResultSetsSwitchOrder2() throws InterruptedException {
3144         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3145         db //
3146                 .call("call in1out0rs2(?)") //
3147                 .autoMap(Person2.class) //
3148                 .autoMap(Person2.class) //
3149                 .in() //
3150                 .input(0, 10, 20) //
3151                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3152                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3153                 .test() //
3154                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3155                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3156                         "SARAHFRED") //
3157                 .assertComplete();
3158     }
3159 
3160     @Test
3161     public void testCallableApiReturningTwoResultSetsSwitchOrder2Transacted()
3162             throws InterruptedException {
3163         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3164         db //
3165                 .call("call in1out0rs2(?)") //
3166                 .transacted() //
3167                 .autoMap(Person2.class) //
3168                 .autoMap(Person2.class) //
3169                 .in() //
3170                 .input(0, 10, 20) //
3171                 .flatMap(Tx.flattenToValuesOnly()) //
3172                 .doOnNext(x -> assertTrue(x.outs().isEmpty())) //
3173                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())) //
3174                 .test() //
3175                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3176                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3177                         "SARAHFRED") //
3178                 .assertComplete();
3179     }
3180 
3181     @Test
3182     public void testCallableApiReturningTwoOutputThreeResultSets() throws InterruptedException {
3183         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3184         db //
3185                 .call("call in0out2rs3(?, ?)") //
3186                 .out(Type.INTEGER, Integer.class) //
3187                 .out(Type.INTEGER, Integer.class) //
3188                 .autoMap(Person2.class) //
3189                 .autoMap(Person2.class) //
3190                 .autoMap(Person2.class) //
3191                 .input(0, 10, 20) //
3192                 .doOnNext(x -> {
3193                     assertEquals(2, x.outs().size());
3194                     assertEquals(1, x.outs().get(0));
3195                     assertEquals(2, x.outs().get(1));
3196                 }) //
3197                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3198                         .zipWith(x.results3(), (y, z) -> y + z.name())) //
3199                 .test() //
3200                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3201                 .assertNoErrors() //
3202                 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3203                         "FREDSARAHFRED", "SARAHFREDSARAH") //
3204                 .assertComplete();
3205     }
3206 
3207     @Test
3208     public void testCallableApiReturningTwoOutputThreeResultSetsTransacted()
3209             throws InterruptedException {
3210         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3211         db //
3212                 .call("call in0out2rs3(?, ?)") //
3213                 .transacted() //
3214                 .out(Type.INTEGER, Integer.class) //
3215                 .out(Type.INTEGER, Integer.class) //
3216                 .autoMap(Person2.class) //
3217                 .autoMap(Person2.class) //
3218                 .autoMap(Person2.class) //
3219                 .input(0, 10, 20) //
3220                 .flatMap(Tx.flattenToValuesOnly()) //
3221                 .doOnNext(x -> {
3222                     assertEquals(2, x.outs().size());
3223                     assertEquals(1, x.outs().get(0));
3224                     assertEquals(2, x.outs().get(1));
3225                 }) //
3226                 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3227                         .zipWith(x.results3(), (y, z) -> y + z.name())) //
3228                 .test() //
3229                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3230                 .assertNoErrors() //
3231                 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3232                         "FREDSARAHFRED", "SARAHFREDSARAH") //
3233                 .assertComplete();
3234     }
3235 
3236     @Test
3237     public void testCallableApiReturningTenOutParameters() {
3238         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3239         db //
3240                 .call("call out10(?,?,?,?,?,?,?,?,?,?)") //
3241                 .out(Type.INTEGER, Integer.class) //
3242                 .out(Type.INTEGER, Integer.class) //
3243                 .out(Type.INTEGER, Integer.class) //
3244                 .out(Type.INTEGER, Integer.class) //
3245                 .out(Type.INTEGER, Integer.class) //
3246                 .out(Type.INTEGER, Integer.class) //
3247                 .out(Type.INTEGER, Integer.class) //
3248                 .out(Type.INTEGER, Integer.class) //
3249                 .out(Type.INTEGER, Integer.class) //
3250                 .out(Type.INTEGER, Integer.class) //
3251                 .input(0, 10) //
3252                 .test() //
3253                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3254                 .assertNoErrors() //
3255                 .assertValueCount(2) //
3256                 .assertComplete();
3257     }
3258 
3259     @Test
3260     public void testCallableApiReturningTenOutParametersTransacted() {
3261         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3262         db //
3263                 .call("call out10(?,?,?,?,?,?,?,?,?,?)") //
3264                 .transacted() //
3265                 .out(Type.INTEGER, Integer.class) //
3266                 .out(Type.INTEGER, Integer.class) //
3267                 .out(Type.INTEGER, Integer.class) //
3268                 .out(Type.INTEGER, Integer.class) //
3269                 .out(Type.INTEGER, Integer.class) //
3270                 .out(Type.INTEGER, Integer.class) //
3271                 .out(Type.INTEGER, Integer.class) //
3272                 .out(Type.INTEGER, Integer.class) //
3273                 .out(Type.INTEGER, Integer.class) //
3274                 .out(Type.INTEGER, Integer.class) //
3275                 .input(0, 10) //
3276                 .flatMap(Tx.flattenToValuesOnly()) //
3277                 .test() //
3278                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3279                 .assertNoErrors() //
3280                 .assertValueCount(2) //
3281                 .assertComplete();
3282     }
3283 
3284     @Test
3285     public void testCallableApiReturningTenResultSets() {
3286         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3287         db //
3288                 .call("call rs10()") //
3289                 .autoMap(Person2.class) //
3290                 .autoMap(Person2.class) //
3291                 .autoMap(Person2.class) //
3292                 .autoMap(Person2.class) //
3293                 .autoMap(Person2.class) //
3294                 .autoMap(Person2.class) //
3295                 .autoMap(Person2.class) //
3296                 .autoMap(Person2.class) //
3297                 .autoMap(Person2.class) //
3298                 .autoMap(Person2.class) //
3299                 .input(0, 10) //
3300                 .doOnNext(x -> {
3301                     assertEquals(0, x.outs().size());
3302                     assertEquals(10, x.results().size());
3303                 }) //
3304                    // just zip the first and last result sets
3305                 .flatMap(x -> x.results(0) //
3306                         .zipWith(x.results(9), //
3307                                 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3308                 .test() //
3309                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3310                 .assertNoErrors() //
3311                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED") //
3312                 .assertComplete();
3313     }
3314 
3315     @Test
3316     public void testCallableApiReturningTenResultSetsTransacted() {
3317         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3318         db //
3319                 .call("call rs10()") //
3320                 .transacted() //
3321                 .autoMap(Person2.class) //
3322                 .autoMap(Person2.class) //
3323                 .autoMap(Person2.class) //
3324                 .autoMap(Person2.class) //
3325                 .autoMap(Person2.class) //
3326                 .autoMap(Person2.class) //
3327                 .autoMap(Person2.class) //
3328                 .autoMap(Person2.class) //
3329                 .autoMap(Person2.class) //
3330                 .autoMap(Person2.class) //
3331                 .input(0, 10) //
3332                 .flatMap(Tx.flattenToValuesOnly()) //
3333                 .doOnNext(x -> {
3334                     assertEquals(0, x.outs().size());
3335                     assertEquals(10, x.results().size());
3336                 }) //
3337                    // just zip the first and last result sets
3338                 .flatMap(x -> x.results(0) //
3339                         .zipWith(x.results(9), //
3340                                 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3341                 .test() //
3342                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3343                 .assertNoErrors() //
3344                 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED") //
3345                 .assertComplete();
3346     }
3347 
3348     @Test
3349     public void testCallableApiReturningOneResultSetGetAs() throws InterruptedException {
3350         Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3351         db //
3352                 .call("call in0out0rs1()") //
3353                 .getAs(String.class, Integer.class) //
3354                 .input(1) //
3355                 .flatMap(x -> x.results()) //
3356                 .test() //
3357                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3358                 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p._1()) && p._2() == 24) //
3359                 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p._1()) && p._2() == 26) //
3360                 .assertComplete();
3361     }
3362 
3363     @Test
3364     public void testH2InClauseWithoutSetArray() {
3365         db().apply(con -> {
3366             try (PreparedStatement ps = con
3367                     .prepareStatement("select count(*) from person where name in (?, ?)")) {
3368                 ps.setString(1, "FRED");
3369                 ps.setString(2, "JOSEPH");
3370                 ResultSet rs = ps.executeQuery();
3371                 rs.next();
3372                 return rs.getInt(1);
3373             }
3374         }).test() //
3375                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete() // ;
3376                 .assertValue(2); //
3377     }
3378 
3379     @Test
3380     @Ignore
3381     public void testH2InClauseWithSetArray() {
3382         db().apply(con -> {
3383             try (PreparedStatement ps = con
3384                     .prepareStatement("select count(*) from person where name in (?)")) {
3385                 ps.setArray(1, con.createArrayOf("VARCHAR", new String[] {"FRED", "JOSEPH"}));
3386                 ResultSet rs = ps.executeQuery();
3387                 rs.next();
3388                 return rs.getInt(1);
3389             }
3390         }).test() //
3391                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete() // ;
3392                 .assertValue(2); //
3393     }
3394 
3395     @Test
3396     public void testUpdateTxPerformed() {
3397         Database db = db(1);
3398         db.update("update person set score = 1000") //
3399                 .transacted() //
3400                 .counts() //
3401                 .test() //
3402                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3403                 .assertComplete() //
3404                 .assertValueCount(2); // value and complete
3405 
3406         db.select("select count(*) from person where score=1000") //
3407                 .getAs(Integer.class) //
3408                 .test() //
3409                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3410                 .assertComplete() //
3411                 .assertValue(3);
3412 
3413     }
3414 
3415     @Test
3416     public void testIssue20AutoCommitEnabledAndConnectionThrowsOnCommit() {
3417         ConnectionProvider cp = DatabaseCreator.connectionProvider();
3418         Database db = Database.fromBlocking(new ConnectionProvider() {
3419 
3420             @Override
3421             public Connection get() {
3422                 Connection c = cp.get();
3423                 try {
3424                     c.setAutoCommit(true);
3425                 } catch (SQLException e) {
3426                     throw new SQLRuntimeException(e);
3427                 }
3428                 return new DelegatedConnection() {
3429 
3430                     @Override
3431                     public Connection con() {
3432                         return c;
3433                     }
3434 
3435                     @Override
3436                     public void commit() throws SQLException {
3437                         log.debug("COMMITTING");
3438                         if (this.getAutoCommit()) {
3439                             throw new SQLException("cannot commit when autoCommit is true");
3440                         } else {
3441                             con().commit();
3442                         }
3443                     }
3444 
3445                 };
3446             }
3447 
3448             @Override
3449             public void close() {
3450                 // do nothing
3451             }
3452         });
3453         db.update("insert into note(text) values(?)") //
3454                 .parameters("HI", "THERE") //
3455                 .returnGeneratedKeys() //
3456                 .getAs(Integer.class)//
3457                 .test() //
3458                 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3459                 .assertValues(1, 2) //
3460                 .assertComplete();
3461     }
3462 
3463     @Test
3464     public void testMapInTransactionIssue35() {
3465         Database.test() //
3466                 .select(Person10.class) //
3467                 .transacted() //
3468                 .valuesOnly() //
3469                 .get() //
3470                 .map(p -> p.name()) //
3471                 .blockingForEach(log::debug);
3472     }
3473 
3474     private static final class Plugins {
3475 
3476         private static final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
3477 
3478         static void reset() {
3479             list.clear();
3480             RxJavaPlugins.setErrorHandler(e -> list.add(e));
3481         }
3482 
3483         static Throwable getSingleError() {
3484             assertEquals(1, list.size());
3485             RxJavaPlugins.reset();
3486             return list.get(0);
3487         }
3488     }
3489 
3490     @Test
3491     public void testIssue27ConnectionErrorReportedToRxJavaPlugins() {
3492         try (Database db = Database.nonBlocking()
3493                 .url("jdbc:driverdoesnotexist://doesnotexist:1527/notThere").build()) {
3494             Plugins.reset();
3495             db.select("select count(*) from person") //
3496                     .getAs(Long.class) //
3497                     .test() //
3498                     .awaitDone(TIMEOUT_SECONDS / 5, TimeUnit.SECONDS) //
3499                     .assertNoValues() //
3500                     .assertNotComplete() //
3501                     .assertNoErrors() //
3502                     .cancel();
3503             Throwable e = Plugins.getSingleError();
3504             assertTrue(e instanceof UndeliverableException);
3505             assertTrue(e.getMessage().toLowerCase().contains("no suitable driver"));
3506         }
3507     }
3508 
3509     @Test
3510     public void testAutoMapInTransactionIssue35() {
3511         try (Database db = Database.test()) {
3512             db.select(Person10.class) //
3513                     .transacted() //
3514                     .valuesOnly() //
3515                     .get(x -> x.name()) //
3516                     .sorted() //
3517                     .test() //
3518                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3519                     .assertValues("FRED", "JOSEPH", "MARMADUKE") //
3520                     .assertComplete();
3521         }
3522     }
3523 
3524     @Test
3525     @Ignore
3526     public void testAutoMapToEnum() {
3527         try (Database db = Database.test()) {
3528             db.select(PersonWithEnum.class) //
3529                     .get(x -> x.gender()) //
3530                     .test() //
3531                     .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3532                     .assertValues(Gender.MALE, Gender.FEMALE, Gender.MALE) //
3533                     .assertComplete();
3534         }
3535     }
3536     
3537     @Test
3538     public void testClosureOfTransactedSelectsIssue46() {
3539         try (Database db = db()) {
3540             for (int i = 0; i < 10; i++) {
3541                 db //
3542                         .select("select name, score from person") //
3543                         .transacted() //
3544                         .valuesOnly() //
3545                         .autoMap(Person2.class) //
3546                         .doOnNext(x -> log.debug(x.toString())) //
3547                         .test() //
3548                         .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
3549                         .assertValueCount(3) //
3550                         .assertComplete();
3551                 }
3552         }
3553     }
3554 
3555     public enum Gender {
3556         MALE("M"), FEMALE("F"), OTHER("O");
3557 
3558         private final String code;
3559 
3560         private Gender(String code) {
3561             this.code = code;
3562         }
3563 
3564         public static Gender fromCode(String code) {
3565             if (code == null) {
3566                 return null;
3567             }
3568             for (Gender g : Gender.values()) {
3569                 if (g.code.equals(code)) {
3570                     return g;
3571                 }
3572             }
3573             return OTHER;
3574         }
3575     }
3576 
3577     @Query("select name, gender from person order by name")
3578     public interface PersonWithEnum {
3579         @Column
3580         String name();
3581 
3582         @Column("gender")
3583         String genderCharacter();
3584 
3585         default Gender gender() {
3586             return Gender.fromCode(genderCharacter());
3587         }
3588     }
3589 
3590     public interface PersonWithDefaultMethod {
3591         @Column
3592         String name();
3593 
3594         @Column
3595         int score();
3596 
3597         public default String nameLower() {
3598             return name().toLowerCase();
3599         }
3600     }
3601 
3602     interface PersonWithDefaultMethodNonPublic {
3603         @Column
3604         String name();
3605 
3606         @Column
3607         int score();
3608 
3609         default String nameLower() {
3610             return name().toLowerCase();
3611         }
3612     }
3613 
3614     interface PersonDistinct1 {
3615         @Column
3616         String name();
3617 
3618         @Column
3619         int score();
3620 
3621     }
3622 
3623     interface PersonDistinct2 {
3624         @Column
3625         String name();
3626 
3627         @Column
3628         int score();
3629 
3630     }
3631 
3632     interface Score {
3633         @Column
3634         int score();
3635     }
3636 
3637     interface Person {
3638         @Column
3639         String name();
3640     }
3641 
3642     interface Person2 {
3643         @Column
3644         String name();
3645 
3646         @Column
3647         int score();
3648     }
3649 
3650     interface Person3 {
3651         @Column("name")
3652         String fullName();
3653 
3654         @Column("score")
3655         int examScore();
3656     }
3657 
3658     interface Person4 {
3659         @Column("namez")
3660         String fullName();
3661 
3662         @Column("score")
3663         int examScore();
3664     }
3665 
3666     interface Person5 {
3667         @Index(1)
3668         String fullName();
3669 
3670         @Index(2)
3671         int examScore();
3672     }
3673 
3674     interface Person6 {
3675         @Index(1)
3676         String fullName();
3677 
3678         @Index(3)
3679         int examScore();
3680     }
3681 
3682     interface Person7 {
3683         @Index(1)
3684         String fullName();
3685 
3686         @Index(0)
3687         int examScore();
3688     }
3689 
3690     interface Person8 {
3691         @Column
3692         int name();
3693     }
3694 
3695     interface Person9 {
3696         @Column
3697         String name();
3698 
3699         @Index(2)
3700         int score();
3701     }
3702 
3703     interface PersonNoAnnotation {
3704         String name();
3705     }
3706 
3707     @Query("select name, score from person order by name")
3708     interface Person10 {
3709 
3710         @Column
3711         String name();
3712 
3713         @Column
3714         int score();
3715     }
3716 
3717 }