1 package org.davidmoten.rx.jdbc;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertTrue;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.ByteArrayOutputStream;
10 import java.io.File;
11 import java.io.FileNotFoundException;
12 import java.io.FileReader;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.io.Reader;
16 import java.sql.Blob;
17 import java.sql.CallableStatement;
18 import java.sql.Clob;
19 import java.sql.Connection;
20 import java.sql.PreparedStatement;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.SQLSyntaxErrorException;
24 import java.sql.Statement;
25 import java.sql.Time;
26 import java.sql.Timestamp;
27 import java.sql.Types;
28 import java.time.Instant;
29 import java.time.ZoneOffset;
30 import java.time.ZonedDateTime;
31 import java.util.Arrays;
32 import java.util.Calendar;
33 import java.util.Date;
34 import java.util.GregorianCalendar;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Optional;
38 import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
46
47 import org.apache.log4j.Level;
48 import org.apache.log4j.LogManager;
49 import org.davidmoten.rx.jdbc.annotations.Column;
50 import org.davidmoten.rx.jdbc.annotations.Index;
51 import org.davidmoten.rx.jdbc.annotations.Query;
52 import org.davidmoten.rx.jdbc.exceptions.AnnotationsNotFoundException;
53 import org.davidmoten.rx.jdbc.exceptions.AutomappedInterfaceInaccessibleException;
54 import org.davidmoten.rx.jdbc.exceptions.CannotForkTransactedConnection;
55 import org.davidmoten.rx.jdbc.exceptions.ColumnIndexOutOfRangeException;
56 import org.davidmoten.rx.jdbc.exceptions.ColumnNotFoundException;
57 import org.davidmoten.rx.jdbc.exceptions.MoreColumnsRequestedThanExistException;
58 import org.davidmoten.rx.jdbc.exceptions.NamedParameterFoundButSqlDoesNotHaveNamesException;
59 import org.davidmoten.rx.jdbc.exceptions.NamedParameterMissingException;
60 import org.davidmoten.rx.jdbc.exceptions.QueryAnnotationMissingException;
61 import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
62 import org.davidmoten.rx.jdbc.internal.DelegatedConnection;
63 import org.davidmoten.rx.jdbc.pool.DatabaseCreator;
64 import org.davidmoten.rx.jdbc.pool.DatabaseType;
65 import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
66 import org.davidmoten.rx.jdbc.pool.Pools;
67 import org.davidmoten.rx.jdbc.tuple.Tuple2;
68 import org.davidmoten.rx.jdbc.tuple.Tuple3;
69 import org.davidmoten.rx.jdbc.tuple.Tuple4;
70 import org.davidmoten.rx.jdbc.tuple.Tuple5;
71 import org.davidmoten.rx.jdbc.tuple.Tuple6;
72 import org.davidmoten.rx.jdbc.tuple.Tuple7;
73 import org.davidmoten.rx.jdbc.tuple.TupleN;
74 import org.davidmoten.rx.pool.PoolClosedException;
75 import org.h2.jdbc.JdbcSQLException;
76 import org.hsqldb.jdbc.JDBCBlobFile;
77 import org.hsqldb.jdbc.JDBCClobFile;
78 import org.junit.Assert;
79 import org.junit.FixMethodOrder;
80 import org.junit.Ignore;
81 import org.junit.Test;
82 import org.junit.runners.MethodSorters;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import com.github.davidmoten.guavamini.Lists;
87 import com.github.davidmoten.guavamini.Sets;
88
89 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
90 import io.reactivex.Completable;
91 import io.reactivex.Flowable;
92 import io.reactivex.Observable;
93 import io.reactivex.Scheduler;
94 import io.reactivex.Single;
95 import io.reactivex.exceptions.UndeliverableException;
96 import io.reactivex.functions.Predicate;
97 import io.reactivex.plugins.RxJavaPlugins;
98 import io.reactivex.schedulers.Schedulers;
99 import io.reactivex.schedulers.TestScheduler;
100 import io.reactivex.subscribers.TestSubscriber;
101
102 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
103 public class DatabaseTest {
104
105 private static final long FRED_REGISTERED_TIME = 1442515672690L;
106 private static final int NAMES_COUNT_BIG = 5163;
107 private static final Logger log = LoggerFactory.getLogger(DatabaseTest.class);
108 private static final int TIMEOUT_SECONDS = 10;
109
110 private static Database db() {
111 return DatabaseCreator.create(1);
112 }
113
114 private static Database blocking() {
115 return DatabaseCreator.createBlocking();
116 }
117
118 private static Database db(int poolSize) {
119 return DatabaseCreator.create(poolSize);
120 }
121
122 private static Database big(int poolSize) {
123 return DatabaseCreator.create(poolSize, true, Schedulers.computation());
124 }
125
126 @Test
127 public void testSelectUsingQuestionMark() {
128 try (Database db = db()) {
129 db.select("select score from person where name=?")
130 .parameters("FRED", "JOSEPH")
131 .getAs(Integer.class)
132 .test()
133 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
134 .assertNoErrors()
135 .assertValues(21, 34)
136 .assertComplete();
137 }
138 }
139
140 @Test
141 public void testDemonstrateDbClosureOnTerminate() {
142 Database db = db();
143 db.select("select score from person where name=?")
144 .parameters("FRED", "JOSEPH")
145 .getAs(Integer.class)
146 .doOnTerminate(() -> db.close())
147 .test()
148 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
149 .assertNoErrors()
150 .assertValues(21, 34)
151 .assertComplete();
152 }
153
154 @Test
155 public void testFromDataSource() {
156 ConnectionProvider cp = DatabaseCreator.connectionProvider();
157 AtomicInteger count = new AtomicInteger();
158 Set<Connection> connections = new HashSet<>();
159 AtomicInteger closed = new AtomicInteger();
160 Database db = Database.fromBlocking(new ConnectionProvider() {
161
162 @Override
163 public Connection get() {
164 count.incrementAndGet();
165 Connection c = cp.get();
166 connections.add(c);
167 return new DelegatedConnection() {
168
169 @Override
170 public Connection con() {
171 return c;
172 }
173
174 @Override
175 public void close() {
176 closed.incrementAndGet();
177 }
178 };
179 }
180
181 @Override
182 public void close() {
183
184 }
185 });
186 db.select("select count(*) from person")
187 .getAs(Integer.class)
188 .blockingSubscribe();
189 db.select("select count(*) from person")
190 .getAs(Integer.class)
191 .blockingSubscribe();
192 assertEquals(2, count.get());
193 assertEquals(2, connections.size());
194 assertEquals(2, closed.get());
195 }
196
197 @Test
198 public void testBlockingForEachWhenError() {
199 try {
200 Flowable
201 .error(new RuntimeException("boo"))
202 .blockingForEach(System.out::println);
203 } catch (RuntimeException e) {
204 assertEquals("boo", e.getMessage());
205 }
206 }
207
208 @Test
209 public void testSelectUsingQuestionMarkAndInClauseIssue10() {
210 Database.test()
211 .select("select score from person where name in (?) order by score")
212 .parameters("FRED", "JOSEPH")
213 .getAs(Integer.class)
214 .test()
215 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
216 .assertNoErrors()
217 .assertValues(21, 34)
218 .assertComplete();
219 }
220
221 @Test
222 public void testSelectUsingQuestionMarkAndInClauseWithSetParameter() {
223 Database.test()
224 .select("select score from person where name in (?) order by score")
225 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
226 .getAs(Integer.class)
227 .test()
228 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
229 .assertNoErrors()
230 .assertValues(21, 34)
231 .assertComplete();
232 }
233
234 @Test
235 public void testUpdateWithInClauseBatchSize0() {
236 Database.test()
237 .update("update person set score=50 where name in (?)")
238 .batchSize(0)
239 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
240 .counts()
241 .test()
242 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS)
243 .assertComplete()
244 .assertValues(2);
245 }
246
247 @Test
248 public void testUpdateWithInClauseBatchSize10() {
249 Database.test()
250 .update("update person set score=50 where name in (?)")
251 .batchSize(10)
252 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
253 .counts()
254 .test()
255 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS)
256 .assertComplete()
257 .assertValues(2);
258 }
259
260 @Test
261 public void testSelectUsingQuestionMarkAndInClauseWithSetParameterUsingParametersMethod() {
262 Database.test()
263 .select("select score from person where name in (?) order by score")
264 .parameters(Sets.newHashSet("FRED", "JOSEPH"))
265 .getAs(Integer.class)
266 .test()
267 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
268 .assertNoErrors()
269 .assertValues(21, 34)
270 .assertComplete();
271 }
272
273 @Test
274 public void testSelectUsingInClauseWithListParameter() {
275 Database.test()
276 .select("select score from person where score > ? and name in (?) order by score")
277 .parameters(0, Lists.newArrayList("FRED", "JOSEPH"))
278 .getAs(Integer.class)
279 .test()
280 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
281 .assertNoErrors()
282 .assertValues(21, 34)
283 .assertComplete();
284 }
285
286 @Test
287 public void testSelectUsingInClauseWithNamedListParameter() {
288 Database.test()
289 .select("select score from person where score > :score and name in (:names) order by score")
290 .parameter("score", 0)
291 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
292 .getAs(Integer.class)
293 .test()
294 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
295 .assertNoErrors()
296 .assertValues(21, 34)
297 .assertComplete();
298 }
299
300 @Test
301 public void testSelectUsingInClauseWithRepeatedNamedListParameter() {
302 Database.test()
303 .select("select score from person where name in (:names) and name in (:names) order by score")
304 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
305 .getAs(Integer.class)
306 .test()
307 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
308 .assertNoErrors()
309 .assertValues(21, 34)
310 .assertComplete();
311 }
312
313 @Test
314 public void testSelectUsingInClauseWithRepeatedNamedListParameterAndRepeatedNonListParameter() {
315 Database.test()
316 .select("select score from person where name in (:names) and score >:score and name in (:names) and score >:score order by score")
317 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
318 .parameter("score", 0)
319 .getAs(Integer.class)
320 .test()
321 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
322 .assertNoErrors()
323 .assertValues(21, 34)
324 .assertComplete();
325 }
326
327 @Test
328 public void testSelectUsingNamedParameterList() {
329 try (Database db = db()) {
330 db.select("select score from person where name=:name")
331 .parameters(Parameter.named("name", "FRED").value("JOSEPH").list())
332 .getAs(Integer.class)
333 .test()
334 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
335 .assertNoErrors()
336 .assertValues(21, 34)
337 .assertComplete();
338 }
339 }
340
341 @Test
342 public void testSelectUsingQuestionMarkFlowableParameters() {
343 try (Database db = db()) {
344 db.select("select score from person where name=?")
345 .parameterStream(Flowable.just("FRED", "JOSEPH"))
346 .getAs(Integer.class)
347 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
348 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
349 .assertNoErrors()
350 .assertValues(21, 34)
351 .assertComplete();
352 }
353 }
354
355 @Test
356 public void testSelectUsingQuestionMarkFlowableParametersInLists() {
357 try (Database db = db()) {
358 db.select("select score from person where name=?")
359 .parameterListStream(
360 Flowable.just(Arrays.asList("FRED"), Arrays.asList("JOSEPH")))
361 .getAs(Integer.class)
362 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
363 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
364 .assertNoErrors()
365 .assertValues(21, 34)
366 .assertComplete();
367 }
368 }
369
370 @Test
371 public void testDrivingSelectWithoutParametersUsingParameterStream() {
372 try (Database db = db()) {
373 db.select("select count(*) from person")
374 .parameters(1, 2, 3)
375 .getAs(Integer.class)
376 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
377 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
378 .assertValues(3, 3, 3)
379 .assertComplete();
380 }
381 }
382
383 @Test
384 public void testSelectUsingQuestionMarkFlowableParametersTwoParametersPerQuery() {
385 try (Database db = db()) {
386 db.select("select score from person where name=? and score = ?")
387 .parameterStream(Flowable.just("FRED", 21, "JOSEPH", 34))
388 .getAs(Integer.class)
389 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
390 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
391 .assertNoErrors()
392 .assertValues(21, 34)
393 .assertComplete();
394 }
395 }
396
397 @Test
398 public void testSelectUsingQuestionMarkFlowableParameterListsTwoParametersPerQuery() {
399 try (Database db = db()) {
400 db.select("select score from person where name=? and score = ?")
401 .parameterListStream(
402 Flowable.just(Arrays.asList("FRED", 21), Arrays.asList("JOSEPH", 34)))
403 .getAs(Integer.class)
404 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
405 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
406 .assertNoErrors()
407 .assertValues(21, 34)
408 .assertComplete();
409 }
410 }
411
412 @Test
413 public void testSelectUsingQuestionMarkWithPublicTestingDatabase() {
414 try (Database db = Database.test()) {
415 db
416 .select("select score from person where name=?")
417 .parameters("FRED", "JOSEPH")
418 .getAs(Integer.class)
419 .test()
420 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
421 .assertNoErrors()
422 .assertValues(21, 34)
423 .assertComplete();
424 }
425 }
426
427 @Test
428 public void testSelectWithFetchSize() {
429 try (Database db = db()) {
430 db.select("select score from person order by name")
431 .fetchSize(2)
432 .getAs(Integer.class)
433 .test()
434 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
435 .assertNoErrors()
436 .assertValues(21, 34, 25)
437 .assertComplete();
438 }
439 }
440
441 @Test
442 public void testSelectWithFetchSizeZero() {
443 try (Database db = db()) {
444 db.select("select score from person order by name")
445 .fetchSize(0)
446 .getAs(Integer.class)
447 .test()
448 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
449 .assertNoErrors()
450 .assertValues(21, 34, 25)
451 .assertComplete();
452 }
453 }
454
455 @Test(expected = IllegalArgumentException.class)
456 public void testSelectWithFetchSizeNegative() {
457 try (Database db = db()) {
458 db.select("select score from person order by name")
459 .fetchSize(-1);
460 }
461 }
462
463 @Test
464 public void testSelectUsingNonBlockingBuilder() {
465 NonBlockingConnectionPool pool = Pools
466 .nonBlocking()
467 .connectionProvider(DatabaseCreator.connectionProvider())
468 .maxIdleTime(1, TimeUnit.MINUTES)
469 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
470 .connectionRetryInterval(1, TimeUnit.SECONDS)
471 .healthCheck(c -> c.prepareStatement("select 1").execute())
472 .maxPoolSize(3)
473 .build();
474
475 try (Database db = Database.from(pool)) {
476 db.select("select score from person where name=?")
477 .parameters("FRED", "JOSEPH")
478 .getAs(Integer.class)
479 .test()
480 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
481 .assertNoErrors()
482 .assertValues(21, 34)
483 .assertComplete();
484 }
485 }
486
487 @Test
488 public void testSelectSpecifyingHealthCheck() {
489 try (Database db = Database
490 .nonBlocking()
491 .connectionProvider(DatabaseCreator.connectionProvider())
492 .maxIdleTime(1, TimeUnit.MINUTES)
493 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
494 .connectionRetryInterval(1, TimeUnit.SECONDS)
495 .healthCheck(DatabaseType.H2)
496 .maxPoolSize(3)
497 .build()) {
498 db.select("select score from person where name=?")
499 .parameters("FRED", "JOSEPH")
500 .getAs(Integer.class)
501 .test()
502 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
503 .assertNoErrors()
504 .assertValues(21, 34)
505 .assertComplete();
506 }
507 }
508
509 @Test(expected = IllegalArgumentException.class)
510 public void testCreateTestDatabaseWithZeroSizePoolThrows() {
511 Database.test(0);
512 }
513
514 @Test
515 public void testSelectSpecifyingHealthCheckAsSql() {
516 final AtomicInteger success = new AtomicInteger();
517 NonBlockingConnectionPool pool = Pools
518 .nonBlocking()
519 .connectionProvider(DatabaseCreator.connectionProvider())
520 .maxIdleTime(1, TimeUnit.MINUTES)
521 .healthCheck(DatabaseType.H2)
522 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
523 .connectionRetryInterval(1, TimeUnit.SECONDS)
524 .healthCheck("select 1")
525 .connectionListener(error -> {
526 if (error.isPresent()) {
527 success.set(Integer.MIN_VALUE);
528 } else {
529 success.incrementAndGet();
530 }
531 })
532 .maxPoolSize(3)
533 .build();
534
535 try (Database db = Database.from(pool)) {
536 db.select("select score from person where name=?")
537 .parameters("FRED", "JOSEPH")
538 .getAs(Integer.class)
539 .test()
540 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
541 .assertNoErrors()
542 .assertValues(21, 34)
543 .assertComplete();
544 }
545 assertEquals(1, success.get());
546 }
547
548 @Test(expected = IllegalArgumentException.class)
549 public void testNonBlockingPoolWithTrampolineSchedulerThrows() {
550 Pools.nonBlocking().scheduler(Schedulers.trampoline());
551 }
552
553 @Test(timeout = 40000)
554 public void testSelectUsingNonBlockingBuilderConcurrencyTest()
555 throws InterruptedException, TimeoutException {
556 info();
557 try {
558 try (Database db = db(3)) {
559 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(50));
560 int n = 10000;
561 CountDownLatch latch = new CountDownLatch(n);
562 AtomicInteger count = new AtomicInteger();
563 for (int i = 0; i < n; i++) {
564 db.select("select score from person where name=?")
565 .parameters("FRED", "JOSEPH")
566 .getAs(Integer.class)
567 .subscribeOn(scheduler)
568 .toList()
569 .doOnSuccess(x -> {
570 if (!x.equals(Lists.newArrayList(21, 34))) {
571 throw new RuntimeException("run broken");
572 } else {
573
574 }
575 })
576 .doOnSuccess(x -> {
577 count.incrementAndGet();
578 latch.countDown();
579 })
580 .doOnError(x -> latch.countDown())
581 .subscribe();
582 }
583 if (!latch.await(20, TimeUnit.SECONDS)) {
584 throw new TimeoutException("timeout");
585 }
586 assertEquals(n, count.get());
587 }
588 } finally {
589 debug();
590 }
591 }
592
593 @Test(timeout = 5000)
594 public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
595 debug();
596 try {
597 try (Database db = db(1)) {
598 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
599 int n = 2;
600 CountDownLatch latch = new CountDownLatch(n);
601 AtomicInteger count = new AtomicInteger();
602 for (int i = 0; i < n; i++) {
603 db.select("select score from person where name=?")
604 .parameters("FRED", "JOSEPH")
605 .getAs(Integer.class)
606 .subscribeOn(scheduler)
607 .toList()
608 .doOnSuccess(x -> {
609 if (!x.equals(Lists.newArrayList(21, 34))) {
610 throw new RuntimeException("run broken");
611 }
612 })
613 .doOnSuccess(x -> {
614 count.incrementAndGet();
615 latch.countDown();
616 })
617 .doOnError(x -> latch.countDown())
618 .subscribe();
619 log.info("submitted " + i);
620 }
621 if (!latch.await(5000, TimeUnit.SECONDS)) {
622 throw new TimeoutException("timeout");
623 }
624 assertEquals(n, count.get());
625 }
626 } finally {
627 debug();
628 }
629 }
630
631 @Test
632 public void testDatabaseClose() {
633 try (Database db = db()) {
634 db.select("select score from person where name=?")
635 .parameters("FRED", "JOSEPH")
636 .getAs(Integer.class)
637 .test()
638 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
639 .assertNoErrors()
640 .assertValues(21, 34)
641 .assertComplete();
642 }
643 }
644
645 @Test
646 public void testSelectUsingName() {
647 try (Database db = db()) {
648 db
649 .select("select score from person where name=:name")
650 .parameter("name", "FRED")
651 .parameter("name", "JOSEPH")
652 .getAs(Integer.class)
653 .test()
654 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
655 .assertValues(21, 34)
656 .assertComplete();
657 }
658 }
659
660 @Test
661 public void testSelectUsingNameNotGiven() {
662 try (Database db = db()) {
663 db
664 .select("select score from person where name=:name and name<>:name2")
665 .parameter("name", "FRED")
666 .parameter("name", "JOSEPH")
667 .getAs(Integer.class)
668 .test()
669 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
670 .assertError(NamedParameterMissingException.class)
671 .assertNoValues();
672 }
673 }
674
675 @Test
676 public void testSelectUsingParameterNameNullNameWhenSqlHasNoNames() {
677 db()
678 .select("select score from person where name=?")
679 .parameter(Parameter.create("name", "FRED"))
680 .getAs(Integer.class)
681 .test()
682 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
683 .assertError(NamedParameterFoundButSqlDoesNotHaveNamesException.class)
684 .assertNoValues();
685 }
686
687 @Test
688 public void testUpdateWithNullNamedParameter() {
689 try (Database db = db()) {
690 db
691 .update("update person set date_of_birth = :dob")
692 .parameter(Parameter.create("dob", null))
693 .counts()
694 .test()
695 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
696 .assertValue(3)
697 .assertComplete();
698 }
699 }
700
701 @Test
702 public void testUpdateWithNullParameter() {
703 try (Database db = db()) {
704 db
705 .update("update person set date_of_birth = ?")
706 .parameter(null)
707 .counts()
708 .test()
709 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
710 .assertValue(3)
711 .assertComplete();
712 }
713 }
714
715 @Test
716 public void testUpdateWithNullStreamParameter() {
717 try (Database db = db()) {
718 db
719 .update("update person set date_of_birth = ?")
720 .parameterStream(Flowable.just(Parameter.NULL))
721 .counts()
722 .test()
723 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
724 .assertValue(3)
725 .assertComplete();
726 }
727 }
728
729 @Test
730 public void testUpdateWithTestDatabaseForReadme() {
731 try (Database db = db()) {
732 db
733 .update("update person set date_of_birth = ?")
734 .parameterStream(Flowable.just(Parameter.NULL))
735 .counts()
736 .test()
737 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
738 .assertValue(3)
739 .assertComplete();
740 }
741 }
742
743 @Test
744 public void testUpdateClobWithNull() {
745 try (Database db = db()) {
746 insertNullClob(db);
747 db
748 .update("update person_clob set document = :doc")
749 .parameter("doc", Database.NULL_CLOB)
750 .counts()
751 .test()
752 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
753 .assertValue(1)
754 .assertComplete();
755 }
756 }
757
758 @Test
759 public void testUpdateClobWithClob() throws SQLException {
760 try (Database db = db()) {
761 Clob clob = new JDBCClobFile(new File("src/test/resources/big.txt"));
762 insertNullClob(db);
763 db
764 .update("update person_clob set document = :doc")
765 .parameter("doc", clob)
766 .counts()
767 .test()
768 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
769 .assertValue(1)
770 .assertComplete();
771 }
772 }
773
774 @Test
775 public void testUpdateClobWithReader() throws FileNotFoundException {
776 try (Database db = db()) {
777 Reader reader = new FileReader(new File("src/test/resources/big.txt"));
778 insertNullClob(db);
779 db
780 .update("update person_clob set document = :doc")
781 .parameter("doc", reader)
782 .counts()
783 .test()
784 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
785 .assertValue(1)
786 .assertComplete();
787 }
788 }
789
790 @Test
791 public void testUpdateBlobWithBlob() throws SQLException {
792 try (Database db = db()) {
793 Blob blob = new JDBCBlobFile(new File("src/test/resources/big.txt"));
794 insertPersonBlob(db);
795 db
796 .update("update person_blob set document = :doc")
797 .parameter("doc", blob)
798 .counts()
799 .test()
800 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
801 .assertValue(1)
802 .assertComplete();
803 }
804 }
805
806 @Test
807 public void testUpdateBlobWithNull() {
808 try (Database db = db()) {
809 insertPersonBlob(db);
810 db
811 .update("update person_blob set document = :doc")
812 .parameter("doc", Database.NULL_BLOB)
813 .counts()
814 .test()
815 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
816 .assertValue(1)
817 .assertComplete();
818 }
819 }
820
821 @Test(expected = NullPointerException.class)
822 public void testSelectUsingNullNameInParameter() {
823 try (Database db = db()) {
824 db
825 .select("select score from person where name=:name")
826 .parameter(null, "FRED");
827 }
828 }
829
830 @Test(expected = IllegalArgumentException.class)
831 public void testSelectUsingNameDoesNotExist() {
832 try (Database db = db()) {
833 db
834 .select("select score from person where name=:name")
835 .parameters("nam", "FRED");
836 }
837 }
838
839 @Test(expected = IllegalArgumentException.class)
840 public void testSelectUsingNameWithoutSpecifyingNameThrowsImmediately() {
841 try (Database db = db()) {
842 db
843 .select("select score from person where name=:name")
844 .parameters("FRED", "JOSEPH");
845 }
846 }
847
848 @Test
849 public void testSelectTransacted() {
850 try (Database db = db()) {
851 db
852 .select("select score from person where name=?")
853 .parameters("FRED", "JOSEPH")
854 .transacted()
855 .getAs(Integer.class)
856 .doOnNext(tx -> log
857 .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))
858 .test()
859 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
860 .assertValueCount(3)
861 .assertComplete();
862 }
863 }
864
865 @Test
866 public void testSelectAutomappedAnnotatedTransacted() {
867 try (Database db = db()) {
868 db
869 .select(Person10.class)
870 .transacted()
871 .valuesOnly()
872 .get().test()
873 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
874 .assertValueCount(3)
875 .assertComplete();
876 }
877 }
878
879 @Test
880 public void testSelectAutomappedTransactedValuesOnly() {
881 try (Database db = db()) {
882 db
883 .select("select name, score from person")
884 .transacted()
885 .valuesOnly()
886 .autoMap(Person2.class)
887 .test()
888 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
889 .assertValueCount(3)
890 .assertComplete();
891 }
892 }
893
894 @Test
895 public void testSelectAutomappedTransacted() {
896 try (Database db = db()) {
897 db
898 .select("select name, score from person")
899 .transacted()
900 .autoMap(Person2.class)
901 .test()
902 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
903 .assertValueCount(4)
904 .assertComplete();
905 }
906 }
907
908 @Test
909 public void testSelectTransactedTuple2() {
910 try (Database db = db()) {
911 Tx<Tuple2<String, Integer>> t = db
912 .select("select name, score from person where name=?")
913 .parameters("FRED")
914 .transacted()
915 .getAs(String.class, Integer.class)
916 .test()
917 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
918 .assertValueCount(2)
919 .assertComplete()
920 .values().get(0);
921 assertEquals("FRED", t.value()._1());
922 assertEquals(21, (int) t.value()._2());
923 }
924 }
925
926 @Test
927 public void testSelectTransactedTuple3() {
928 try (Database db = db()) {
929 Tx<Tuple3<String, Integer, String>> t = db
930 .select("select name, score, name from person where name=?")
931 .parameters("FRED")
932 .transacted()
933 .getAs(String.class, Integer.class, String.class)
934 .test()
935 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
936 .assertValueCount(2)
937 .assertComplete()
938 .values().get(0);
939 assertEquals("FRED", t.value()._1());
940 assertEquals(21, (int) t.value()._2());
941 assertEquals("FRED", t.value()._3());
942 }
943 }
944
945 @Test
946 public void testSelectTransactedTuple4() {
947 try (Database db = db()) {
948 Tx<Tuple4<String, Integer, String, Integer>> t = db
949 .select("select name, score, name, score from person where name=?")
950 .parameters("FRED")
951 .transacted()
952 .getAs(String.class, Integer.class, String.class, Integer.class)
953 .test()
954 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
955 .assertValueCount(2)
956 .assertComplete()
957 .values().get(0);
958 assertEquals("FRED", t.value()._1());
959 assertEquals(21, (int) t.value()._2());
960 assertEquals("FRED", t.value()._3());
961 assertEquals(21, (int) t.value()._4());
962 }
963 }
964
965 @Test
966 public void testSelectTransactedTuple5() {
967 try (Database db = db()) {
968 Tx<Tuple5<String, Integer, String, Integer, String>> t = db
969 .select("select name, score, name, score, name from person where name=?")
970 .parameters("FRED")
971 .transacted()
972 .getAs(String.class, Integer.class, String.class, Integer.class, String.class)
973 .test()
974 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
975 .assertValueCount(2)
976 .assertComplete()
977 .values().get(0);
978 assertEquals("FRED", t.value()._1());
979 assertEquals(21, (int) t.value()._2());
980 assertEquals("FRED", t.value()._3());
981 assertEquals(21, (int) t.value()._4());
982 assertEquals("FRED", t.value()._5());
983 }
984 }
985
986 @Test
987 public void testSelectTransactedTuple6() {
988 try (Database db = db()) {
989 Tx<Tuple6<String, Integer, String, Integer, String, Integer>> t = db
990 .select("select name, score, name, score, name, score from person where name=?")
991 .parameters("FRED")
992 .transacted()
993 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
994 Integer.class)
995 .test()
996 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
997 .assertValueCount(2)
998 .assertComplete()
999 .values().get(0);
1000 assertEquals("FRED", t.value()._1());
1001 assertEquals(21, (int) t.value()._2());
1002 assertEquals("FRED", t.value()._3());
1003 assertEquals(21, (int) t.value()._4());
1004 assertEquals("FRED", t.value()._5());
1005 assertEquals(21, (int) t.value()._6());
1006 }
1007 }
1008
1009 @Test
1010 public void testSelectTransactedTuple7() {
1011 try (Database db = db()) {
1012 Tx<Tuple7<String, Integer, String, Integer, String, Integer, String>> t = db
1013 .select("select name, score, name, score, name, score, name from person where name=?")
1014 .parameters("FRED")
1015 .transacted()
1016 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1017 Integer.class, String.class)
1018 .test()
1019 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1020 .assertValueCount(2)
1021 .assertComplete()
1022 .values().get(0);
1023 assertEquals("FRED", t.value()._1());
1024 assertEquals(21, (int) t.value()._2());
1025 assertEquals("FRED", t.value()._3());
1026 assertEquals(21, (int) t.value()._4());
1027 assertEquals("FRED", t.value()._5());
1028 assertEquals(21, (int) t.value()._6());
1029 assertEquals("FRED", t.value()._7());
1030 }
1031 }
1032
1033 @Test
1034 public void testSelectTransactedTupleN() {
1035 try (Database db = db()) {
1036 List<Tx<TupleN<Object>>> list = db
1037 .select("select name, score from person where name=?")
1038 .parameters("FRED")
1039 .transacted()
1040 .getTupleN()
1041 .test()
1042 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1043 .assertValueCount(2)
1044 .assertComplete()
1045 .values();
1046 assertEquals("FRED", list.get(0).value().values().get(0));
1047 assertEquals(21, (int) list.get(0).value().values().get(1));
1048 assertTrue(list.get(1).isComplete());
1049 assertEquals(2, list.size());
1050 }
1051 }
1052
1053 @Test
1054 public void testSelectTransactedCount() {
1055 try (Database db = db()) {
1056 db
1057 .select("select name, score, name, score, name, score, name from person where name=?")
1058 .parameters("FRED")
1059 .transacted()
1060 .count()
1061 .test()
1062 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1063 .assertValueCount(1)
1064 .assertComplete();
1065 }
1066 }
1067
1068 @Test
1069 public void testSelectTransactedGetAs() {
1070 try (Database db = db()) {
1071 db
1072 .select("select name from person")
1073 .transacted()
1074 .getAs(String.class)
1075 .test()
1076 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1077 .assertValueCount(4)
1078 .assertComplete();
1079 }
1080 }
1081
1082 @Test
1083 public void testSelectTransactedGetAsOptional() {
1084 try (Database db = db()) {
1085 List<Tx<Optional<String>>> list = db
1086 .select("select name from person where name=?")
1087 .parameters("FRED")
1088 .transacted()
1089 .getAsOptional(String.class)
1090 .test()
1091 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1092 .assertValueCount(2)
1093 .assertComplete()
1094 .values();
1095 assertTrue(list.get(0).isValue());
1096 assertEquals("FRED", list.get(0).value().get());
1097 assertTrue(list.get(1).isComplete());
1098 }
1099 }
1100
1101 @Test
1102 public void testDatabaseFrom() {
1103 Database.from(DatabaseCreator.nextUrl(), 3)
1104 .select("select name from person")
1105 .getAs(String.class)
1106 .count()
1107 .test()
1108 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1109 .assertError(JdbcSQLException.class);
1110 }
1111
1112 @Test
1113 public void testSelectTransactedChained() throws Exception {
1114 try (Database db = db()) {
1115 db
1116 .select("select score from person where name=?")
1117 .parameters("FRED", "JOSEPH")
1118 .transacted()
1119 .transactedValuesOnly()
1120 .getAs(Integer.class)
1121 .doOnNext(tx -> log
1122 .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))
1123 .flatMap(tx -> tx
1124 .select("select name from person where score = ?")
1125 .parameter(tx.value())
1126 .valuesOnly()
1127 .getAs(String.class))
1128 .test()
1129 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1130 .assertNoErrors()
1131 .assertValues("FRED", "JOSEPH")
1132 .assertComplete();
1133 }
1134 }
1135
1136 @Test
1137 public void databaseIsAutoCloseable() {
1138 try (Database db = db()) {
1139 log.debug(db.toString());
1140 }
1141 }
1142
/**
 * Writes the given object to the test logger at debug level.
 *
 * @param o the object to log
 */
private static void println(Object o) {
    log.debug("{}", o);
}
1146
1147 @Test
1148 public void testSelectChained() {
1149 try (Database db = db(1)) {
1150
1151 db.select("select score from person where name=?")
1152 .parameters("FRED", "JOSEPH")
1153 .getAs(Integer.class)
1154 .doOnNext(DatabaseTest::println)
1155 .concatMap(score -> {
1156 log.info("score={}", score);
1157 return db
1158 .select("select name from person where score = ?")
1159 .parameter(score)
1160 .getAs(String.class)
1161 .doOnComplete(
1162 () -> log.info("completed select where score=" + score));
1163 })
1164 .test()
1165 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1166 .assertNoErrors()
1167 .assertValues("FRED", "JOSEPH")
1168 .assertComplete();
1169 }
1170 }
1171
1172 @Test
1173 @SuppressFBWarnings
1174 public void testReadMeFragment1() {
1175 try (Database db = db()) {
1176 db.select("select name from person")
1177 .getAs(String.class)
1178 .forEach(DatabaseTest::println);
1179 }
1180 }
1181
1182 @Test
1183 public void testReadMeFragmentColumnDoesNotExistEmitsSqlSyntaxErrorException() {
1184 try (Database db = Database.test()) {
1185 db.select("select nam from person")
1186 .getAs(String.class)
1187 .test()
1188 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1189 .assertNoValues()
1190 .assertError(SQLSyntaxErrorException.class);
1191 }
1192 }
1193
1194 @Test
1195 public void testReadMeFragmentDerbyHealthCheck() {
1196 try (Database db = Database.test()) {
1197 db.select("select 'a' from sysibm.sysdummy1")
1198 .getAs(String.class)
1199 .test()
1200 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1201 .assertValue("a")
1202 .assertComplete();
1203 }
1204 }
1205
1206 @Test
1207 @SuppressFBWarnings
1208 public void testTupleSupport() {
1209 try (Database db = db()) {
1210 db.select("select name, score from person")
1211 .getAs(String.class, Integer.class)
1212 .forEach(DatabaseTest::println);
1213 }
1214 }
1215
1216 @Test
1217 public void testDelayedCallsAreNonBlocking() throws InterruptedException {
1218 List<String> list = new CopyOnWriteArrayList<String>();
1219 try (Database db = db(1)) {
1220 db.select("select score from person where name=?")
1221 .parameter("FRED")
1222 .getAs(Integer.class)
1223 .doOnNext(x -> Thread.sleep(1000))
1224 .subscribeOn(Schedulers.io())
1225 .subscribe();
1226 Thread.sleep(100);
1227 CountDownLatch latch = new CountDownLatch(1);
1228 db.select("select score from person where name=?")
1229 .parameter("FRED")
1230 .getAs(Integer.class)
1231 .doOnNext(x -> list.add("emitted"))
1232 .doOnNext(x -> log.debug("emitted on " + Thread.currentThread().getName()))
1233 .doOnNext(x -> latch.countDown())
1234 .subscribe();
1235 list.add("subscribed");
1236 assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1237 assertEquals(Arrays.asList("subscribed", "emitted"), list);
1238 }
1239 }
1240
1241 @Test
1242 public void testAutoMapToInterface() {
1243 try (Database db = db()) {
1244 db
1245 .select("select name from person")
1246 .autoMap(Person.class)
1247 .map(p -> p.name())
1248 .test()
1249 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1250 .assertValueCount(3)
1251 .assertComplete();
1252 }
1253 }
1254
1255 @Test
1256 public void testAutoMapToInterfaceWithoutAnnotationstsError() {
1257 try (Database db = db()) {
1258 db
1259 .select("select name from person")
1260 .autoMap(PersonNoAnnotation.class)
1261 .map(p -> p.name())
1262 .test()
1263 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1264 .assertNoValues()
1265 .assertError(AnnotationsNotFoundException.class);
1266 }
1267 }
1268
1269 @Test
1270 public void testAutoMapToInterfaceWithTwoMethods() {
1271 try (Database db = db()) {
1272 db
1273 .select("select name, score from person order by name")
1274 .autoMap(Person2.class)
1275 .firstOrError()
1276 .map(Person2::score)
1277 .test()
1278 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1279 .assertValue(21)
1280 .assertComplete();
1281 }
1282 }
1283
1284 @Test
1285 public void testAutoMapToInterfaceWithExplicitColumnName() {
1286 try (Database db = db()) {
1287 db
1288 .select("select name, score from person order by name")
1289 .autoMap(Person3.class)
1290 .firstOrError()
1291 .map(Person3::examScore)
1292 .test()
1293 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1294 .assertValue(21)
1295 .assertComplete();
1296 }
1297 }
1298
1299 @Test
1300 public void testAutoMapToInterfaceWithExplicitColumnNameThatDoesNotExist() {
1301 try (Database db = db()) {
1302 db
1303 .select("select name, score from person order by name")
1304 .autoMap(Person4.class)
1305 .firstOrError()
1306 .map(Person4::examScore)
1307 .test()
1308 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1309 .assertNoValues()
1310 .assertError(ColumnNotFoundException.class);
1311 }
1312 }
1313
1314 @Test
1315 public void testAutoMapToInterfaceWithIndex() {
1316 try (Database db = db()) {
1317 db
1318 .select("select name, score from person order by name")
1319 .autoMap(Person5.class)
1320 .firstOrError()
1321 .map(Person5::examScore)
1322 .test()
1323 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1324 .assertValue(21)
1325 .assertComplete();
1326 }
1327 }
1328
1329 @Test
1330 public void testAutoMapToInterfaceWithIndexTooLarge() {
1331 try (Database db = db()) {
1332 db
1333 .select("select name, score from person order by name")
1334 .autoMap(Person6.class)
1335 .firstOrError()
1336 .map(Person6::examScore)
1337 .test()
1338 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1339 .assertNoValues()
1340 .assertError(ColumnIndexOutOfRangeException.class);
1341 }
1342 }
1343
1344 @Test
1345 public void testAutoMapToInterfaceWithIndexTooSmall() {
1346 try (Database db = db()) {
1347 db
1348 .select("select name, score from person order by name")
1349 .autoMap(Person7.class)
1350 .firstOrError()
1351 .map(Person7::examScore)
1352 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1353 .assertNoValues()
1354 .assertError(ColumnIndexOutOfRangeException.class);
1355 }
1356 }
1357
1358 @Test
1359 public void testAutoMapWithUnmappableColumnType() {
1360 try (Database db = db()) {
1361 db
1362 .select("select name from person order by name")
1363 .autoMap(Person8.class)
1364 .map(p -> p.name())
1365 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1366 .assertNoValues()
1367 .assertError(ClassCastException.class);
1368 }
1369 }
1370
1371 @Test
1372 public void testAutoMapWithMixIndexAndName() {
1373 try (Database db = db()) {
1374 db
1375 .select("select name, score from person order by name")
1376 .autoMap(Person9.class)
1377 .firstOrError()
1378 .map(Person9::score)
1379 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1380 .assertValue(21)
1381 .assertComplete();
1382 }
1383 }
1384
1385 @Test
1386 public void testAutoMapWithQueryInAnnotation() {
1387 try (Database db = db()) {
1388 db.select(Person10.class)
1389 .get()
1390 .firstOrError()
1391 .map(Person10::score)
1392 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1393 .assertValue(21)
1394 .assertComplete();
1395 }
1396 }
1397
1398 @Test
1399 @Ignore
1400 public void testAutoMapForReadMe() {
1401 try (Database db = Database.test()) {
1402 db.select(Person10.class)
1403 .get(Person10::name)
1404 .blockingForEach(DatabaseTest::println);
1405 }
1406 }
1407
/**
 * Calling {@code db.select(Class)} with {@code Person} is expected to throw
 * {@code QueryAnnotationMissingException}.
 * NOTE(review): presumably because {@code Person} declares no query
 * annotation - confirm against its declaration.
 */
@Test(expected = QueryAnnotationMissingException.class)
public void testAutoMapWithoutQueryInAnnotation() {
    try (Database db = db()) {
        db.select(Person.class);
    }
}
1414
1415 @Test
1416 public void testSelectWithoutWhereClause() {
1417 try (Database db = db()) {
1418 Assert.assertEquals(3, (long) db.select("select name from person")
1419 .count()
1420 .blockingGet());
1421 }
1422 }
1423
1424 @Test
1425 public void testTuple3() {
1426 try (Database db = db()) {
1427 db
1428 .select("select name, score, name from person order by name")
1429 .getAs(String.class, Integer.class, String.class)
1430 .firstOrError()
1431 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1432 .assertComplete()
1433 .assertValue(Tuple3.create("FRED", 21, "FRED"));
1434 }
1435 }
1436
1437 @Test
1438 public void testTuple4() {
1439 try (Database db = db()) {
1440 db
1441 .select("select name, score, name, score from person order by name")
1442 .getAs(String.class, Integer.class, String.class, Integer.class)
1443 .firstOrError()
1444 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1445 .assertComplete()
1446 .assertValue(Tuple4.create("FRED", 21, "FRED", 21));
1447 }
1448 }
1449
1450 @Test
1451 public void testTuple5() {
1452 try (Database db = db()) {
1453 db
1454 .select("select name, score, name, score, name from person order by name")
1455 .getAs(String.class, Integer.class, String.class, Integer.class, String.class)
1456 .firstOrError()
1457 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1458 .assertComplete().assertValue(Tuple5.create("FRED", 21, "FRED", 21, "FRED"));
1459 }
1460 }
1461
1462 @Test
1463 public void testTuple6() {
1464 try (Database db = db()) {
1465 db
1466 .select("select name, score, name, score, name, score from person order by name")
1467 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1468 Integer.class)
1469 .firstOrError()
1470 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1471 .assertComplete()
1472 .assertValue(Tuple6.create("FRED", 21, "FRED", 21, "FRED", 21));
1473 }
1474 }
1475
1476 @Test
1477 public void testTuple7() {
1478 try (Database db = db()) {
1479 db
1480 .select("select name, score, name, score, name, score, name from person order by name")
1481 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1482 Integer.class, String.class)
1483 .firstOrError()
1484 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1485 .assertComplete()
1486 .assertValue(Tuple7.create("FRED", 21, "FRED", 21, "FRED", 21, "FRED"));
1487 }
1488 }
1489
1490 @Test
1491 public void testTupleN() {
1492 try (Database db = db()) {
1493 db
1494 .select("select name, score, name from person order by name")
1495 .getTupleN()
1496 .firstOrError().test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1497 .assertComplete()
1498 .assertValue(TupleN.create("FRED", 21, "FRED"));
1499 }
1500 }
1501
1502 @Test
1503 public void testTupleNWithClass() {
1504 try (Database db = db()) {
1505 db
1506 .select("select score a, score b from person order by name")
1507 .getTupleN(Integer.class)
1508 .firstOrError()
1509 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1510 .assertComplete()
1511 .assertValue(TupleN.create(21, 21));
1512 }
1513 }
1514
1515 @Test
1516 public void testTupleNWithClassInTransaction() {
1517 try (Database db = db()) {
1518 db
1519 .select("select score a, score b from person order by name")
1520 .transactedValuesOnly()
1521 .getTupleN(Integer.class)
1522 .map(x -> x.value())
1523 .firstOrError()
1524 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1525 .assertComplete()
1526 .assertValue(TupleN.create(21, 21));
1527 }
1528 }
1529
1530 @Test
1531 public void testHealthCheck() throws InterruptedException {
1532 AtomicBoolean once = new AtomicBoolean(true);
1533 testHealthCheck(c -> {
1534 log.debug("doing health check");
1535 return !once.compareAndSet(true, false);
1536 });
1537 }
1538
1539 @Test
1540 public void testHealthCheckThatThrows() throws InterruptedException {
1541 AtomicBoolean once = new AtomicBoolean(true);
1542 testHealthCheck(c -> {
1543 log.debug("doing health check");
1544 if (!once.compareAndSet(true, false))
1545 return true;
1546 else
1547 throw new RuntimeException("health check failed");
1548 });
1549 }
1550
1551 @Test
1552 public void testUpdateOneRow() {
1553 try (Database db = db()) {
1554 db.update("update person set score=20 where name='FRED'")
1555 .counts()
1556 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1557 .assertValue(1)
1558 .assertComplete();
1559 }
1560 }
1561
1562 @Test
1563 public void testUpdateThreeRows() {
1564 try (Database db = db()) {
1565 db.update("update person set score=20")
1566 .counts()
1567 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1568 .assertValue(3)
1569 .assertComplete();
1570 }
1571 }
1572
1573 @Test
1574 public void testUpdateWithParameter() {
1575 try (Database db = db()) {
1576 db.update("update person set score=20 where name=?")
1577 .parameter("FRED").counts()
1578 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1579 .assertValue(1)
1580 .assertComplete();
1581 }
1582 }
1583
1584 @Test
1585 public void testUpdateWithParameterTwoRuns() {
1586 try (Database db = db()) {
1587 db.update("update person set score=20 where name=?")
1588 .parameters("FRED", "JOSEPH").counts()
1589 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1590 .assertValues(1, 1)
1591 .assertComplete();
1592 }
1593 }
1594
1595 @Test
1596 public void testUpdateAllWithParameterFourRuns() {
1597 try (Database db = db()) {
1598 db.update("update person set score=?")
1599 .parameters(1, 2, 3, 4)
1600 .counts()
1601 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1602 .assertValues(3, 3, 3, 3)
1603 .assertComplete();
1604 }
1605 }
1606
1607 @Test
1608 public void testUpdateWithBatchSize2() {
1609 try (Database db = db()) {
1610 db.update("update person set score=?")
1611 .batchSize(2)
1612 .parameters(1, 2, 3, 4)
1613 .counts()
1614 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1615 .assertValues(3, 3, 3, 3)
1616 .assertComplete();
1617 }
1618 }
1619
1620 @Test
1621 public void testUpdateWithBatchSize3GreaterThanNumRecords() {
1622 try (Database db = db()) {
1623 db.update("update person set score=?")
1624 .batchSize(3)
1625 .parameters(1, 2, 3, 4)
1626 .counts()
1627 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1628 .assertValues(3, 3, 3, 3)
1629 .assertComplete();
1630 }
1631 }
1632
1633 @Test
1634 public void testInsert() {
1635 try (Database db = db()) {
1636 db.update("insert into person(name, score) values(?,?)")
1637 .parameters("DAVE", 12, "ANNE", 18)
1638 .counts()
1639 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1640 .assertValues(1, 1)
1641 .assertComplete();
1642 List<Tuple2<String, Integer>> list = db.select("select name, score from person")
1643 .getAs(String.class, Integer.class)
1644 .toList()
1645 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1646 .blockingGet();
1647 assertTrue(list.contains(Tuple2.create("DAVE", 12)));
1648 assertTrue(list.contains(Tuple2.create("ANNE", 18)));
1649 }
1650 }
1651
1652 @Test
1653 public void testReturnGeneratedKeys() {
1654 try (Database db = db()) {
1655
1656 db.update("insert into note(text) values(?)")
1657 .parameters("HI", "THERE")
1658 .returnGeneratedKeys()
1659 .getAs(Integer.class)
1660 .test()
1661 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1662 .assertValues(1, 2)
1663 .assertComplete();
1664
1665 db.update("insert into note(text) values(?)")
1666 .parameters("ME", "TOO")
1667 .returnGeneratedKeys()
1668 .getAs(Integer.class)
1669 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1670 .assertValues(3, 4)
1671 .assertComplete();
1672 }
1673 }
1674
1675 @Test
1676 public void testReturnGeneratedKeysDerby() {
1677 Database db = DatabaseCreator.createDerby(1);
1678
1679
1680 db.update("insert into note2(text) values(?)")
1681 .parameters("HI", "THERE")
1682 .returnGeneratedKeys()
1683 .getAs(Integer.class)
1684 .test()
1685 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1686 .assertNoErrors().assertValues(1, 3)
1687 .assertComplete();
1688
1689 db.update("insert into note2(text) values(?)")
1690 .parameters("ME", "TOO")
1691 .returnGeneratedKeys()
1692 .getAs(Integer.class)
1693 .test()
1694 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1695 .assertValues(5, 7)
1696 .assertComplete();
1697 }
1698
1699 @Test(expected = IllegalArgumentException.class)
1700 public void testReturnGeneratedKeysWithBatchSizeShouldThrow() {
1701 try (Database db = db()) {
1702
1703 db.update("insert into note(text) values(?)")
1704 .parameters("HI", "THERE")
1705 .batchSize(2)
1706 .returnGeneratedKeys();
1707 }
1708 }
1709
1710 @Test
1711 public void testTransactedReturnGeneratedKeys() {
1712 try (Database db = db()) {
1713
1714 db.update("insert into note(text) values(?)")
1715 .parameters("HI", "THERE")
1716 .transacted()
1717 .returnGeneratedKeys()
1718 .valuesOnly()
1719 .getAs(Integer.class)
1720 .test()
1721 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1722 .assertValues(1, 2)
1723 .assertComplete();
1724
1725 db.update("insert into note(text) values(?)")
1726 .parameters("ME", "TOO")
1727 .transacted()
1728 .returnGeneratedKeys()
1729 .valuesOnly()
1730 .getAs(Integer.class)
1731 .test()
1732 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1733 .assertValues(3, 4)
1734 .assertComplete();
1735 }
1736 }
1737
1738 @Test
1739 public void testTransactedReturnGeneratedKeys2() {
1740 try (Database db = db()) {
1741
1742 Flowable<Integer> a = db.update("insert into note(text) values(?)")
1743 .parameters("HI", "THERE")
1744 .transacted()
1745 .returnGeneratedKeys()
1746 .valuesOnly()
1747 .getAs(Integer.class);
1748
1749 db.update("insert into note(text) values(?)")
1750 .parameters("ME", "TOO")
1751 .transacted()
1752 .returnGeneratedKeys()
1753 .valuesOnly()
1754 .getAs(Integer.class)
1755 .startWith(a)
1756 .test()
1757 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1758 .assertValues(1, 2, 3, 4)
1759 .assertComplete();
1760 }
1761 }
1762
1763 @Test
1764 public void testUpdateWithinTransaction() {
1765 try (Database db = db()) {
1766 db
1767 .select("select name from person")
1768 .transactedValuesOnly()
1769 .getAs(String.class)
1770 .doOnNext(DatabaseTest::println)
1771 .flatMap(tx -> tx
1772 .update("update person set score=-1 where name=:name")
1773 .batchSize(1)
1774 .parameter("name", tx.value())
1775 .valuesOnly()
1776 .counts())
1777 .test()
1778 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1779 .assertValues(1, 1, 1)
1780 .assertComplete();
1781 }
1782 }
1783
1784 @Test
1785 public void testSelectDependsOnFlowable() {
1786 try (Database db = db()) {
1787 Flowable<Integer> a = db.update("update person set score=100 where name=?")
1788 .parameter("FRED")
1789 .counts();
1790 db.select("select score from person where name=?")
1791 .parameter("FRED")
1792 .dependsOn(a)
1793 .getAs(Integer.class)
1794 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1795 .assertValues(100)
1796 .assertComplete();
1797 }
1798 }
1799
1800 @Test
1801 public void testSelectDependsOnObservable() {
1802 try (Database db = db()) {
1803 Observable<Integer> a = db.update("update person set score=100 where name=?")
1804 .parameter("FRED")
1805 .counts().toObservable();
1806 db.select("select score from person where name=?")
1807 .parameter("FRED")
1808 .dependsOn(a)
1809 .getAs(Integer.class)
1810 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1811 .assertValues(100)
1812 .assertComplete();
1813 }
1814 }
1815
1816 @Test
1817 public void testSelectDependsOnOnSingle() {
1818 try (Database db = db()) {
1819 Single<Long> a = db.update("update person set score=100 where name=?")
1820 .parameter("FRED")
1821 .counts().count();
1822 db.select("select score from person where name=?")
1823 .parameter("FRED")
1824 .dependsOn(a)
1825 .getAs(Integer.class)
1826 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1827 .assertValues(100)
1828 .assertComplete();
1829 }
1830 }
1831
1832 @Test
1833 public void testSelectDependsOnCompletable() {
1834 try (Database db = db()) {
1835 Completable a = db.update("update person set score=100 where name=?")
1836 .parameter("FRED")
1837 .counts().ignoreElements();
1838 db.select("select score from person where name=?")
1839 .parameter("FRED")
1840 .dependsOn(a)
1841 .getAs(Integer.class)
1842 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1843 .assertValues(100)
1844 .assertComplete();
1845 }
1846 }
1847
1848 @Test
1849 public void testUpdateWithinTransactionBatchSize0() {
1850 try (Database db = db()) {
1851 db
1852 .select("select name from person")
1853 .transactedValuesOnly()
1854 .getAs(String.class)
1855 .doOnNext(DatabaseTest::println)
1856 .flatMap(tx -> tx
1857 .update("update person set score=-1 where name=:name")
1858 .batchSize(0)
1859 .parameter("name", tx.value())
1860 .valuesOnly()
1861 .counts())
1862 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1863 .assertValues(1, 1, 1)
1864 .assertComplete();
1865 }
1866 }
1867
1868 private static void info() {
1869 LogManager.getRootLogger().setLevel(Level.INFO);
1870 }
1871
1872 private static void debug() {
1873 LogManager.getRootLogger().setLevel(Level.INFO);
1874 }
1875
1876 @Test
1877 public void testCreateBig() {
1878 info();
1879 big(5).select("select count(*) from person")
1880 .getAs(Integer.class)
1881 .test().awaitDone(20, TimeUnit.SECONDS)
1882 .assertValue(5163)
1883 .assertComplete();
1884 debug();
1885 }
1886
1887 @Test
1888 public void testTxWithBig() {
1889 info();
1890 big(1)
1891 .select("select name from person")
1892 .transactedValuesOnly()
1893 .getAs(String.class)
1894 .flatMap(tx -> tx
1895 .update("update person set score=-1 where name=:name")
1896 .batchSize(1)
1897 .parameter("name", tx.value())
1898 .valuesOnly()
1899 .counts())
1900 .count()
1901 .test().awaitDone(20, TimeUnit.SECONDS)
1902 .assertValue((long) NAMES_COUNT_BIG)
1903 .assertComplete();
1904 debug();
1905 }
1906
1907 @Test
1908 public void testTxWithBigInputBatchSize2000() {
1909 info();
1910 big(1)
1911 .select("select name from person")
1912 .transactedValuesOnly()
1913 .getAs(String.class)
1914 .flatMap(tx -> tx
1915 .update("update person set score=-1 where name=:name")
1916 .batchSize(2000)
1917 .parameter("name", tx.value())
1918 .valuesOnly()
1919 .counts())
1920 .count()
1921 .test().awaitDone(20, TimeUnit.SECONDS)
1922 .assertValue((long) NAMES_COUNT_BIG)
1923 .assertComplete();
1924 debug();
1925 }
1926
1927 @Test
1928 public void testInsertNullClobAndReadClobAsString() {
1929 try (Database db = db()) {
1930 insertNullClob(db);
1931 db.select("select document from person_clob where name='FRED'")
1932 .getAsOptional(String.class)
1933 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1934 .assertValue(Optional.<String>empty())
1935 .assertComplete();
1936 }
1937 }
1938
1939 @Test
1940 public void testAutomapClobIssue32() {
1941 try (Database db = db()) {
1942 db.update("insert into person_clob(name, document) values(?, ? )")
1943 .parameters("fred", "hello there")
1944 .complete()
1945 .blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
1946 db.select("select * from person_clob")
1947 .autoMap(PersonClob.class)
1948 .map(pc -> {
1949 System.out.println(pc);
1950 return pc.document();
1951 })
1952 .test()
1953 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1954 .assertValueCount(1)
1955 .assertComplete();
1956 }
1957 }
1958
1959 private static interface PersonClob {
1960 @Column
1961 String name();
1962 @Column
1963 String document();
1964 }
1965
1966 private static void insertNullClob(Database db) {
1967 db.update("insert into person_clob(name,document) values(?,?)")
1968 .parameters("FRED", Database.NULL_CLOB)
1969 .counts()
1970 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1971 .assertValue(1)
1972 .assertComplete();
1973 }
1974
1975 @Test
1976 public void testClobMethod() {
1977 assertEquals(Database.NULL_CLOB, Database.clob(null));
1978 }
1979
1980 @Test
1981 public void testBlobMethod() {
1982 assertEquals(Database.NULL_BLOB, Database.blob(null));
1983 }
1984
1985 @Test
1986 public void testClobMethodPresent() {
1987 assertEquals("a", Database.clob("a"));
1988 }
1989
1990 @Test
1991 public void testBlobMethodPresent() {
1992 byte[] b = new byte[1];
1993 assertEquals(b, Database.blob(b));
1994 }
1995
1996 @Test
1997 public void testDateOfBirthNullableForReadMe() {
1998 Database.test()
1999 .select("select date_of_birth from person where name='FRED'")
2000 .getAsOptional(Instant.class)
2001 .blockingForEach(DatabaseTest::println);
2002 }
2003
2004 @Test
2005 public void testInsertNullClobAndReadClobAsTuple2() {
2006 try (Database db = db()) {
2007 insertNullClob(db);
2008 db.select("select document, document from person_clob where name='FRED'")
2009 .getAs(String.class, String.class)
2010 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2011 .assertValue(Tuple2.create(null, null))
2012 .assertComplete();
2013 }
2014 }
2015
2016 @Test
2017 public void testInsertClobAndReadClobAsString() {
2018 try (Database db = db()) {
2019 db.update("insert into person_clob(name,document) values(?,?)")
2020 .parameters("FRED", "some text here")
2021 .counts()
2022 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2023 .assertValue(1)
2024 .assertComplete();
2025 db.select("select document from person_clob where name='FRED'")
2026 .getAs(String.class)
2027 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2028
2029
2030
2031 .assertComplete();
2032 }
2033 }
2034
2035 @Test
2036 public void testInsertClobAndReadClobUsingReader() {
2037 try (Database db = db()) {
2038 db.update("insert into person_clob(name,document) values(?,?)")
2039 .parameters("FRED", "some text here")
2040 .counts()
2041 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2042 .assertValue(1)
2043 .assertComplete();
2044 db.select("select document from person_clob where name='FRED'")
2045 .getAs(Reader.class)
2046 .map(r -> read(r)).test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2047 .assertValue("some text here")
2048 .assertComplete();
2049 }
2050 }
2051
2052 @Test
2053 public void testInsertBlobAndReadBlobAsByteArray() {
2054 try (Database db = db()) {
2055 insertPersonBlob(db);
2056 db.select("select document from person_blob where name='FRED'")
2057 .getAs(byte[].class)
2058 .map(b -> new String(b))
2059 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2060 .assertValue("some text here")
2061 .assertComplete();
2062 }
2063 }
2064
2065 private static void insertPersonBlob(Database db) {
2066 byte[] bytes = "some text here".getBytes();
2067 db.update("insert into person_blob(name,document) values(?,?)")
2068 .parameters("FRED", bytes)
2069 .counts()
2070 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2071 .assertValue(1)
2072 .assertComplete();
2073 }
2074
2075 @Test
2076 public void testInsertBlobAndReadBlobAsInputStream() {
2077 try (Database db = db()) {
2078 byte[] bytes = "some text here".getBytes();
2079 db.update("insert into person_blob(name,document) values(?,?)")
2080 .parameters("FRED", new ByteArrayInputStream(bytes))
2081 .counts()
2082 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2083 .assertValue(1)
2084 .assertComplete();
2085 db.select("select document from person_blob where name='FRED'")
2086 .getAs(InputStream.class)
2087 .map(is -> read(is))
2088 .map(b -> new String(b))
2089 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2090 .assertValue("some text here")
2091 .assertComplete();
2092 }
2093 }
2094
2095 private static String read(Reader reader) throws IOException {
2096 StringBuffer s = new StringBuffer();
2097 char[] ch = new char[128];
2098 int n = 0;
2099 while ((n = reader.read(ch)) != -1) {
2100 s.append(ch, 0, n);
2101 }
2102 reader.close();
2103 return s.toString();
2104 }
2105
2106 private static byte[] read(InputStream is) throws IOException {
2107 ByteArrayOutputStream bytes = new ByteArrayOutputStream();
2108 byte[] b = new byte[128];
2109 int n = 0;
2110 while ((n = is.read(b)) != -1) {
2111 bytes.write(b, 0, n);
2112 }
2113 is.close();
2114 return bytes.toByteArray();
2115 }
2116
2117 private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
2118 TestScheduler scheduler = new TestScheduler();
2119
2120 NonBlockingConnectionPool pool = Pools
2121 .nonBlocking()
2122 .connectionProvider(DatabaseCreator.connectionProvider())
2123 .maxIdleTime(10, TimeUnit.MINUTES)
2124 .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES)
2125 .healthCheck(healthy)
2126 .scheduler(scheduler)
2127 .maxPoolSize(1)
2128 .build();
2129
2130 try (Database db = Database.from(pool)) {
2131 TestSubscriber<Integer> ts0 = db.select(
2132 "select score from person where name=?")
2133 .parameter("FRED")
2134 .getAs(Integer.class)
2135 .test();
2136 ts0.assertValueCount(0)
2137 .assertNotComplete();
2138 scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2139 ts0.assertValueCount(1)
2140 .assertComplete();
2141 TestSubscriber<Integer> ts = db.select(
2142 "select score from person where name=?")
2143 .parameter("FRED")
2144 .getAs(Integer.class)
2145 .test()
2146 .assertValueCount(0);
2147 log.debug("done2");
2148 scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2149 Thread.sleep(200);
2150 ts.assertValueCount(1);
2151 Thread.sleep(200);
2152 ts.assertValue(21)
2153 .assertComplete();
2154 }
2155 }
2156
2157 @Test
2158 public void testShutdownBeforeUse() {
2159 NonBlockingConnectionPool pool = Pools
2160 .nonBlocking()
2161 .connectionProvider(DatabaseCreator.connectionProvider())
2162 .scheduler(Schedulers.io())
2163 .maxPoolSize(1)
2164 .build();
2165 pool.close();
2166 Database.from(pool)
2167 .select("select score from person where name=?")
2168 .parameter("FRED")
2169 .getAs(Integer.class)
2170 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2171 .assertNoValues()
2172 .assertError(PoolClosedException.class);
2173 }
2174
2175 @Test
2176 public void testFewerColumnsMappedThanAvailable() {
2177 try (Database db = db()) {
2178 db.select("select name, score from person where name='FRED'")
2179 .getAs(String.class)
2180 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2181 .assertValues("FRED")
2182 .assertComplete();
2183 }
2184 }
2185
2186 @Test
2187 public void testMoreColumnsMappedThanAvailable() {
2188 try (Database db = db()) {
2189 db
2190 .select("select name, score from person where name='FRED'")
2191 .getAs(String.class, Integer.class, String.class)
2192 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2193 .assertNoValues()
2194 .assertError(MoreColumnsRequestedThanExistException.class);
2195 }
2196 }
2197
2198 @Test
2199 public void testSelectTimestamp() {
2200 try (Database db = db()) {
2201 db
2202 .select("select registered from person where name='FRED'")
2203 .getAs(Long.class)
2204 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2205 .assertValue(FRED_REGISTERED_TIME)
2206 .assertComplete();
2207 }
2208 }
2209
2210 @Test
2211 public void testSelectTimestampAsDate() {
2212 try (Database db = db()) {
2213 db
2214 .select("select registered from person where name='FRED'")
2215 .getAs(Date.class)
2216 .map(d -> d.getTime())
2217 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2218 .assertValue(FRED_REGISTERED_TIME)
2219 .assertComplete();
2220 }
2221 }
2222
2223 @Test
2224 public void testSelectTimestampAsInstant() {
2225 try (Database db = db()) {
2226 db
2227 .select("select registered from person where name='FRED'")
2228 .getAs(Instant.class)
2229 .map(d -> d.toEpochMilli())
2230 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2231 .assertValue(FRED_REGISTERED_TIME)
2232 .assertComplete();
2233 }
2234 }
2235
2236 @Test
2237 public void testUpdateCalendarParameter() {
2238 Calendar c = GregorianCalendar.from(ZonedDateTime.parse("2017-03-25T15:37Z"));
2239 try (Database db = db()) {
2240 db.update("update person set registered=?")
2241 .parameter(c)
2242 .counts()
2243 .test()
2244 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2245 .assertValue(3)
2246 .assertComplete();
2247 db.select("select registered from person")
2248 .getAs(Long.class)
2249 .firstOrError()
2250 .test()
2251 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2252 .assertValue(c.getTimeInMillis())
2253 .assertComplete();
2254 }
2255 }
2256
2257 @Test
2258 public void testUpdateTimeParameter() {
2259 try (Database db = db()) {
2260 Time t = new Time(1234);
2261 db.update("update person set registered=?")
2262 .parameter(t)
2263 .counts()
2264 .test()
2265 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2266 .assertValue(3)
2267 .assertComplete();
2268 db.select("select registered from person")
2269 .getAs(Long.class)
2270 .firstOrError()
2271 .test()
2272 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2273 .assertValue(1234L)
2274 .assertComplete();
2275 }
2276 }
2277
2278 @Test
2279 public void testUpdateTimestampParameter() {
2280 try (Database db = db()) {
2281 Timestamp t = new Timestamp(1234);
2282 db.update("update person set registered=?")
2283 .parameter(t)
2284 .counts()
2285 .test()
2286 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2287 .assertValue(3)
2288 .assertComplete();
2289 db.select("select registered from person")
2290 .getAs(Long.class)
2291 .firstOrError()
2292 .test()
2293 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2294 .assertValue(1234L)
2295 .assertComplete();
2296 }
2297 }
2298
2299 @Test
2300 public void testUpdateSqlDateParameter() {
2301 try (Database db = db()) {
2302 java.sql.Date t = new java.sql.Date(1234);
2303
2304 db.update("update person set registered=?")
2305 .parameter(t)
2306 .counts()
2307 .test()
2308 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2309 .assertValue(3)
2310 .assertComplete();
2311 db.select("select registered from person")
2312 .getAs(Long.class)
2313 .firstOrError()
2314 .test()
2315 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2316
2317 .assertValue(x -> Math.abs(x - 1234) <= TimeUnit.HOURS.toMillis(24))
2318 .assertComplete();
2319 }
2320 }
2321
2322 @Test
2323 public void testUpdateUtilDateParameter() {
2324 try (Database db = db()) {
2325 Date d = new Date(1234);
2326 db.update("update person set registered=?")
2327 .parameter(d)
2328 .counts()
2329 .test()
2330 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2331 .assertValue(3)
2332 .assertComplete();
2333 db.select("select registered from person")
2334 .getAs(Long.class)
2335 .firstOrError()
2336 .test()
2337 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2338 .assertValue(1234L)
2339 .assertComplete();
2340 }
2341 }
2342
2343 @Test
2344 public void testUpdateTimestampAsInstant() {
2345 try (Database db = db()) {
2346 db.update("update person set registered=? where name='FRED'")
2347 .parameter(Instant.ofEpochMilli(FRED_REGISTERED_TIME))
2348 .counts()
2349 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2350 .assertValue(1)
2351 .assertComplete();
2352 db.select("select registered from person where name='FRED'")
2353 .getAs(Long.class)
2354 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2355 .assertValue(FRED_REGISTERED_TIME)
2356 .assertComplete();
2357 }
2358 }
2359
2360 @Test
2361 public void testUpdateTimestampAsZonedDateTime() {
2362 try (Database db = db()) {
2363 db.update("update person set registered=? where name='FRED'")
2364 .parameter(ZonedDateTime.ofInstant(Instant.ofEpochMilli(FRED_REGISTERED_TIME),
2365 ZoneOffset.UTC.normalized()))
2366 .counts()
2367 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2368 .assertValue(1)
2369 .assertComplete();
2370 db.select("select registered from person where name='FRED'")
2371 .getAs(Long.class)
2372 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2373 .assertValue(FRED_REGISTERED_TIME)
2374 .assertComplete();
2375 }
2376 }
2377
2378 @Test
2379 public void testCompleteCompletes() {
2380 try (Database db = db(1)) {
2381 db
2382 .update("update person set score=-3 where name='FRED'")
2383 .complete()
2384 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2385 .blockingAwait();
2386
2387 int score = db.select("select score from person where name='FRED'")
2388 .getAs(Integer.class)
2389 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2390 .blockingFirst();
2391 assertEquals(-3, score);
2392 }
2393 }
2394
2395 @Test
2396 public void testComplete() throws InterruptedException {
2397 try (Database db = db(1)) {
2398 Completable a = db
2399 .update("update person set score=-3 where name='FRED'")
2400 .complete();
2401 db.update("update person set score=-4 where score = -3")
2402 .dependsOn(a)
2403 .counts()
2404 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2405 .assertValue(1)
2406 .assertComplete();
2407 }
2408 }
2409
2410 @Test
2411 public void testCountsOnlyInTransaction() {
2412 try (Database db = db()) {
2413 db.update("update person set score = -3")
2414 .transacted()
2415 .countsOnly()
2416 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2417 .assertValues(3)
2418 .assertComplete();
2419 }
2420 }
2421
2422 @Test
2423 public void testCountsInTransaction() {
2424 try (Database db = db()) {
2425 db.update("update person set score = -3")
2426 .transacted()
2427 .counts()
2428 .doOnNext(DatabaseTest::println)
2429 .toList()
2430 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2431 .assertValue(list -> list.get(0).isValue() && list.get(0).value() == 3
2432 && list.get(1).isComplete() && list.size() == 2)
2433 .assertComplete();
2434 }
2435 }
2436
2437 @Test
2438 public void testTx() throws InterruptedException {
2439 Database db = db(3);
2440 Flowable<Tx<?>> transaction = db
2441 .update("update person set score=-3 where name='FRED'")
2442 .transaction();
2443
2444 transaction
2445 .doOnCancel(() -> log.debug("disposing"))
2446 .doOnNext(DatabaseTest::println)
2447 .flatMap(tx -> {
2448 log.debug("flatmapping");
2449 return tx
2450 .update("update person set score = -4 where score = -3")
2451 .countsOnly()
2452 .doOnSubscribe(s -> log.debug("subscribed"))
2453 .doOnNext(num -> log.debug("num=" + num));
2454 })
2455 .test()
2456 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2457 .assertNoErrors()
2458 .assertValue(1)
2459 .assertComplete();
2460 }
2461
2462 @Test
2463 public void testTxAfterSelect() {
2464 Database db = db(3);
2465 Single<Tx<Integer>> transaction = db
2466 .select("select score from person where name='FRED'")
2467 .transactedValuesOnly()
2468 .getAs(Integer.class)
2469 .firstOrError();
2470
2471 transaction
2472 .doOnDispose(() -> log.debug("disposing"))
2473 .doOnSuccess(DatabaseTest::println)
2474 .flatMapPublisher(tx -> {
2475 log.debug("flatmapping");
2476 return tx
2477 .update("update person set score = -4 where score = ?")
2478 .parameter(tx.value())
2479 .countsOnly()
2480 .doOnSubscribe(s -> log.debug("subscribed"))
2481 .doOnNext(num -> log.debug("num=" + num));
2482 })
2483 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2484 .assertNoErrors()
2485 .assertValue(1)
2486 .assertComplete();
2487 }
2488
2489 @Test
2490 public void testUseTxOnComplete() {
2491 db(1)
2492 .select(Person10.class)
2493 .transacted()
2494 .get()
2495 .lastOrError()
2496 .map(tx -> tx.select("select count(*) from person")
2497 .count().blockingGet())
2498 .test()
2499 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2500 .assertError(CannotForkTransactedConnection.class)
2501 .assertValueCount(0);
2502 }
2503
2504 @Test
2505 public void testSingleFlatMap() {
2506 Single.just(1).flatMapPublisher(n -> Flowable.just(1)).test(1).assertValue(1)
2507 .assertComplete();
2508 }
2509
2510 @Test
2511 public void testAutomappedInstanceHasMeaningfulToStringMethod() {
2512 info();
2513 String s = Database.test()
2514 .select("select name, score from person where name=?")
2515 .parameterStream(Flowable.just("FRED"))
2516 .autoMap(Person2.class)
2517 .map(x -> x.toString())
2518 .blockingSingle();
2519 assertEquals("Person2[name=FRED, score=21]", s);
2520 }
2521
2522 @Test
2523 public void testAutomappedEquals() {
2524 boolean b = Database.test()
2525 .select("select name, score from person where name=?")
2526 .parameterStream(Flowable.just("FRED"))
2527 .autoMap(Person2.class)
2528 .map(x -> x.equals(x))
2529 .blockingSingle();
2530 assertTrue(b);
2531 }
2532
2533 @Test
2534 public void testAutomappedDoesNotEqualNull() {
2535 boolean b = Database.test()
2536 .select("select name, score from person where name=?")
2537 .parameterStream(Flowable.just("FRED"))
2538 .autoMap(Person2.class)
2539 .map(x -> x.equals(null))
2540 .blockingSingle();
2541 assertFalse(b);
2542 }
2543
2544 @Test
2545 public void testAutomappedDoesNotEqual() {
2546 boolean b = Database.test()
2547 .select("select name, score from person where name=?")
2548 .parameterStream(Flowable.just("FRED"))
2549 .autoMap(Person2.class)
2550 .map(x -> x.equals(new Object()))
2551 .blockingSingle();
2552 assertFalse(b);
2553 }
2554
2555 @Test
2556 public void testAutomappedHashCode() {
2557 Person2 p = Database.test()
2558 .select("select name, score from person where name=?")
2559 .parameterStream(Flowable.just("FRED"))
2560 .autoMap(Person2.class)
2561 .blockingSingle();
2562 assertTrue(p.hashCode() != 0);
2563 }
2564
2565 @Test
2566 public void testAutomappedWithParametersThatProvokesMoreThanOneQuery() {
2567 Database.test()
2568 .select("select name, score from person where name=?")
2569 .parameters("FRED", "FRED")
2570 .autoMap(Person2.class)
2571 .doOnNext(DatabaseTest::println)
2572 .test()
2573 .awaitDone(2000, TimeUnit.SECONDS)
2574 .assertNoErrors()
2575 .assertValueCount(2)
2576 .assertComplete();
2577 }
2578
2579 @Test
2580 public void testAutomappedObjectsEqualsAndHashCodeIsDistinctOnValues() {
2581 Database.test()
2582 .select("select name, score from person where name=?")
2583 .parameters("FRED", "FRED")
2584 .autoMap(Person2.class)
2585 .distinct()
2586 .doOnNext(DatabaseTest::println)
2587 .test()
2588 .awaitDone(2000, TimeUnit.SECONDS)
2589 .assertNoErrors()
2590 .assertValueCount(1)
2591 .assertComplete();
2592 }
2593
2594 @Test
2595 public void testAutomappedObjectsEqualsDifferentiatesDifferentInterfacesWithSameMethodNamesAndValues() {
2596 PersonDistinct1 p1 = Database.test()
2597 .select("select name, score from person where name=?")
2598 .parameters("FRED")
2599 .autoMap(PersonDistinct1.class)
2600 .blockingFirst();
2601
2602 PersonDistinct2 p2 = Database.test()
2603 .select("select name, score from person where name=?")
2604 .parameters("FRED")
2605 .autoMap(PersonDistinct2.class)
2606 .blockingFirst();
2607
2608 assertNotEquals(p1, p2);
2609 }
2610
2611 @Test
2612 public void testAutomappedObjectsWhenDefaultMethodInvoked() {
2613
2614 if (System.getProperty("java.version").startsWith("1.8.")) {
2615 PersonWithDefaultMethod p = Database.test()
2616 .select("select name, score from person where name=?")
2617 .parameters("FRED")
2618 .autoMap(PersonWithDefaultMethod.class)
2619 .blockingFirst();
2620 assertEquals("fred", p.nameLower());
2621 }
2622 }
2623
2624 @Test(expected = AutomappedInterfaceInaccessibleException.class)
2625 public void testAutomappedObjectsWhenDefaultMethodInvokedAndIsNonPublicThrows() {
2626 PersonWithDefaultMethodNonPublic p = Database.test()
2627 .select("select name, score from person where name=?")
2628 .parameters("FRED")
2629 .autoMap(PersonWithDefaultMethodNonPublic.class)
2630 .blockingFirst();
2631 p.nameLower();
2632 }
2633
2634 @Test
2635 public void testBlockingDatabase() {
2636 Database db = blocking();
2637 db.select("select score from person where name=?")
2638 .parameters("FRED", "JOSEPH")
2639 .getAs(Integer.class)
2640 .test()
2641 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2642 .assertNoErrors()
2643 .assertValues(21, 34)
2644 .assertComplete();
2645 }
2646
2647 @Test
2648 public void testBlockingDatabaseTransacted() {
2649 Database db = blocking();
2650 db.select("select score from person where name=?")
2651 .parameters("FRED", "JOSEPH")
2652 .transactedValuesOnly()
2653 .getAs(Integer.class)
2654 .map(x -> x.value())
2655 .test()
2656 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2657 .assertNoErrors()
2658 .assertValues(21, 34)
2659 .assertComplete();
2660 }
2661
2662 @Test
2663 public void testBlockingDatabaseTransactedNested() {
2664 Database db = blocking();
2665 db.select("select score from person where name=?")
2666 .parameters("FRED", "JOSEPH")
2667 .transactedValuesOnly()
2668 .getAs(Integer.class)
2669 .flatMap(tx -> tx.select("select name from person where score=?")
2670 .parameter(tx.value())
2671 .valuesOnly()
2672 .getAs(String.class))
2673 .test()
2674 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2675 .assertNoErrors()
2676 .assertValues("FRED", "JOSEPH")
2677 .assertComplete();
2678 }
2679
2680 @Test
2681 public void testUsingNormalJDBCApi() {
2682 Database db = db(1);
2683 db.apply(con -> {
2684 try (PreparedStatement stmt = con
2685 .prepareStatement("select count(*) from person where name='FRED'");
2686 ResultSet rs = stmt.executeQuery()) {
2687 rs.next();
2688 return rs.getInt(1);
2689 }
2690 })
2691 .test()
2692 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2693 .assertValue(1)
2694 .assertComplete();
2695
2696
2697 db.select("select count(*) from person where name='FRED'")
2698 .getAs(Integer.class)
2699 .test()
2700 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2701 .assertValue(1)
2702 .assertComplete();
2703 }
2704
2705 @Test
2706 public void testUsingNormalJDBCApiCompletable() {
2707 Database db = db(1);
2708 db.apply(con -> {
2709 try (PreparedStatement stmt = con
2710 .prepareStatement("select count(*) from person where name='FRED'");
2711 ResultSet rs = stmt.executeQuery()) {
2712 rs.next();
2713 }
2714 })
2715 .test()
2716 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2717 .assertComplete();
2718
2719
2720 db.select("select count(*) from person where name='FRED'")
2721 .getAs(Integer.class)
2722 .test()
2723 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2724 .assertValue(1)
2725 .assertComplete();
2726 }
2727
2728 @Test
2729 public void testCallableStatement() {
2730 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2731 db.apply(con -> {
2732 try (Statement stmt = con.createStatement()) {
2733 CallableStatement st = con.prepareCall("call getPersonCount(?, ?)");
2734 st.setInt(1, 0);
2735 st.registerOutParameter(2, Types.INTEGER);
2736 st.execute();
2737 assertEquals(2, st.getInt(2));
2738 }
2739 }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2740 }
2741
2742 @Test
2743 public void testCallableStatementReturningResultSets() {
2744 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2745 db.apply(con -> {
2746 try (Statement stmt = con.createStatement()) {
2747 CallableStatement st = con.prepareCall("call in1out0rs2(?)");
2748 st.setInt(1, 0);
2749 st.execute();
2750 ResultSet rs1 = st.getResultSet();
2751 st.getMoreResults(Statement.KEEP_CURRENT_RESULT);
2752 ResultSet rs2 = st.getResultSet();
2753 rs1.next();
2754 assertEquals("FRED", rs1.getString(1));
2755 rs2.next();
2756 assertEquals("SARAH", rs2.getString(1));
2757 }
2758 }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2759 }
2760
2761 @Test
2762 public void testCallableApiNoParameters() {
2763 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2764 db
2765 .call("call zero()")
2766 .once()
2767 .test()
2768 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2769 .assertComplete();
2770 }
2771
2772 @Test
2773 public void testCallableApiNoParametersTransacted() {
2774 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2775 db
2776 .call("call zero()")
2777 .transacted()
2778 .once()
2779 .test()
2780 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2781 .assertValueCount(1)
2782 .assertValue(x -> x.isComplete())
2783 .assertComplete();
2784 }
2785
2786 @Test
2787 public void testCallableApiOneInOutParameter() {
2788 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2789 db
2790 .call("call inout1(?)")
2791 .inOut(Type.INTEGER, Integer.class)
2792 .input(4)
2793 .test()
2794 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2795 .assertValue(5)
2796 .assertComplete();
2797 }
2798
2799 @Test
2800 public void testCallableApiOneInOutParameterTransacted() {
2801 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2802 db
2803 .call("call inout1(?)")
2804 .transacted()
2805 .inOut(Type.INTEGER, Integer.class)
2806 .input(4)
2807 .flatMap(Tx.flattenToValuesOnly())
2808 .test()
2809 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2810 .assertValue(5)
2811 .assertComplete();
2812 }
2813
2814 @Test
2815 public void testCallableApiTwoInOutParameters() {
2816 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2817 db
2818 .call("call inout2(?, ?)")
2819 .inOut(Type.INTEGER, Integer.class)
2820 .inOut(Type.INTEGER, Integer.class)
2821 .input(4, 10)
2822 .test()
2823 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2824 .assertValue(x -> x._1() == 5 && x._2() == 12)
2825 .assertComplete();
2826 }
2827
2828 @Test
2829 public void testCallableApiTwoInOutParametersTransacted() {
2830 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2831 db
2832 .call("call inout2(?, ?)")
2833 .transacted()
2834 .inOut(Type.INTEGER, Integer.class)
2835 .inOut(Type.INTEGER, Integer.class)
2836 .input(4, 10)
2837 .flatMap(Tx.flattenToValuesOnly())
2838 .test()
2839 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2840 .assertValue(x -> x._1() == 5 && x._2() == 12)
2841 .assertComplete();
2842 }
2843
2844 @Test
2845 public void testCallableApiThreeInOutParameters() {
2846 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2847 db
2848 .call("call inout3(?, ?, ?)")
2849 .inOut(Type.INTEGER, Integer.class)
2850 .inOut(Type.INTEGER, Integer.class)
2851 .inOut(Type.INTEGER, Integer.class)
2852 .input(4, 10, 13)
2853 .test()
2854 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2855 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16)
2856 .assertComplete();
2857 }
2858
2859 @Test
2860 public void testCallableApiThreeInOutParametersTransacted() {
2861 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2862 db
2863 .call("call inout3(?, ?, ?)")
2864 .transacted()
2865 .inOut(Type.INTEGER, Integer.class)
2866 .inOut(Type.INTEGER, Integer.class)
2867 .inOut(Type.INTEGER, Integer.class)
2868 .input(4, 10, 13)
2869 .flatMap(Tx.flattenToValuesOnly())
2870 .test()
2871 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2872 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16)
2873 .assertComplete();
2874 }
2875
2876 @Test
2877 public void testCallableApiReturningOneOutParameter() throws InterruptedException {
2878 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2879 db
2880 .call("call in1out1(?,?)")
2881 .in()
2882 .out(Type.INTEGER, Integer.class)
2883 .input(0, 10, 20)
2884 .test()
2885 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2886 .assertValues(0, 10, 20)
2887 .assertComplete();
2888 }
2889
2890 @Test
2891 public void testCallableApiReturningOneOutParameterTransacted() throws InterruptedException {
2892 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2893 db
2894 .call("call in1out1(?,?)")
2895 .transacted()
2896 .in()
2897 .out(Type.INTEGER, Integer.class)
2898 .input(0, 10, 20)
2899 .flatMap(Tx.flattenToValuesOnly())
2900 .test()
2901 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2902 .assertValues(0, 10, 20)
2903 .assertComplete();
2904 }
2905
2906 @Test
2907 public void testCallableApiReturningTwoOutParameters() throws InterruptedException {
2908 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2909 db
2910 .call("call in1out2(?,?,?)")
2911 .in()
2912 .out(Type.INTEGER, Integer.class)
2913 .out(Type.INTEGER, Integer.class)
2914 .input(0, 10, 20)
2915 .test()
2916 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2917 .assertValueCount(3)
2918 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1)
2919 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11)
2920 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21)
2921 .assertComplete();
2922
2923 db.call("call in1out2(?,?,?)").in().out(Type.INTEGER, Integer.class)
2924 .out(Type.INTEGER, Integer.class).input(0, 10, 20)
2925 .blockingForEach(System.out::println);
2926 }
2927
2928 @Test
2929 public void testCallableApiReturningTwoOutParametersTransacted() throws InterruptedException {
2930 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2931 db
2932 .call("call in1out2(?,?,?)")
2933 .transacted()
2934 .in()
2935 .out(Type.INTEGER, Integer.class)
2936 .out(Type.INTEGER, Integer.class)
2937 .input(0, 10, 20)
2938 .flatMap(Tx.flattenToValuesOnly())
2939 .test()
2940 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2941 .assertValueCount(3)
2942 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1)
2943 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11)
2944 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21)
2945 .assertComplete();
2946
2947 db.call("call in1out2(?,?,?)")
2948 .in()
2949 .out(Type.INTEGER, Integer.class)
2950 .out(Type.INTEGER, Integer.class)
2951 .input(0, 10, 20)
2952 .blockingForEach(System.out::println);
2953 }
2954
2955 @Test
2956 public void testCallableApiReturningThreeOutParameters() throws InterruptedException {
2957 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2958 db
2959 .call("call in1out3(?,?,?,?)")
2960 .in()
2961 .out(Type.INTEGER, Integer.class)
2962 .out(Type.INTEGER, Integer.class)
2963 .out(Type.INTEGER, Integer.class)
2964 .input(0, 10, 20)
2965 .test()
2966 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2967 .assertValueCount(3)
2968 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2)
2969 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12)
2970 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22)
2971 .assertComplete();
2972 }
2973
2974 @Test
2975 public void testCallableApiReturningThreeOutParametersTransacted() throws InterruptedException {
2976 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2977 db
2978 .call("call in1out3(?,?,?,?)")
2979 .transacted()
2980 .in()
2981 .out(Type.INTEGER, Integer.class)
2982 .out(Type.INTEGER, Integer.class)
2983 .out(Type.INTEGER, Integer.class)
2984 .input(0, 10, 20)
2985 .flatMap(Tx.flattenToValuesOnly())
2986 .test()
2987 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2988 .assertValueCount(3)
2989 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2)
2990 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12)
2991 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22)
2992 .assertComplete();
2993 }
2994
2995 @Test
2996 public void testCallableApiReturningOneResultSet() throws InterruptedException {
2997 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2998 db
2999 .call("call in0out0rs1()")
3000 .autoMap(Person2.class)
3001 .input(0, 10, 20)
3002 .doOnNext(x -> {
3003 assertTrue(x.outs().isEmpty());
3004 })
3005 .flatMap(x -> x.results())
3006 .test()
3007 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3008 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3009 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3010 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3011 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3012 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3013 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3014 .assertValueCount(6)
3015 .assertComplete();
3016 }
3017
3018 @Test
3019 public void testCallableApiReturningOneResultSetTransacted() throws InterruptedException {
3020 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3021 db
3022 .call("call in0out0rs1()")
3023 .transacted()
3024 .autoMap(Person2.class)
3025 .input(0, 10, 20)
3026 .flatMap(Tx.flattenToValuesOnly()).doOnNext(x -> {
3027 assertTrue(x.outs().isEmpty());
3028 })
3029 .flatMap(x -> x.results())
3030 .test()
3031 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3032 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3033 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3034 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3035 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3036 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3037 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3038 .assertValueCount(6)
3039 .assertComplete();
3040 }
3041
3042 @Test
3043 public void testCallableApiReturningTwoResultSetsWithAutoMap() throws InterruptedException {
3044 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3045 db
3046 .call("call in1out0rs2(?)")
3047 .in()
3048 .autoMap(Person2.class)
3049 .autoMap(Person2.class)
3050 .input(0, 10, 20)
3051 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3052 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3053 .test()
3054 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3055 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3056 "SARAHFRED")
3057 .assertComplete();
3058 }
3059
3060 @Test
3061 public void testCallableApiReturningTwoResultSetsWithAutoMapTransacted()
3062 throws InterruptedException {
3063 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3064 db
3065 .call("call in1out0rs2(?)")
3066 .transacted()
3067 .in()
3068 .autoMap(Person2.class)
3069 .autoMap(Person2.class)
3070 .input(0, 10, 20)
3071 .flatMap(Tx.flattenToValuesOnly())
3072 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3073 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3074 .test()
3075 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3076 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3077 "SARAHFRED")
3078 .assertComplete();
3079 }
3080
3081 @Test
3082 public void testCallableApiReturningTwoResultSetsWithGet() throws InterruptedException {
3083 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3084 db
3085 .call("call in1out0rs2(?)")
3086 .in()
3087 .getAs(String.class, Integer.class)
3088 .getAs(String.class, Integer.class)
3089 .input(0, 10, 20)
3090 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1()))
3091 .test()
3092 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3093 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3094 "SARAHFRED")
3095 .assertComplete();
3096 }
3097
3098 @Test
3099 public void testCallableApiReturningTwoResultSetsWithGetTransacted()
3100 throws InterruptedException {
3101 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3102 db
3103 .call("call in1out0rs2(?)")
3104 .transacted()
3105 .in()
3106 .getAs(String.class, Integer.class)
3107 .getAs(String.class, Integer.class)
3108 .input(0, 10, 20)
3109 .flatMap(Tx.flattenToValuesOnly())
3110 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1()))
3111 .test()
3112 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3113 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3114 "SARAHFRED")
3115 .assertComplete();
3116 }
3117
3118 @Test
3119 public void testCallableApiReturningTwoResultSetsSwitchOrder1() throws InterruptedException {
3120 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3121 db
3122 .call("call in1out0rs2(?)")
3123 .autoMap(Person2.class)
3124 .in()
3125 .autoMap(Person2.class)
3126 .input(0, 10, 20)
3127 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3128 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3129 .test()
3130 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3131 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3132 "SARAHFRED")
3133 .assertComplete();
3134 }
3135
3136 @Test
3137 public void testCallableApiReturningTwoResultSetsSwitchOrder1Transacted()
3138 throws InterruptedException {
3139 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3140 db
3141 .call("call in1out0rs2(?)")
3142 .transacted()
3143 .autoMap(Person2.class)
3144 .in()
3145 .autoMap(Person2.class)
3146 .input(0, 10, 20)
3147 .flatMap(Tx.flattenToValuesOnly())
3148 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3149 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3150 .test()
3151 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3152 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3153 "SARAHFRED")
3154 .assertComplete();
3155 }
3156
3157 @Test
3158 public void testCallableApiReturningTwoResultSetsSwitchOrder2() throws InterruptedException {
3159 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3160 db
3161 .call("call in1out0rs2(?)")
3162 .autoMap(Person2.class)
3163 .autoMap(Person2.class)
3164 .in()
3165 .input(0, 10, 20)
3166 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3167 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3168 .test()
3169 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3170 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3171 "SARAHFRED")
3172 .assertComplete();
3173 }
3174
3175 @Test
3176 public void testCallableApiReturningTwoResultSetsSwitchOrder2Transacted()
3177 throws InterruptedException {
3178 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3179 db
3180 .call("call in1out0rs2(?)")
3181 .transacted()
3182 .autoMap(Person2.class)
3183 .autoMap(Person2.class)
3184 .in()
3185 .input(0, 10, 20)
3186 .flatMap(Tx.flattenToValuesOnly())
3187 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3188 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3189 .test()
3190 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3191 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3192 "SARAHFRED")
3193 .assertComplete();
3194 }
3195
3196 @Test
3197 public void testCallableApiReturningTwoOutputThreeResultSets() throws InterruptedException {
3198 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3199 db
3200 .call("call in0out2rs3(?, ?)")
3201 .out(Type.INTEGER, Integer.class)
3202 .out(Type.INTEGER, Integer.class)
3203 .autoMap(Person2.class)
3204 .autoMap(Person2.class)
3205 .autoMap(Person2.class)
3206 .input(0, 10, 20)
3207 .doOnNext(x -> {
3208 assertEquals(2, x.outs().size());
3209 assertEquals(1, x.outs().get(0));
3210 assertEquals(2, x.outs().get(1));
3211 })
3212 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3213 .zipWith(x.results3(), (y, z) -> y + z.name()))
3214 .test()
3215 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3216 .assertNoErrors()
3217 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3218 "FREDSARAHFRED", "SARAHFREDSARAH")
3219 .assertComplete();
3220 }
3221
3222 @Test
3223 public void testCallableApiReturningTwoOutputThreeResultSetsTransacted()
3224 throws InterruptedException {
3225 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3226 db
3227 .call("call in0out2rs3(?, ?)")
3228 .transacted()
3229 .out(Type.INTEGER, Integer.class)
3230 .out(Type.INTEGER, Integer.class)
3231 .autoMap(Person2.class)
3232 .autoMap(Person2.class)
3233 .autoMap(Person2.class)
3234 .input(0, 10, 20)
3235 .flatMap(Tx.flattenToValuesOnly())
3236 .doOnNext(x -> {
3237 assertEquals(2, x.outs().size());
3238 assertEquals(1, x.outs().get(0));
3239 assertEquals(2, x.outs().get(1));
3240 })
3241 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3242 .zipWith(x.results3(), (y, z) -> y + z.name()))
3243 .test()
3244 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3245 .assertNoErrors()
3246 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3247 "FREDSARAHFRED", "SARAHFREDSARAH")
3248 .assertComplete();
3249 }
3250
3251 @Test
3252 public void testCallableApiReturningTenOutParameters() {
3253 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3254 db
3255 .call("call out10(?,?,?,?,?,?,?,?,?,?)")
3256 .out(Type.INTEGER, Integer.class)
3257 .out(Type.INTEGER, Integer.class)
3258 .out(Type.INTEGER, Integer.class)
3259 .out(Type.INTEGER, Integer.class)
3260 .out(Type.INTEGER, Integer.class)
3261 .out(Type.INTEGER, Integer.class)
3262 .out(Type.INTEGER, Integer.class)
3263 .out(Type.INTEGER, Integer.class)
3264 .out(Type.INTEGER, Integer.class)
3265 .out(Type.INTEGER, Integer.class)
3266 .input(0, 10)
3267 .test()
3268 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3269 .assertNoErrors()
3270 .assertValueCount(2)
3271 .assertComplete();
3272 }
3273
3274 @Test
3275 public void testCallableApiReturningTenOutParametersTransacted() {
3276 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3277 db
3278 .call("call out10(?,?,?,?,?,?,?,?,?,?)")
3279 .transacted()
3280 .out(Type.INTEGER, Integer.class)
3281 .out(Type.INTEGER, Integer.class)
3282 .out(Type.INTEGER, Integer.class)
3283 .out(Type.INTEGER, Integer.class)
3284 .out(Type.INTEGER, Integer.class)
3285 .out(Type.INTEGER, Integer.class)
3286 .out(Type.INTEGER, Integer.class)
3287 .out(Type.INTEGER, Integer.class)
3288 .out(Type.INTEGER, Integer.class)
3289 .out(Type.INTEGER, Integer.class)
3290 .input(0, 10)
3291 .flatMap(Tx.flattenToValuesOnly())
3292 .test()
3293 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3294 .assertNoErrors()
3295 .assertValueCount(2)
3296 .assertComplete();
3297 }
3298
3299 @Test
3300 public void testCallableApiReturningTenResultSets() {
3301 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3302 db
3303 .call("call rs10()")
3304 .autoMap(Person2.class)
3305 .autoMap(Person2.class)
3306 .autoMap(Person2.class)
3307 .autoMap(Person2.class)
3308 .autoMap(Person2.class)
3309 .autoMap(Person2.class)
3310 .autoMap(Person2.class)
3311 .autoMap(Person2.class)
3312 .autoMap(Person2.class)
3313 .autoMap(Person2.class)
3314 .input(0, 10)
3315 .doOnNext(x -> {
3316 assertEquals(0, x.outs().size());
3317 assertEquals(10, x.results().size());
3318 })
3319
3320 .flatMap(x -> x.results(0)
3321 .zipWith(x.results(9),
3322 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3323 .test()
3324 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3325 .assertNoErrors()
3326 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED")
3327 .assertComplete();
3328 }
3329
3330 @Test
3331 public void testCallableApiReturningTenResultSetsTransacted() {
3332 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3333 db
3334 .call("call rs10()")
3335 .transacted()
3336 .autoMap(Person2.class)
3337 .autoMap(Person2.class)
3338 .autoMap(Person2.class)
3339 .autoMap(Person2.class)
3340 .autoMap(Person2.class)
3341 .autoMap(Person2.class)
3342 .autoMap(Person2.class)
3343 .autoMap(Person2.class)
3344 .autoMap(Person2.class)
3345 .autoMap(Person2.class)
3346 .input(0, 10)
3347 .flatMap(Tx.flattenToValuesOnly())
3348 .doOnNext(x -> {
3349 assertEquals(0, x.outs().size());
3350 assertEquals(10, x.results().size());
3351 })
3352
3353 .flatMap(x -> x.results(0)
3354 .zipWith(x.results(9),
3355 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3356 .test()
3357 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3358 .assertNoErrors()
3359 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED")
3360 .assertComplete();
3361 }
3362
3363 @Test
3364 public void testCallableApiReturningOneResultSetGetAs() throws InterruptedException {
3365 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3366 db
3367 .call("call in0out0rs1()")
3368 .getAs(String.class, Integer.class)
3369 .input(1)
3370 .flatMap(x -> x.results())
3371 .test()
3372 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3373 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p._1()) && p._2() == 24)
3374 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p._1()) && p._2() == 26)
3375 .assertComplete();
3376 }
3377
3378 @Test
3379 public void testH2InClauseWithoutSetArray() {
3380 db().apply(con -> {
3381 try (PreparedStatement ps = con
3382 .prepareStatement("select count(*) from person where name in (?, ?)")) {
3383 ps.setString(1, "FRED");
3384 ps.setString(2, "JOSEPH");
3385 ResultSet rs = ps.executeQuery();
3386 rs.next();
3387 return rs.getInt(1);
3388 }
3389 }).test()
3390 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete()
3391 .assertValue(2);
3392 }
3393
3394 @Test
3395 @Ignore
3396 public void testH2InClauseWithSetArray() {
3397 db().apply(con -> {
3398 try (PreparedStatement ps = con
3399 .prepareStatement("select count(*) from person where name in (?)")) {
3400 ps.setArray(1, con.createArrayOf("VARCHAR", new String[] { "FRED", "JOSEPH" }));
3401 ResultSet rs = ps.executeQuery();
3402 rs.next();
3403 return rs.getInt(1);
3404 }
3405 }).test()
3406 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete()
3407 .assertValue(2);
3408 }
3409
3410 @Test
3411 public void testUpdateTxPerformed() {
3412 Database db = db(1);
3413 db.update("update person set score = 1000")
3414 .transacted()
3415 .counts()
3416 .test()
3417 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3418 .assertComplete()
3419 .assertValueCount(2);
3420
3421 db.select("select count(*) from person where score=1000")
3422 .getAs(Integer.class)
3423 .test()
3424 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3425 .assertComplete()
3426 .assertValue(3);
3427
3428 }
3429
3430 @Test
3431 public void testIssue20AutoCommitEnabledAndConnectionThrowsOnCommit() {
3432 ConnectionProvider cp = DatabaseCreator.connectionProvider();
3433 Database db = Database.fromBlocking(new ConnectionProvider() {
3434
3435 @Override
3436 public Connection get() {
3437 Connection c = cp.get();
3438 try {
3439 c.setAutoCommit(true);
3440 } catch (SQLException e) {
3441 throw new SQLRuntimeException(e);
3442 }
3443 return new DelegatedConnection() {
3444
3445 @Override
3446 public Connection con() {
3447 return c;
3448 }
3449
3450 @Override
3451 public void commit() throws SQLException {
3452 System.out.println("COMMITTING");
3453 if (this.getAutoCommit()) {
3454 throw new SQLException("cannot commit when autoCommit is true");
3455 } else {
3456 con().commit();
3457 }
3458 }
3459
3460 };
3461 }
3462
3463 @Override
3464 public void close() {
3465
3466 }
3467 });
3468 db.update("insert into note(text) values(?)")
3469 .parameters("HI", "THERE")
3470 .returnGeneratedKeys()
3471 .getAs(Integer.class)
3472 .test()
3473 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3474 .assertValues(1, 2)
3475 .assertComplete();
3476 }
3477
3478 @Test
3479 public void testMapInTransactionIssue35() {
3480 Database.test()
3481 .select(Person10.class)
3482 .transacted()
3483 .valuesOnly()
3484 .get()
3485 .map(p -> p.name())
3486 .blockingForEach(System.out::println);
3487 }
3488
3489 private static final class Plugins {
3490
3491 private static final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
3492
3493 static void reset() {
3494 list.clear();
3495 RxJavaPlugins.setErrorHandler(e -> list.add(e));
3496 }
3497
3498 static Throwable getSingleError() {
3499 assertEquals(1, list.size());
3500 RxJavaPlugins.reset();
3501 return list.get(0);
3502 }
3503 }
3504
3505 @Test
3506 public void testIssue27ConnectionErrorReportedToRxJavaPlugins() {
3507 try (Database db = Database.nonBlocking()
3508 .url("jdbc:driverdoesnotexist://doesnotexist:1527/notThere").build()) {
3509 Plugins.reset();
3510 db.select("select count(*) from person")
3511 .getAs(Long.class)
3512 .test()
3513 .awaitDone(TIMEOUT_SECONDS / 5, TimeUnit.SECONDS)
3514 .assertNoValues()
3515 .assertNotTerminated()
3516 .cancel();
3517 Throwable e = Plugins.getSingleError();
3518 assertTrue(e instanceof UndeliverableException);
3519 assertTrue(e.getMessage().toLowerCase().contains("no suitable driver"));
3520 }
3521 }
3522
3523 @Test
3524 public void testAutoMapInTransactionIssue35() {
3525 try (Database db = Database.test()) {
3526 db.select(Person10.class)
3527 .transacted()
3528 .valuesOnly()
3529 .get(x -> x.name())
3530 .sorted()
3531 .test()
3532 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3533 .assertValues("FRED", "JOSEPH", "MARMADUKE")
3534 .assertComplete();
3535 }
3536 }
3537
3538 public interface PersonWithDefaultMethod {
3539 @Column
3540 String name();
3541
3542 @Column
3543 int score();
3544
3545 public default String nameLower() {
3546 return name().toLowerCase();
3547 }
3548 }
3549
3550 interface PersonWithDefaultMethodNonPublic {
3551 @Column
3552 String name();
3553
3554 @Column
3555 int score();
3556
3557 default String nameLower() {
3558 return name().toLowerCase();
3559 }
3560 }
3561
3562 interface PersonDistinct1 {
3563 @Column
3564 String name();
3565
3566 @Column
3567 int score();
3568
3569 }
3570
3571 interface PersonDistinct2 {
3572 @Column
3573 String name();
3574
3575 @Column
3576 int score();
3577
3578 }
3579
3580 interface Score {
3581 @Column
3582 int score();
3583 }
3584
3585 interface Person {
3586 @Column
3587 String name();
3588 }
3589
3590 interface Person2 {
3591 @Column
3592 String name();
3593
3594 @Column
3595 int score();
3596 }
3597
3598 interface Person3 {
3599 @Column("name")
3600 String fullName();
3601
3602 @Column("score")
3603 int examScore();
3604 }
3605
3606 interface Person4 {
3607 @Column("namez")
3608 String fullName();
3609
3610 @Column("score")
3611 int examScore();
3612 }
3613
3614 interface Person5 {
3615 @Index(1)
3616 String fullName();
3617
3618 @Index(2)
3619 int examScore();
3620 }
3621
3622 interface Person6 {
3623 @Index(1)
3624 String fullName();
3625
3626 @Index(3)
3627 int examScore();
3628 }
3629
3630 interface Person7 {
3631 @Index(1)
3632 String fullName();
3633
3634 @Index(0)
3635 int examScore();
3636 }
3637
3638 interface Person8 {
3639 @Column
3640 int name();
3641 }
3642
3643 interface Person9 {
3644 @Column
3645 String name();
3646
3647 @Index(2)
3648 int score();
3649 }
3650
/** Row interface whose only accessor, {@code name}, carries no mapping annotation. */
interface PersonNoAnnotation {

    String name();
}
3654
3655 @Query("select name, score from person order by name")
3656 interface Person10 {
3657
3658 @Column
3659 String name();
3660
3661 @Column
3662 int score();
3663 }
3664
3665 }