1 package org.davidmoten.rxjava3.jdbc;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertTrue;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.ByteArrayOutputStream;
10 import java.io.File;
11 import java.io.FileNotFoundException;
12 import java.io.FileReader;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.io.Reader;
16 import java.sql.Blob;
17 import java.sql.CallableStatement;
18 import java.sql.Clob;
19 import java.sql.Connection;
20 import java.sql.PreparedStatement;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.SQLSyntaxErrorException;
24 import java.sql.Statement;
25 import java.sql.Timestamp;
26 import java.sql.Types;
27 import java.time.Instant;
28 import java.time.ZoneOffset;
29 import java.time.ZonedDateTime;
30 import java.util.Arrays;
31 import java.util.Calendar;
32 import java.util.Date;
33 import java.util.GregorianCalendar;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Optional;
37 import java.util.Set;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.davidmoten.rxjava3.jdbc.annotations.Column;
47 import org.davidmoten.rxjava3.jdbc.annotations.Index;
48 import org.davidmoten.rxjava3.jdbc.annotations.Query;
49 import org.davidmoten.rxjava3.jdbc.exceptions.AnnotationsNotFoundException;
50 import org.davidmoten.rxjava3.jdbc.exceptions.AutomappedInterfaceInaccessibleException;
51 import org.davidmoten.rxjava3.jdbc.exceptions.CannotForkTransactedConnection;
52 import org.davidmoten.rxjava3.jdbc.exceptions.ColumnIndexOutOfRangeException;
53 import org.davidmoten.rxjava3.jdbc.exceptions.ColumnNotFoundException;
54 import org.davidmoten.rxjava3.jdbc.exceptions.MoreColumnsRequestedThanExistException;
55 import org.davidmoten.rxjava3.jdbc.exceptions.NamedParameterFoundButSqlDoesNotHaveNamesException;
56 import org.davidmoten.rxjava3.jdbc.exceptions.NamedParameterMissingException;
57 import org.davidmoten.rxjava3.jdbc.exceptions.QueryAnnotationMissingException;
58 import org.davidmoten.rxjava3.jdbc.exceptions.SQLRuntimeException;
59 import org.davidmoten.rxjava3.jdbc.internal.DelegatedConnection;
60 import org.davidmoten.rxjava3.jdbc.pool.DatabaseCreator;
61 import org.davidmoten.rxjava3.jdbc.pool.DatabaseType;
62 import org.davidmoten.rxjava3.jdbc.pool.NonBlockingConnectionPool;
63 import org.davidmoten.rxjava3.jdbc.pool.Pools;
64 import org.davidmoten.rxjava3.jdbc.tuple.Tuple2;
65 import org.davidmoten.rxjava3.jdbc.tuple.Tuple3;
66 import org.davidmoten.rxjava3.jdbc.tuple.Tuple4;
67 import org.davidmoten.rxjava3.jdbc.tuple.Tuple5;
68 import org.davidmoten.rxjava3.jdbc.tuple.Tuple6;
69 import org.davidmoten.rxjava3.jdbc.tuple.Tuple7;
70 import org.davidmoten.rxjava3.jdbc.tuple.TupleN;
71 import org.davidmoten.rxjava3.pool.PoolClosedException;
72 import org.h2.jdbc.JdbcSQLSyntaxErrorException;
73 import org.hsqldb.jdbc.JDBCBlobFile;
74 import org.hsqldb.jdbc.JDBCClobFile;
75 import org.junit.Assert;
76 import org.junit.FixMethodOrder;
77 import org.junit.Ignore;
78 import org.junit.Test;
79 import org.junit.runners.MethodSorters;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 import com.github.davidmoten.guavamini.Lists;
84 import com.github.davidmoten.guavamini.Sets;
85
86 import io.reactivex.rxjava3.core.Completable;
87 import io.reactivex.rxjava3.core.Flowable;
88 import io.reactivex.rxjava3.core.Observable;
89 import io.reactivex.rxjava3.core.Scheduler;
90 import io.reactivex.rxjava3.core.Single;
91 import io.reactivex.rxjava3.exceptions.UndeliverableException;
92 import io.reactivex.rxjava3.functions.Predicate;
93 import io.reactivex.rxjava3.plugins.RxJavaPlugins;
94 import io.reactivex.rxjava3.schedulers.Schedulers;
95 import io.reactivex.rxjava3.schedulers.TestScheduler;
96 import io.reactivex.rxjava3.subscribers.TestSubscriber;
97
98 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
99 public class DatabaseTest {
100
101 private static final long FRED_REGISTERED_TIME = 1442515672690L;
102 private static final int NAMES_COUNT_BIG = 5163;
103 private static final Logger log = LoggerFactory.getLogger(DatabaseTest.class);
104 private static final int TIMEOUT_SECONDS = 10;
105
106 private static Database db() {
107 return DatabaseCreator.create(1);
108 }
109
110 private static Database blocking() {
111 return DatabaseCreator.createBlocking();
112 }
113
114 private static Database db(int poolSize) {
115 return DatabaseCreator.create(poolSize);
116 }
117
118 private static Database big(int poolSize) {
119 return DatabaseCreator.create(poolSize, true, Schedulers.computation());
120 }
121
122 @Test
123 public void testSelectUsingQuestionMark() {
124 try (Database db = db()) {
125 db.select("select score from person where name=?")
126 .parameters("FRED", "JOSEPH")
127 .getAs(Integer.class)
128 .test()
129 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
130 .assertNoErrors()
131 .assertValues(21, 34)
132 .assertComplete();
133 }
134 }
135
136 @Test
137 public void testDemonstrateDbClosureOnTerminate() {
138 Database db = db();
139 db.select("select score from person where name=?")
140 .parameters("FRED", "JOSEPH")
141 .getAs(Integer.class)
142 .doOnTerminate(() -> db.close())
143 .test()
144 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
145 .assertNoErrors()
146 .assertValues(21, 34)
147 .assertComplete();
148 }
149
150 @Test
151 public void testFromDataSource() {
152 ConnectionProvider cp = DatabaseCreator.connectionProvider();
153 AtomicInteger count = new AtomicInteger();
154 Set<Connection> connections = new HashSet<>();
155 AtomicInteger closed = new AtomicInteger();
156 Database db = Database.fromBlocking(new ConnectionProvider() {
157
158 @Override
159 public Connection get() {
160 count.incrementAndGet();
161 Connection c = cp.get();
162 connections.add(c);
163 return new DelegatedConnection() {
164
165 @Override
166 public Connection con() {
167 return c;
168 }
169
170 @Override
171 public void close() {
172 closed.incrementAndGet();
173 }
174 };
175 }
176
177 @Override
178 public void close() {
179
180 }
181 });
182 db.select("select count(*) from person")
183 .getAs(Integer.class)
184 .blockingSubscribe();
185 db.select("select count(*) from person")
186 .getAs(Integer.class)
187 .blockingSubscribe();
188 assertEquals(2, count.get());
189 assertEquals(2, connections.size());
190 assertEquals(2, closed.get());
191 }
192
193 @Test
194 public void testBlockingForEachWhenError() {
195 try {
196 Flowable
197 .error(new RuntimeException("boo"))
198 .blockingForEach(x -> log.debug(x.toString()));
199 } catch (RuntimeException e) {
200 assertEquals("boo", e.getMessage());
201 }
202 }
203
204 @Test
205 public void testSelectUsingQuestionMarkAndInClauseIssue10() {
206 Database.test()
207 .select("select score from person where name in (?) order by score")
208 .parameters("FRED", "JOSEPH")
209 .getAs(Integer.class)
210 .test()
211 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
212 .assertNoErrors()
213 .assertValues(21, 34)
214 .assertComplete();
215 }
216
217 @Test
218 public void testSelectUsingQuestionMarkAndInClauseWithSetParameter() {
219 Database.test()
220 .select("select score from person where name in (?) order by score")
221 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
222 .getAs(Integer.class)
223 .test()
224 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
225 .assertNoErrors()
226 .assertValues(21, 34)
227 .assertComplete();
228 }
229
230 @Test
231 public void testUpdateWithInClauseBatchSize0() {
232 Database.test()
233 .update("update person set score=50 where name in (?)")
234 .batchSize(0)
235 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
236 .counts()
237 .test()
238 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS)
239 .assertComplete()
240 .assertValues(2);
241 }
242
243 @Test
244 public void testUpdateWithInClauseBatchSize10() {
245 Database.test()
246 .update("update person set score=50 where name in (?)")
247 .batchSize(10)
248 .parameter(Sets.newHashSet("FRED", "JOSEPH"))
249 .counts()
250 .test()
251 .awaitDone(TIMEOUT_SECONDS, TimeUnit.DAYS)
252 .assertComplete()
253 .assertValues(2);
254 }
255
256 @Test
257 public void testSelectUsingQuestionMarkAndInClauseWithSetParameterUsingParametersMethod() {
258 Database.test()
259 .select("select score from person where name in (?) order by score")
260 .parameters(Sets.newHashSet("FRED", "JOSEPH"))
261 .getAs(Integer.class)
262 .test()
263 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
264 .assertNoErrors()
265 .assertValues(21, 34)
266 .assertComplete();
267 }
268
269 @Test
270 public void testSelectUsingInClauseWithListParameter() {
271 Database.test()
272 .select("select score from person where score > ? and name in (?) order by score")
273 .parameters(0, Lists.newArrayList("FRED", "JOSEPH"))
274 .getAs(Integer.class)
275 .test()
276 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
277 .assertNoErrors()
278 .assertValues(21, 34)
279 .assertComplete();
280 }
281
282 @Test
283 public void testSelectUsingInClauseWithNamedListParameter() {
284 Database.test()
285 .select("select score from person where score > :score and name in (:names) order by score")
286 .parameter("score", 0)
287 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
288 .getAs(Integer.class)
289 .test()
290 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
291 .assertNoErrors()
292 .assertValues(21, 34)
293 .assertComplete();
294 }
295
296 @Test
297 public void testSelectUsingInClauseWithRepeatedNamedListParameter() {
298 Database.test()
299 .select("select score from person where name in (:names) and name in (:names) order by score")
300 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
301 .getAs(Integer.class)
302 .test()
303 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
304 .assertNoErrors()
305 .assertValues(21, 34)
306 .assertComplete();
307 }
308
309 @Test
310 public void testSelectUsingInClauseWithRepeatedNamedListParameterAndRepeatedNonListParameter() {
311 Database.test()
312 .select("select score from person where name in (:names) and score >:score and name in (:names) and score >:score order by score")
313 .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
314 .parameter("score", 0)
315 .getAs(Integer.class)
316 .test()
317 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
318 .assertNoErrors()
319 .assertValues(21, 34)
320 .assertComplete();
321 }
322
323 @Test
324 public void testSelectUsingNamedParameterList() {
325 try (Database db = db()) {
326 db.select("select score from person where name=:name")
327 .parameters(Parameter.named("name", "FRED").value("JOSEPH").list())
328 .getAs(Integer.class)
329 .test()
330 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
331 .assertNoErrors()
332 .assertValues(21, 34)
333 .assertComplete();
334 }
335 }
336
337 @Test
338 public void testSelectUsingQuestionMarkFlowableParameters() {
339 try (Database db = db()) {
340 db.select("select score from person where name=?")
341 .parameterStream(Flowable.just("FRED", "JOSEPH"))
342 .getAs(Integer.class)
343 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
344 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
345 .assertNoErrors()
346 .assertValues(21, 34)
347 .assertComplete();
348 }
349 }
350
351 @Test
352 public void testSelectUsingQuestionMarkFlowableParametersInLists() {
353 try (Database db = db()) {
354 db.select("select score from person where name=?")
355 .parameterListStream(
356 Flowable.just(Arrays.asList("FRED"), Arrays.asList("JOSEPH")))
357 .getAs(Integer.class)
358 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
359 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
360 .assertNoErrors()
361 .assertValues(21, 34)
362 .assertComplete();
363 }
364 }
365
366 @Test
367 public void testDrivingSelectWithoutParametersUsingParameterStream() {
368 try (Database db = db()) {
369 db.select("select count(*) from person")
370 .parameters(1, 2, 3)
371 .getAs(Integer.class)
372 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
373 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
374 .assertValues(3, 3, 3)
375 .assertComplete();
376 }
377 }
378
379 @Test
380 public void testSelectUsingQuestionMarkFlowableParametersTwoParametersPerQuery() {
381 try (Database db = db()) {
382 db.select("select score from person where name=? and score = ?")
383 .parameterStream(Flowable.just("FRED", 21, "JOSEPH", 34))
384 .getAs(Integer.class)
385 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
386 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
387 .assertNoErrors()
388 .assertValues(21, 34)
389 .assertComplete();
390 }
391 }
392
393 @Test
394 public void testSelectUsingQuestionMarkFlowableParameterListsTwoParametersPerQuery() {
395 try (Database db = db()) {
396 db.select("select score from person where name=? and score = ?")
397 .parameterListStream(
398 Flowable.just(Arrays.asList("FRED", 21), Arrays.asList("JOSEPH", 34)))
399 .getAs(Integer.class)
400 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
401 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
402 .assertNoErrors()
403 .assertValues(21, 34)
404 .assertComplete();
405 }
406 }
407
408 @Test
409 public void testSelectUsingQuestionMarkWithPublicTestingDatabase() {
410 try (Database db = Database.test()) {
411 db
412 .select("select score from person where name=?")
413 .parameters("FRED", "JOSEPH")
414 .getAs(Integer.class)
415 .test()
416 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
417 .assertNoErrors()
418 .assertValues(21, 34)
419 .assertComplete();
420 }
421 }
422
423 @Test
424 public void testSelectWithFetchSize() {
425 try (Database db = db()) {
426 db.select("select score from person order by name")
427 .fetchSize(2)
428 .getAs(Integer.class)
429 .test()
430 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
431 .assertNoErrors()
432 .assertValues(21, 34, 25)
433 .assertComplete();
434 }
435 }
436
437 @Test
438 public void testSelectWithFetchSizeZero() {
439 try (Database db = db()) {
440 db.select("select score from person order by name")
441 .fetchSize(0)
442 .getAs(Integer.class)
443 .test()
444 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
445 .assertNoErrors()
446 .assertValues(21, 34, 25)
447 .assertComplete();
448 }
449 }
450
451 @Test(expected = IllegalArgumentException.class)
452 public void testSelectWithFetchSizeNegative() {
453 try (Database db = db()) {
454 db.select("select score from person order by name")
455 .fetchSize(-1);
456 }
457 }
458
459 @Test
460 public void testSelectUsingNonBlockingBuilder() {
461 NonBlockingConnectionPool pool = Pools
462 .nonBlocking()
463 .connectionProvider(DatabaseCreator.connectionProvider())
464 .maxIdleTime(1, TimeUnit.MINUTES)
465 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
466 .connectionRetryInterval(1, TimeUnit.SECONDS)
467 .healthCheck(c -> c.prepareStatement("select 1").execute())
468 .maxPoolSize(3)
469 .build();
470
471 try (Database db = Database.from(pool)) {
472 db.select("select score from person where name=?")
473 .parameters("FRED", "JOSEPH")
474 .getAs(Integer.class)
475 .test()
476 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
477 .assertNoErrors()
478 .assertValues(21, 34)
479 .assertComplete();
480 }
481 }
482
483 @Test
484 public void testSelectSpecifyingHealthCheck() {
485 try (Database db = Database
486 .nonBlocking()
487 .connectionProvider(DatabaseCreator.connectionProvider())
488 .maxIdleTime(1, TimeUnit.MINUTES)
489 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
490 .connectionRetryInterval(1, TimeUnit.SECONDS)
491 .healthCheck(DatabaseType.H2)
492 .maxPoolSize(3)
493 .build()) {
494 db.select("select score from person where name=?")
495 .parameters("FRED", "JOSEPH")
496 .getAs(Integer.class)
497 .test()
498 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
499 .assertNoErrors()
500 .assertValues(21, 34)
501 .assertComplete();
502 }
503 }
504
505 @Test(expected = IllegalArgumentException.class)
506 public void testCreateTestDatabaseWithZeroSizePoolThrows() {
507 Database.test(0);
508 }
509
510 @Test
511 public void testSelectSpecifyingHealthCheckAsSql() {
512 final AtomicInteger success = new AtomicInteger();
513 NonBlockingConnectionPool pool = Pools
514 .nonBlocking()
515 .connectionProvider(DatabaseCreator.connectionProvider())
516 .maxIdleTime(1, TimeUnit.MINUTES)
517 .healthCheck(DatabaseType.H2)
518 .idleTimeBeforeHealthCheck(1, TimeUnit.MINUTES)
519 .connectionRetryInterval(1, TimeUnit.SECONDS)
520 .healthCheck("select 1")
521 .connectionListener(error -> {
522 if (error.isPresent()) {
523 success.set(Integer.MIN_VALUE);
524 } else {
525 success.incrementAndGet();
526 }
527 })
528 .maxPoolSize(3)
529 .build();
530
531 try (Database db = Database.from(pool)) {
532 db.select("select score from person where name=?")
533 .parameters("FRED", "JOSEPH")
534 .getAs(Integer.class)
535 .test()
536 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
537 .assertNoErrors()
538 .assertValues(21, 34)
539 .assertComplete();
540 }
541 assertEquals(1, success.get());
542 }
543
544 @Test(expected = IllegalArgumentException.class)
545 public void testNonBlockingPoolWithTrampolineSchedulerThrows() {
546 Pools.nonBlocking().scheduler(Schedulers.trampoline());
547 }
548
549 @Test(timeout = 40000)
550 public void testSelectUsingNonBlockingBuilderConcurrencyTest() throws InterruptedException, TimeoutException {
551 try (Database db = db(3)) {
552 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(50));
553 int n = 10000;
554 CountDownLatch latch = new CountDownLatch(n);
555 AtomicInteger count = new AtomicInteger();
556 for (int i = 0; i < n; i++) {
557 db.select("select score from person where name=?")
558 .parameters("FRED", "JOSEPH")
559 .getAs(Integer.class)
560 .subscribeOn(scheduler)
561 .toList()
562 .doOnSuccess(x -> {
563 if (!x.equals(Lists.newArrayList(21, 34))) {
564 throw new RuntimeException("run broken");
565 } else {
566
567 }
568 })
569 .doOnSuccess(x -> {
570 count.incrementAndGet();
571 latch.countDown();
572 })
573 .doOnError(x -> latch.countDown())
574 .subscribe();
575 }
576 if (!latch.await(20, TimeUnit.SECONDS)) {
577 throw new TimeoutException("timeout");
578 }
579 assertEquals(n, count.get());
580 }
581 }
582
583 @Test(timeout = 5000)
584 public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
585 try (Database db = db(1)) {
586 Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
587 int n = 2;
588 CountDownLatch latch = new CountDownLatch(n);
589 AtomicInteger count = new AtomicInteger();
590 for (int i = 0; i < n; i++) {
591 db.select("select score from person where name=?")
592 .parameters("FRED", "JOSEPH")
593 .getAs(Integer.class)
594 .subscribeOn(scheduler)
595 .toList()
596 .doOnSuccess(x -> {
597 if (!x.equals(Lists.newArrayList(21, 34))) {
598 throw new RuntimeException("run broken");
599 }
600 })
601 .doOnSuccess(x -> {
602 count.incrementAndGet();
603 latch.countDown();
604 })
605 .doOnError(x -> latch.countDown())
606 .subscribe();
607 log.debug("submitted " + i);
608 }
609 if (!latch.await(5000, TimeUnit.SECONDS)) {
610 throw new TimeoutException("timeout");
611 }
612 assertEquals(n, count.get());
613 }
614 }
615
616 @Test
617 public void testDatabaseClose() {
618 try (Database db = db()) {
619 db.select("select score from person where name=?")
620 .parameters("FRED", "JOSEPH")
621 .getAs(Integer.class)
622 .test()
623 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
624 .assertNoErrors()
625 .assertValues(21, 34)
626 .assertComplete();
627 }
628 }
629
630 @Test
631 public void testSelectUsingName() {
632 try (Database db = db()) {
633 db
634 .select("select score from person where name=:name")
635 .parameter("name", "FRED")
636 .parameter("name", "JOSEPH")
637 .getAs(Integer.class)
638 .test()
639 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
640 .assertValues(21, 34)
641 .assertComplete();
642 }
643 }
644
645 @Test
646 public void testSelectUsingNameNotGiven() {
647 try (Database db = db()) {
648 db
649 .select("select score from person where name=:name and name<>:name2")
650 .parameter("name", "FRED")
651 .parameter("name", "JOSEPH")
652 .getAs(Integer.class)
653 .test()
654 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
655 .assertError(NamedParameterMissingException.class)
656 .assertNoValues();
657 }
658 }
659
660 @Test
661 public void testSelectUsingParameterNameNullNameWhenSqlHasNoNames() {
662 db()
663 .select("select score from person where name=?")
664 .parameter(Parameter.create("name", "FRED"))
665 .getAs(Integer.class)
666 .test()
667 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
668 .assertError(NamedParameterFoundButSqlDoesNotHaveNamesException.class)
669 .assertNoValues();
670 }
671
672 @Test
673 public void testUpdateWithNullNamedParameter() {
674 try (Database db = db()) {
675 db
676 .update("update person set date_of_birth = :dob")
677 .parameter(Parameter.create("dob", null))
678 .counts()
679 .test()
680 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
681 .assertValue(3)
682 .assertComplete();
683 }
684 }
685
686 @Test
687 public void testUpdateWithNullParameter() {
688 try (Database db = db()) {
689 db
690 .update("update person set date_of_birth = ?")
691 .parameter(null)
692 .counts()
693 .test()
694 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
695 .assertValue(3)
696 .assertComplete();
697 }
698 }
699
700 @Test
701 public void testUpdateWithNullStreamParameter() {
702 try (Database db = db()) {
703 db
704 .update("update person set date_of_birth = ?")
705 .parameterStream(Flowable.just(Parameter.NULL))
706 .counts()
707 .test()
708 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
709 .assertValue(3)
710 .assertComplete();
711 }
712 }
713
714 @Test
715 public void testUpdateWithTestDatabaseForReadme() {
716 try (Database db = db()) {
717 db
718 .update("update person set date_of_birth = ?")
719 .parameterStream(Flowable.just(Parameter.NULL))
720 .counts()
721 .test()
722 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
723 .assertValue(3)
724 .assertComplete();
725 }
726 }
727
728 @Test
729 public void testUpdateClobWithNull() {
730 try (Database db = db()) {
731 insertNullClob(db);
732 db
733 .update("update person_clob set document = :doc")
734 .parameter("doc", Database.NULL_CLOB)
735 .counts()
736 .test()
737 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
738 .assertValue(1)
739 .assertComplete();
740 }
741 }
742
743 @Test
744 public void testUpdateClobWithClob() throws SQLException {
745 try (Database db = db()) {
746 Clob clob = new JDBCClobFile(new File("src/test/resources/big.txt"));
747 insertNullClob(db);
748 db
749 .update("update person_clob set document = :doc")
750 .parameter("doc", clob)
751 .counts()
752 .test()
753 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
754 .assertValue(1)
755 .assertComplete();
756 }
757 }
758
759 @Test
760 public void testUpdateClobWithReader() throws FileNotFoundException {
761 try (Database db = db()) {
762 Reader reader = new FileReader(new File("src/test/resources/big.txt"));
763 insertNullClob(db);
764 db
765 .update("update person_clob set document = :doc")
766 .parameter("doc", reader)
767 .counts()
768 .test()
769 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
770 .assertValue(1)
771 .assertComplete();
772 }
773 }
774
775 @Test
776 public void testUpdateBlobWithBlob() throws SQLException {
777 try (Database db = db()) {
778 Blob blob = new JDBCBlobFile(new File("src/test/resources/big.txt"));
779 insertPersonBlob(db);
780 db
781 .update("update person_blob set document = :doc")
782 .parameter("doc", blob)
783 .counts()
784 .test()
785 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
786 .assertValue(1)
787 .assertComplete();
788 }
789 }
790
791 @Test
792 public void testUpdateBlobWithNull() {
793 try (Database db = db()) {
794 insertPersonBlob(db);
795 db
796 .update("update person_blob set document = :doc")
797 .parameter("doc", Database.NULL_BLOB)
798 .counts()
799 .test()
800 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
801 .assertValue(1)
802 .assertComplete();
803 }
804 }
805
806 @Test(expected = NullPointerException.class)
807 public void testSelectUsingNullNameInParameter() {
808 try (Database db = db()) {
809 db
810 .select("select score from person where name=:name")
811 .parameter(null, "FRED");
812 }
813 }
814
815 @Test(expected = IllegalArgumentException.class)
816 public void testSelectUsingNameDoesNotExist() {
817 try (Database db = db()) {
818 db
819 .select("select score from person where name=:name")
820 .parameters("nam", "FRED");
821 }
822 }
823
824 @Test(expected = IllegalArgumentException.class)
825 public void testSelectUsingNameWithoutSpecifyingNameThrowsImmediately() {
826 try (Database db = db()) {
827 db
828 .select("select score from person where name=:name")
829 .parameters("FRED", "JOSEPH");
830 }
831 }
832
833 @Test
834 public void testSelectTransacted() {
835 try (Database db = db()) {
836 db
837 .select("select score from person where name=?")
838 .parameters("FRED", "JOSEPH")
839 .transacted()
840 .getAs(Integer.class)
841 .doOnNext(tx -> log
842 .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))
843 .test()
844 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
845 .assertValueCount(3)
846 .assertComplete();
847 }
848 }
849
850 @Test
851 public void testSelectAutomappedAnnotatedTransacted() {
852 try (Database db = db()) {
853 db
854 .select(Person10.class)
855 .transacted()
856 .valuesOnly()
857 .get().test()
858 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
859 .assertValueCount(3)
860 .assertComplete();
861 }
862 }
863
864 @Test
865 public void testSelectAutomappedTransactedValuesOnly() {
866 try (Database db = db()) {
867 db
868 .select("select name, score from person")
869 .transacted()
870 .valuesOnly()
871 .autoMap(Person2.class)
872 .test()
873 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
874 .assertValueCount(3)
875 .assertComplete();
876 }
877 }
878
879 @Test
880 public void testSelectAutomappedTransacted() {
881 try (Database db = db()) {
882 db
883 .select("select name, score from person")
884 .transacted()
885 .autoMap(Person2.class)
886 .test()
887 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
888 .assertValueCount(4)
889 .assertComplete();
890 }
891 }
892
893 @Test
894 public void testSelectTransactedTuple2() {
895 try (Database db = db()) {
896 Tx<Tuple2<String, Integer>> t = db
897 .select("select name, score from person where name=?")
898 .parameters("FRED")
899 .transacted()
900 .getAs(String.class, Integer.class)
901 .test()
902 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
903 .assertValueCount(2)
904 .assertComplete()
905 .values().get(0);
906 assertEquals("FRED", t.value()._1());
907 assertEquals(21, (int) t.value()._2());
908 }
909 }
910
911 @Test
912 public void testSelectTransactedTuple3() {
913 try (Database db = db()) {
914 Tx<Tuple3<String, Integer, String>> t = db
915 .select("select name, score, name from person where name=?")
916 .parameters("FRED")
917 .transacted()
918 .getAs(String.class, Integer.class, String.class)
919 .test()
920 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
921 .assertValueCount(2)
922 .assertComplete()
923 .values().get(0);
924 assertEquals("FRED", t.value()._1());
925 assertEquals(21, (int) t.value()._2());
926 assertEquals("FRED", t.value()._3());
927 }
928 }
929
930 @Test
931 public void testSelectTransactedTuple4() {
932 try (Database db = db()) {
933 Tx<Tuple4<String, Integer, String, Integer>> t = db
934 .select("select name, score, name, score from person where name=?")
935 .parameters("FRED")
936 .transacted()
937 .getAs(String.class, Integer.class, String.class, Integer.class)
938 .test()
939 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
940 .assertValueCount(2)
941 .assertComplete()
942 .values().get(0);
943 assertEquals("FRED", t.value()._1());
944 assertEquals(21, (int) t.value()._2());
945 assertEquals("FRED", t.value()._3());
946 assertEquals(21, (int) t.value()._4());
947 }
948 }
949
950 @Test
951 public void testSelectTransactedTuple5() {
952 try (Database db = db()) {
953 Tx<Tuple5<String, Integer, String, Integer, String>> t = db
954 .select("select name, score, name, score, name from person where name=?")
955 .parameters("FRED")
956 .transacted()
957 .getAs(String.class, Integer.class, String.class, Integer.class, String.class)
958 .test()
959 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
960 .assertValueCount(2)
961 .assertComplete()
962 .values().get(0);
963 assertEquals("FRED", t.value()._1());
964 assertEquals(21, (int) t.value()._2());
965 assertEquals("FRED", t.value()._3());
966 assertEquals(21, (int) t.value()._4());
967 assertEquals("FRED", t.value()._5());
968 }
969 }
970
971 @Test
972 public void testSelectTransactedTuple6() {
973 try (Database db = db()) {
974 Tx<Tuple6<String, Integer, String, Integer, String, Integer>> t = db
975 .select("select name, score, name, score, name, score from person where name=?")
976 .parameters("FRED")
977 .transacted()
978 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
979 Integer.class)
980 .test()
981 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
982 .assertValueCount(2)
983 .assertComplete()
984 .values().get(0);
985 assertEquals("FRED", t.value()._1());
986 assertEquals(21, (int) t.value()._2());
987 assertEquals("FRED", t.value()._3());
988 assertEquals(21, (int) t.value()._4());
989 assertEquals("FRED", t.value()._5());
990 assertEquals(21, (int) t.value()._6());
991 }
992 }
993
994 @Test
995 public void testSelectTransactedTuple7() {
996 try (Database db = db()) {
997 Tx<Tuple7<String, Integer, String, Integer, String, Integer, String>> t = db
998 .select("select name, score, name, score, name, score, name from person where name=?")
999 .parameters("FRED")
1000 .transacted()
1001 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1002 Integer.class, String.class)
1003 .test()
1004 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1005 .assertValueCount(2)
1006 .assertComplete()
1007 .values().get(0);
1008 assertEquals("FRED", t.value()._1());
1009 assertEquals(21, (int) t.value()._2());
1010 assertEquals("FRED", t.value()._3());
1011 assertEquals(21, (int) t.value()._4());
1012 assertEquals("FRED", t.value()._5());
1013 assertEquals(21, (int) t.value()._6());
1014 assertEquals("FRED", t.value()._7());
1015 }
1016 }
1017
1018 @Test
1019 public void testSelectTransactedTupleN() {
1020 try (Database db = db()) {
1021 List<Tx<TupleN<Object>>> list = db
1022 .select("select name, score from person where name=?")
1023 .parameters("FRED")
1024 .transacted()
1025 .getTupleN()
1026 .test()
1027 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1028 .assertValueCount(2)
1029 .assertComplete()
1030 .values();
1031 assertEquals("FRED", list.get(0).value().values().get(0));
1032 assertEquals(21, (int) list.get(0).value().values().get(1));
1033 assertTrue(list.get(1).isComplete());
1034 assertEquals(2, list.size());
1035 }
1036 }
1037
1038 @Test
1039 public void testSelectTransactedCount() {
1040 try (Database db = db()) {
1041 db
1042 .select("select name, score, name, score, name, score, name from person where name=?")
1043 .parameters("FRED")
1044 .transacted()
1045 .count()
1046 .test()
1047 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1048 .assertValueCount(1)
1049 .assertComplete();
1050 }
1051 }
1052
1053 @Test
1054 public void testSelectTransactedGetAs() {
1055 try (Database db = db()) {
1056 db
1057 .select("select name from person")
1058 .transacted()
1059 .getAs(String.class)
1060 .test()
1061 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1062 .assertValueCount(4)
1063 .assertComplete();
1064 }
1065 }
1066
1067 @Test
1068 public void testSelectTransactedGetAsOptional() {
1069 try (Database db = db()) {
1070 List<Tx<Optional<String>>> list = db
1071 .select("select name from person where name=?")
1072 .parameters("FRED")
1073 .transacted()
1074 .getAsOptional(String.class)
1075 .test()
1076 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1077 .assertValueCount(2)
1078 .assertComplete()
1079 .values();
1080 assertTrue(list.get(0).isValue());
1081 assertEquals("FRED", list.get(0).value().get());
1082 assertTrue(list.get(1).isComplete());
1083 }
1084 }
1085
1086 @Test
1087 public void testDatabaseFrom() {
1088 Database.from(DatabaseCreator.nextUrl(), 3)
1089 .select("select name from person")
1090 .getAs(String.class)
1091 .count()
1092 .test()
1093 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1094 .assertError(JdbcSQLSyntaxErrorException.class);
1095 }
1096
1097 @Test
1098 public void testSelectTransactedChained() throws Exception {
1099 try (Database db = db()) {
1100 db
1101 .select("select score from person where name=?")
1102 .parameters("FRED", "JOSEPH")
1103 .transacted()
1104 .transactedValuesOnly()
1105 .getAs(Integer.class)
1106 .doOnNext(tx -> log
1107 .debug(tx.isComplete() ? "complete" : String.valueOf(tx.value())))
1108 .flatMap(tx -> tx
1109 .select("select name from person where score = ?")
1110 .parameter(tx.value())
1111 .valuesOnly()
1112 .getAs(String.class))
1113 .test()
1114 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1115 .assertNoErrors()
1116 .assertValues("FRED", "JOSEPH")
1117 .assertComplete();
1118 }
1119 }
1120
1121 @Test
1122 public void databaseIsAutoCloseable() {
1123 try (Database db = db()) {
1124 log.debug(db.toString());
1125 }
1126 }
1127
1128 private static void println(Object o) {
1129 log.debug("{}", o);
1130 }
1131
1132 @Test
1133 public void testSelectChained() {
1134 try (Database db = db(1)) {
1135
1136 db.select("select score from person where name=?")
1137 .parameters("FRED", "JOSEPH")
1138 .getAs(Integer.class)
1139 .doOnNext(DatabaseTest::println)
1140 .concatMap(score -> {
1141 log.debug("score={}", score);
1142 return db
1143 .select("select name from person where score = ?")
1144 .parameter(score)
1145 .getAs(String.class)
1146 .doOnComplete(
1147 () -> log.debug("completed select where score=" + score));
1148 })
1149 .test()
1150 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1151 .assertNoErrors()
1152 .assertValues("FRED", "JOSEPH")
1153 .assertComplete();
1154 }
1155 }
1156
1157 @Test
1158 public void testReadMeFragment1() {
1159 try (Database db = db()) {
1160 db.select("select name from person")
1161 .getAs(String.class)
1162 .forEach(DatabaseTest::println);
1163 }
1164 }
1165
1166 @Test
1167 public void testReadMeFragmentColumnDoesNotExistEmitsSqlSyntaxErrorException() {
1168 try (Database db = Database.test()) {
1169 db.select("select nam from person")
1170 .getAs(String.class)
1171 .test()
1172 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1173 .assertNoValues()
1174 .assertError(SQLSyntaxErrorException.class);
1175 }
1176 }
1177
1178 @Test
1179 public void testReadMeFragmentDerbyHealthCheck() {
1180 try (Database db = Database.test()) {
1181 db.select("select 'a' from sysibm.sysdummy1")
1182 .getAs(String.class)
1183 .test()
1184 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1185 .assertValue("a")
1186 .assertComplete();
1187 }
1188 }
1189
1190 @Test
1191 public void testTupleSupport() {
1192 try (Database db = db()) {
1193 db.select("select name, score from person")
1194 .getAs(String.class, Integer.class)
1195 .forEach(DatabaseTest::println);
1196 }
1197 }
1198
1199 @Test
1200 public void testDelayedCallsAreNonBlocking() throws InterruptedException {
1201 List<String> list = new CopyOnWriteArrayList<String>();
1202 try (Database db = db(1)) {
1203 db.select("select score from person where name=?")
1204 .parameter("FRED")
1205 .getAs(Integer.class)
1206 .doOnNext(x -> Thread.sleep(1000))
1207 .subscribeOn(Schedulers.io())
1208 .subscribe();
1209 Thread.sleep(100);
1210 CountDownLatch latch = new CountDownLatch(1);
1211 db.select("select score from person where name=?")
1212 .parameter("FRED")
1213 .getAs(Integer.class)
1214 .doOnNext(x -> list.add("emitted"))
1215 .doOnNext(x -> log.debug("emitted on " + Thread.currentThread().getName()))
1216 .doOnNext(x -> latch.countDown())
1217 .subscribe();
1218 list.add("subscribed");
1219 assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1220 assertEquals(Arrays.asList("subscribed", "emitted"), list);
1221 }
1222 }
1223
1224 @Test
1225 public void testAutoMapToInterface() {
1226 try (Database db = db()) {
1227 db
1228 .select("select name from person")
1229 .autoMap(Person.class)
1230 .map(p -> p.name())
1231 .test()
1232 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1233 .assertValueCount(3)
1234 .assertComplete();
1235 }
1236 }
1237
1238 @Test
1239 public void testAutoMapToInterfaceWithEnum() {
1240 try (Database db = db()) {
1241 db
1242 .select("select name from person")
1243 .autoMap(Person.class)
1244 .map(p -> p.name())
1245 .test()
1246 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1247 .assertValueCount(3)
1248 .assertComplete();
1249 }
1250 }
1251
1252 @Test
1253 public void testAutoMapToInterfaceWithoutAnnotationstsError() {
1254 try (Database db = db()) {
1255 db
1256 .select("select name from person")
1257 .autoMap(PersonNoAnnotation.class)
1258 .map(p -> p.name())
1259 .test()
1260 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1261 .assertNoValues()
1262 .assertError(AnnotationsNotFoundException.class);
1263 }
1264 }
1265
1266 @Test
1267 public void testAutoMapToInterfaceWithTwoMethods() {
1268 try (Database db = db()) {
1269 db
1270 .select("select name, score from person order by name")
1271 .autoMap(Person2.class)
1272 .firstOrError()
1273 .map(Person2::score)
1274 .test()
1275 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1276 .assertValue(21)
1277 .assertComplete();
1278 }
1279 }
1280
1281 @Test
1282 public void testAutoMapToInterfaceWithExplicitColumnName() {
1283 try (Database db = db()) {
1284 db
1285 .select("select name, score from person order by name")
1286 .autoMap(Person3.class)
1287 .firstOrError()
1288 .map(Person3::examScore)
1289 .test()
1290 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1291 .assertValue(21)
1292 .assertComplete();
1293 }
1294 }
1295
1296 @Test
1297 public void testAutoMapToInterfaceWithExplicitColumnNameThatDoesNotExist() {
1298 try (Database db = db()) {
1299 db
1300 .select("select name, score from person order by name")
1301 .autoMap(Person4.class)
1302 .firstOrError()
1303 .map(Person4::examScore)
1304 .test()
1305 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1306 .assertNoValues()
1307 .assertError(ColumnNotFoundException.class);
1308 }
1309 }
1310
1311 @Test
1312 public void testAutoMapToInterfaceWithIndex() {
1313 try (Database db = db()) {
1314 db
1315 .select("select name, score from person order by name")
1316 .autoMap(Person5.class)
1317 .firstOrError()
1318 .map(Person5::examScore)
1319 .test()
1320 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1321 .assertValue(21)
1322 .assertComplete();
1323 }
1324 }
1325
1326 @Test
1327 public void testAutoMapToInterfaceWithIndexTooLarge() {
1328 try (Database db = db()) {
1329 db
1330 .select("select name, score from person order by name")
1331 .autoMap(Person6.class)
1332 .firstOrError()
1333 .map(Person6::examScore)
1334 .test()
1335 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1336 .assertNoValues()
1337 .assertError(ColumnIndexOutOfRangeException.class);
1338 }
1339 }
1340
1341 @Test
1342 public void testAutoMapToInterfaceWithIndexTooSmall() {
1343 try (Database db = db()) {
1344 db
1345 .select("select name, score from person order by name")
1346 .autoMap(Person7.class)
1347 .firstOrError()
1348 .map(Person7::examScore)
1349 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1350 .assertNoValues()
1351 .assertError(ColumnIndexOutOfRangeException.class);
1352 }
1353 }
1354
1355 @Test
1356 public void testAutoMapWithUnmappableColumnType() {
1357 try (Database db = db()) {
1358 db
1359 .select("select name from person order by name")
1360 .autoMap(Person8.class)
1361 .map(p -> p.name())
1362 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1363 .assertNoValues()
1364 .assertError(ClassCastException.class);
1365 }
1366 }
1367
1368 @Test
1369 public void testAutoMapWithMixIndexAndName() {
1370 try (Database db = db()) {
1371 db
1372 .select("select name, score from person order by name")
1373 .autoMap(Person9.class)
1374 .firstOrError()
1375 .map(Person9::score)
1376 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1377 .assertValue(21)
1378 .assertComplete();
1379 }
1380 }
1381
1382 @Test
1383 public void testAutoMapWithQueryInAnnotation() {
1384 try (Database db = db()) {
1385 db.select(Person10.class)
1386 .get()
1387 .firstOrError()
1388 .map(Person10::score)
1389 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1390 .assertValue(21)
1391 .assertComplete();
1392 }
1393 }
1394
1395 @Test
1396 @Ignore
1397 public void testAutoMapForReadMe() {
1398 try (Database db = Database.test()) {
1399 db.select(Person10.class)
1400 .get(Person10::name)
1401 .blockingForEach(DatabaseTest::println);
1402 }
1403 }
1404
1405 @Test(expected = QueryAnnotationMissingException.class)
1406 public void testAutoMapWithoutQueryInAnnotation() {
1407 try (Database db = db()) {
1408 db.select(Person.class);
1409 }
1410 }
1411
1412 @Test
1413 public void testSelectWithoutWhereClause() {
1414 try (Database db = db()) {
1415 Assert.assertEquals(3, (long) db.select("select name from person")
1416 .count()
1417 .blockingGet());
1418 }
1419 }
1420
1421 @Test
1422 public void testTuple3() {
1423 try (Database db = db()) {
1424 db
1425 .select("select name, score, name from person order by name")
1426 .getAs(String.class, Integer.class, String.class)
1427 .firstOrError()
1428 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1429 .assertComplete()
1430 .assertValue(Tuple3.create("FRED", 21, "FRED"));
1431 }
1432 }
1433
1434 @Test
1435 public void testTuple4() {
1436 try (Database db = db()) {
1437 db
1438 .select("select name, score, name, score from person order by name")
1439 .getAs(String.class, Integer.class, String.class, Integer.class)
1440 .firstOrError()
1441 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1442 .assertComplete()
1443 .assertValue(Tuple4.create("FRED", 21, "FRED", 21));
1444 }
1445 }
1446
1447 @Test
1448 public void testTuple5() {
1449 try (Database db = db()) {
1450 db
1451 .select("select name, score, name, score, name from person order by name")
1452 .getAs(String.class, Integer.class, String.class, Integer.class, String.class)
1453 .firstOrError()
1454 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1455 .assertComplete().assertValue(Tuple5.create("FRED", 21, "FRED", 21, "FRED"));
1456 }
1457 }
1458
1459 @Test
1460 public void testTuple6() {
1461 try (Database db = db()) {
1462 db
1463 .select("select name, score, name, score, name, score from person order by name")
1464 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1465 Integer.class)
1466 .firstOrError()
1467 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1468 .assertComplete()
1469 .assertValue(Tuple6.create("FRED", 21, "FRED", 21, "FRED", 21));
1470 }
1471 }
1472
1473 @Test
1474 public void testTuple7() {
1475 try (Database db = db()) {
1476 db
1477 .select("select name, score, name, score, name, score, name from person order by name")
1478 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
1479 Integer.class, String.class)
1480 .firstOrError()
1481 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1482 .assertComplete()
1483 .assertValue(Tuple7.create("FRED", 21, "FRED", 21, "FRED", 21, "FRED"));
1484 }
1485 }
1486
1487 @Test
1488 public void testTupleN() {
1489 try (Database db = db()) {
1490 db
1491 .select("select name, score, name from person order by name")
1492 .getTupleN()
1493 .firstOrError().test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1494 .assertComplete()
1495 .assertValue(TupleN.create("FRED", 21, "FRED"));
1496 }
1497 }
1498
1499 @Test
1500 public void testTupleNWithClass() {
1501 try (Database db = db()) {
1502 db
1503 .select("select score a, score b from person order by name")
1504 .getTupleN(Integer.class)
1505 .firstOrError()
1506 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1507 .assertComplete()
1508 .assertValue(TupleN.create(21, 21));
1509 }
1510 }
1511
1512 @Test
1513 public void testTupleNWithClassInTransaction() {
1514 try (Database db = db()) {
1515 db
1516 .select("select score a, score b from person order by name")
1517 .transactedValuesOnly()
1518 .getTupleN(Integer.class)
1519 .map(x -> x.value())
1520 .firstOrError()
1521 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1522 .assertComplete()
1523 .assertValue(TupleN.create(21, 21));
1524 }
1525 }
1526
1527 @Test
1528 public void testHealthCheck() throws InterruptedException {
1529 AtomicBoolean once = new AtomicBoolean(true);
1530 testHealthCheck(c -> {
1531 log.debug("doing health check");
1532 return !once.compareAndSet(true, false);
1533 });
1534 }
1535
1536 @Test
1537 public void testHealthCheckThatThrows() throws InterruptedException {
1538 AtomicBoolean once = new AtomicBoolean(true);
1539 testHealthCheck(c -> {
1540 log.debug("doing health check");
1541 if (!once.compareAndSet(true, false))
1542 return true;
1543 else
1544 throw new RuntimeException("health check failed");
1545 });
1546 }
1547
1548 @Test
1549 public void testUpdateOneRow() {
1550 try (Database db = db()) {
1551 db.update("update person set score=20 where name='FRED'")
1552 .counts()
1553 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1554 .assertValue(1)
1555 .assertComplete();
1556 }
1557 }
1558
1559 @Test
1560 public void testUpdateThreeRows() {
1561 try (Database db = db()) {
1562 db.update("update person set score=20")
1563 .counts()
1564 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1565 .assertValue(3)
1566 .assertComplete();
1567 }
1568 }
1569
1570 @Test
1571 public void testUpdateWithParameter() {
1572 try (Database db = db()) {
1573 db.update("update person set score=20 where name=?")
1574 .parameter("FRED").counts()
1575 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1576 .assertValue(1)
1577 .assertComplete();
1578 }
1579 }
1580
1581 @Test
1582 public void testUpdateWithParameterTwoRuns() {
1583 try (Database db = db()) {
1584 db.update("update person set score=20 where name=?")
1585 .parameters("FRED", "JOSEPH").counts()
1586 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1587 .assertValues(1, 1)
1588 .assertComplete();
1589 }
1590 }
1591
1592 @Test
1593 public void testUpdateAllWithParameterFourRuns() {
1594 try (Database db = db()) {
1595 db.update("update person set score=?")
1596 .parameters(1, 2, 3, 4)
1597 .counts()
1598 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1599 .assertValues(3, 3, 3, 3)
1600 .assertComplete();
1601 }
1602 }
1603
1604 @Test
1605 public void testUpdateWithBatchSize2() {
1606 try (Database db = db()) {
1607 db.update("update person set score=?")
1608 .batchSize(2)
1609 .parameters(1, 2, 3, 4)
1610 .counts()
1611 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1612 .assertValues(3, 3, 3, 3)
1613 .assertComplete();
1614 }
1615 }
1616
1617 @Test
1618 public void testUpdateWithBatchSize3GreaterThanNumRecords() {
1619 try (Database db = db()) {
1620 db.update("update person set score=?")
1621 .batchSize(3)
1622 .parameters(1, 2, 3, 4)
1623 .counts()
1624 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1625 .assertValues(3, 3, 3, 3)
1626 .assertComplete();
1627 }
1628 }
1629
1630 @Test
1631 public void testInsert() {
1632 try (Database db = db()) {
1633 db.update("insert into person(name, score) values(?,?)")
1634 .parameters("DAVE", 12, "ANNE", 18)
1635 .counts()
1636 .test()
1637 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1638 .assertValues(1, 1)
1639 .assertComplete();
1640 List<Tuple2<String, Integer>> list = db.select("select name, score from person")
1641 .getAs(String.class, Integer.class)
1642 .toList()
1643 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1644 .blockingGet();
1645 assertTrue(list.contains(Tuple2.create("DAVE", 12)));
1646 assertTrue(list.contains(Tuple2.create("ANNE", 18)));
1647 }
1648 }
1649
1650 @Test
1651 public void testReturnGeneratedKeys() {
1652 try (Database db = db()) {
1653
1654 db.update("insert into note(text) values(?)")
1655 .parameters("HI", "THERE")
1656 .returnGeneratedKeys()
1657 .getAs(Integer.class)
1658 .test()
1659 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1660 .assertValues(1, 2)
1661 .assertComplete();
1662
1663 db.update("insert into note(text) values(?)")
1664 .parameters("ME", "TOO")
1665 .returnGeneratedKeys()
1666 .getAs(Integer.class)
1667 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1668 .assertValues(3, 4)
1669 .assertComplete();
1670 }
1671 }
1672
1673 @Test
1674 public void testReturnGeneratedKeysDerby() {
1675 Database db = DatabaseCreator.createDerby(1);
1676
1677
1678 db.update("insert into note2(text) values(?)")
1679 .parameters("HI", "THERE")
1680 .returnGeneratedKeys()
1681 .getAs(Integer.class)
1682 .test()
1683 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1684 .assertNoErrors().assertValues(1, 3)
1685 .assertComplete();
1686
1687 db.update("insert into note2(text) values(?)")
1688 .parameters("ME", "TOO")
1689 .returnGeneratedKeys()
1690 .getAs(Integer.class)
1691 .test()
1692 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1693 .assertValues(5, 7)
1694 .assertComplete();
1695 }
1696
1697 @Test(expected = IllegalArgumentException.class)
1698 public void testReturnGeneratedKeysWithBatchSizeShouldThrow() {
1699 try (Database db = db()) {
1700
1701 db.update("insert into note(text) values(?)")
1702 .parameters("HI", "THERE")
1703 .batchSize(2)
1704 .returnGeneratedKeys();
1705 }
1706 }
1707
1708 @Test
1709 public void testTransactedReturnGeneratedKeys() {
1710 try (Database db = db()) {
1711
1712 db.update("insert into note(text) values(?)")
1713 .parameters("HI", "THERE")
1714 .transacted()
1715 .returnGeneratedKeys()
1716 .valuesOnly()
1717 .getAs(Integer.class)
1718 .test()
1719 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1720 .assertValues(1, 2)
1721 .assertComplete();
1722
1723 db.update("insert into note(text) values(?)")
1724 .parameters("ME", "TOO")
1725 .transacted()
1726 .returnGeneratedKeys()
1727 .valuesOnly()
1728 .getAs(Integer.class)
1729 .test()
1730 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1731 .assertValues(3, 4)
1732 .assertComplete();
1733 }
1734 }
1735
1736 @Test
1737 public void testTransactedReturnGeneratedKeys2() {
1738 try (Database db = db()) {
1739
1740 Flowable<Integer> a = db.update("insert into note(text) values(?)")
1741 .parameters("HI", "THERE")
1742 .transacted()
1743 .returnGeneratedKeys()
1744 .valuesOnly()
1745 .getAs(Integer.class);
1746
1747 db.update("insert into note(text) values(?)")
1748 .parameters("ME", "TOO")
1749 .transacted()
1750 .returnGeneratedKeys()
1751 .valuesOnly()
1752 .getAs(Integer.class)
1753 .startWith(a)
1754 .test()
1755 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1756 .assertValues(1, 2, 3, 4)
1757 .assertComplete();
1758 }
1759 }
1760
1761 @Test
1762 public void testUpdateWithinTransaction() {
1763 try (Database db = db()) {
1764 db
1765 .select("select name from person")
1766 .transactedValuesOnly()
1767 .getAs(String.class)
1768 .doOnNext(DatabaseTest::println)
1769 .flatMap(tx -> tx
1770 .update("update person set score=-1 where name=:name")
1771 .batchSize(1)
1772 .parameter("name", tx.value())
1773 .valuesOnly()
1774 .counts())
1775 .test()
1776 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1777 .assertValues(1, 1, 1)
1778 .assertComplete();
1779 }
1780 }
1781
1782 @Test
1783 public void testSelectDependsOnFlowable() {
1784 try (Database db = db()) {
1785 Flowable<Integer> a = db.update("update person set score=100 where name=?")
1786 .parameter("FRED")
1787 .counts();
1788 db.select("select score from person where name=?")
1789 .parameter("FRED")
1790 .dependsOn(a)
1791 .getAs(Integer.class)
1792 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1793 .assertValues(100)
1794 .assertComplete();
1795 }
1796 }
1797
1798 @Test
1799 public void testSelectDependsOnObservable() {
1800 try (Database db = db()) {
1801 Observable<Integer> a = db.update("update person set score=100 where name=?")
1802 .parameter("FRED")
1803 .counts().toObservable();
1804 db.select("select score from person where name=?")
1805 .parameter("FRED")
1806 .dependsOn(a)
1807 .getAs(Integer.class)
1808 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1809 .assertValues(100)
1810 .assertComplete();
1811 }
1812 }
1813
1814 @Test
1815 public void testSelectDependsOnOnSingle() {
1816 try (Database db = db()) {
1817 Single<Long> a = db.update("update person set score=100 where name=?")
1818 .parameter("FRED")
1819 .counts().count();
1820 db.select("select score from person where name=?")
1821 .parameter("FRED")
1822 .dependsOn(a)
1823 .getAs(Integer.class)
1824 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1825 .assertValues(100)
1826 .assertComplete();
1827 }
1828 }
1829
1830 @Test
1831 public void testSelectDependsOnCompletable() {
1832 try (Database db = db()) {
1833 Completable a = db.update("update person set score=100 where name=?")
1834 .parameter("FRED")
1835 .counts().ignoreElements();
1836 db.select("select score from person where name=?")
1837 .parameter("FRED")
1838 .dependsOn(a)
1839 .getAs(Integer.class)
1840 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1841 .assertValues(100)
1842 .assertComplete();
1843 }
1844 }
1845
1846 @Test
1847 public void testUpdateWithinTransactionBatchSize0() {
1848 try (Database db = db()) {
1849 db
1850 .select("select name from person")
1851 .transactedValuesOnly()
1852 .getAs(String.class)
1853 .doOnNext(DatabaseTest::println)
1854 .flatMap(tx -> tx
1855 .update("update person set score=-1 where name=:name")
1856 .batchSize(0)
1857 .parameter("name", tx.value())
1858 .valuesOnly()
1859 .counts())
1860 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1861 .assertValues(1, 1, 1)
1862 .assertComplete();
1863 }
1864 }
1865
1866 @Test
1867 public void testCreateBig() {
1868 big(5).select("select count(*) from person")
1869 .getAs(Integer.class)
1870 .test().awaitDone(20, TimeUnit.SECONDS)
1871 .assertValue(5163)
1872 .assertComplete();
1873 }
1874
1875 @Test
1876 public void testTxWithBig() {
1877 big(1)
1878 .select("select name from person")
1879 .transactedValuesOnly()
1880 .getAs(String.class)
1881 .flatMap(tx -> tx
1882 .update("update person set score=-1 where name=:name")
1883 .batchSize(1)
1884 .parameter("name", tx.value())
1885 .valuesOnly()
1886 .counts())
1887 .count()
1888 .test().awaitDone(20, TimeUnit.SECONDS)
1889 .assertValue((long) NAMES_COUNT_BIG)
1890 .assertComplete();
1891 }
1892
1893 @Test
1894 public void testTxWithBigInputBatchSize2000() {
1895 big(1)
1896 .select("select name from person")
1897 .transactedValuesOnly()
1898 .getAs(String.class)
1899 .flatMap(tx -> tx
1900 .update("update person set score=-1 where name=:name")
1901 .batchSize(2000)
1902 .parameter("name", tx.value())
1903 .valuesOnly()
1904 .counts())
1905 .count()
1906 .test().awaitDone(20, TimeUnit.SECONDS)
1907 .assertValue((long) NAMES_COUNT_BIG)
1908 .assertComplete();
1909 }
1910
1911 @Test
1912 public void testInsertNullClobAndReadClobAsString() {
1913 try (Database db = db()) {
1914 insertNullClob(db);
1915 db.select("select document from person_clob where name='FRED'")
1916 .getAsOptional(String.class)
1917 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1918 .assertValue(Optional.<String>empty())
1919 .assertComplete();
1920 }
1921 }
1922
1923 @Test
1924 public void testAutomapClobIssue32() {
1925 try (Database db = db()) {
1926 assertTrue(db.update("insert into person_clob(name, document) values(?, ? )")
1927 .parameters("fred", "hello there")
1928 .complete()
1929 .blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1930 db.select("select * from person_clob")
1931 .autoMap(PersonClob.class)
1932 .map(pc -> {
1933 log.debug(pc.toString());
1934 return pc.document();
1935 })
1936 .test()
1937 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1938 .assertValueCount(1)
1939 .assertComplete();
1940 }
1941 }
1942
1943 private static interface PersonClob {
1944 @Column
1945 String name();
1946
1947 @Column
1948 String document();
1949 }
1950
1951 private static void insertNullClob(Database db) {
1952 db.update("insert into person_clob(name,document) values(?,?)")
1953 .parameters("FRED", Database.NULL_CLOB)
1954 .counts()
1955 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1956 .assertValue(1)
1957 .assertComplete();
1958 }
1959
1960 @Test
1961 public void testClobMethod() {
1962 assertEquals(Database.NULL_CLOB, Database.clob(null));
1963 }
1964
1965 @Test
1966 public void testBlobMethod() {
1967 assertEquals(Database.NULL_BLOB, Database.blob(null));
1968 }
1969
1970 @Test
1971 public void testClobMethodPresent() {
1972 assertEquals("a", Database.clob("a"));
1973 }
1974
1975 @Test
1976 public void testBlobMethodPresent() {
1977 byte[] b = new byte[1];
1978 assertEquals(b, Database.blob(b));
1979 }
1980
1981 @Test
1982 public void testDateOfBirthNullableForReadMe() {
1983 Database.test()
1984 .select("select date_of_birth from person where name='FRED'")
1985 .getAsOptional(Instant.class)
1986 .blockingForEach(DatabaseTest::println);
1987 }
1988
1989 @Test
1990 public void testInsertNullClobAndReadClobAsTuple2() {
1991 try (Database db = db()) {
1992 insertNullClob(db);
1993 db.select("select document, document from person_clob where name='FRED'")
1994 .getAs(String.class, String.class)
1995 .test()
1996 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1997 .assertValue(Tuple2.create(null, null))
1998 .assertComplete();
1999 }
2000 }
2001
2002 @Test
2003 public void testInsertClobAndReadClobAsString() {
2004 try (Database db = db()) {
2005 db.update("insert into person_clob(name,document) values(?,?)")
2006 .parameters("FRED", "some text here")
2007 .counts()
2008 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2009 .assertValue(1)
2010 .assertComplete();
2011 db.select("select document from person_clob where name='FRED'")
2012 .getAs(String.class)
2013 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2014
2015
2016
2017 .assertComplete();
2018 }
2019 }
2020
2021 @Test
2022 public void testInsertClobAndReadClobUsingReader() {
2023 try (Database db = db()) {
2024 db.update("insert into person_clob(name,document) values(?,?)")
2025 .parameters("FRED", "some text here")
2026 .counts()
2027 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2028 .assertValue(1)
2029 .assertComplete();
2030 db.select("select document from person_clob where name='FRED'")
2031 .getAs(Reader.class)
2032 .map(r -> read(r)).test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2033 .assertValue("some text here")
2034 .assertComplete();
2035 }
2036 }
2037
2038 @Test
2039 public void testInsertBlobAndReadBlobAsByteArray() {
2040 try (Database db = db()) {
2041 insertPersonBlob(db);
2042 db.select("select document from person_blob where name='FRED'")
2043 .getAs(byte[].class)
2044 .map(b -> new String(b))
2045 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2046 .assertValue("some text here")
2047 .assertComplete();
2048 }
2049 }
2050
2051 private static void insertPersonBlob(Database db) {
2052 byte[] bytes = "some text here".getBytes();
2053 db.update("insert into person_blob(name,document) values(?,?)")
2054 .parameters("FRED", bytes)
2055 .counts()
2056 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2057 .assertValue(1)
2058 .assertComplete();
2059 }
2060
2061 @Test
2062 public void testInsertBlobAndReadBlobAsInputStream() {
2063 try (Database db = db()) {
2064 byte[] bytes = "some text here".getBytes();
2065 db.update("insert into person_blob(name,document) values(?,?)")
2066 .parameters("FRED", new ByteArrayInputStream(bytes))
2067 .counts()
2068 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2069 .assertValue(1)
2070 .assertComplete();
2071 db.select("select document from person_blob where name='FRED'")
2072 .getAs(InputStream.class)
2073 .map(is -> read(is))
2074 .map(b -> new String(b))
2075 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2076 .assertValue("some text here")
2077 .assertComplete();
2078 }
2079 }
2080
2081 private static String read(Reader reader) throws IOException {
2082 StringBuffer s = new StringBuffer();
2083 char[] ch = new char[128];
2084 int n = 0;
2085 while ((n = reader.read(ch)) != -1) {
2086 s.append(ch, 0, n);
2087 }
2088 reader.close();
2089 return s.toString();
2090 }
2091
2092 private static byte[] read(InputStream is) throws IOException {
2093 ByteArrayOutputStream bytes = new ByteArrayOutputStream();
2094 byte[] b = new byte[128];
2095 int n = 0;
2096 while ((n = is.read(b)) != -1) {
2097 bytes.write(b, 0, n);
2098 }
2099 is.close();
2100 return bytes.toByteArray();
2101 }
2102
2103 private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
2104 TestScheduler scheduler = new TestScheduler();
2105
2106 NonBlockingConnectionPool pool = Pools
2107 .nonBlocking()
2108 .connectionProvider(DatabaseCreator.connectionProvider())
2109 .maxIdleTime(10, TimeUnit.MINUTES)
2110 .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES)
2111 .healthCheck(healthy)
2112 .scheduler(scheduler)
2113 .maxPoolSize(1)
2114 .build();
2115
2116 try (Database db = Database.from(pool)) {
2117 TestSubscriber<Integer> ts0 = db.select(
2118 "select score from person where name=?")
2119 .parameter("FRED")
2120 .getAs(Integer.class)
2121 .test();
2122 ts0.assertValueCount(0)
2123 .assertNotComplete();
2124 scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2125 ts0.assertValueCount(1)
2126 .assertComplete();
2127 TestSubscriber<Integer> ts = db.select(
2128 "select score from person where name=?")
2129 .parameter("FRED")
2130 .getAs(Integer.class)
2131 .test()
2132 .assertValueCount(0);
2133 log.debug("done2");
2134 scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
2135 Thread.sleep(200);
2136 ts.assertValueCount(1);
2137 Thread.sleep(200);
2138 ts.assertValue(21)
2139 .assertComplete();
2140 }
2141 }
2142
2143 @Test
2144 public void testShutdownBeforeUse() {
2145 NonBlockingConnectionPool pool = Pools
2146 .nonBlocking()
2147 .connectionProvider(DatabaseCreator.connectionProvider())
2148 .scheduler(Schedulers.io())
2149 .maxPoolSize(1)
2150 .build();
2151 pool.close();
2152 Database.from(pool)
2153 .select("select score from person where name=?")
2154 .parameter("FRED")
2155 .getAs(Integer.class)
2156 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2157 .assertNoValues()
2158 .assertError(PoolClosedException.class);
2159 }
2160
2161 @Test
2162 public void testFewerColumnsMappedThanAvailable() {
2163 try (Database db = db()) {
2164 db.select("select name, score from person where name='FRED'")
2165 .getAs(String.class)
2166 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2167 .assertValues("FRED")
2168 .assertComplete();
2169 }
2170 }
2171
2172 @Test
2173 public void testMoreColumnsMappedThanAvailable() {
2174 try (Database db = db()) {
2175 db
2176 .select("select name, score from person where name='FRED'")
2177 .getAs(String.class, Integer.class, String.class)
2178 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2179 .assertNoValues()
2180 .assertError(MoreColumnsRequestedThanExistException.class);
2181 }
2182 }
2183
2184 @Test
2185 public void testSelectTimestamp() {
2186 try (Database db = db()) {
2187 db
2188 .select("select registered from person where name='FRED'")
2189 .getAs(Long.class)
2190 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2191 .assertValue(FRED_REGISTERED_TIME)
2192 .assertComplete();
2193 }
2194 }
2195
2196 @Test
2197 public void testSelectTimestampAsDate() {
2198 try (Database db = db()) {
2199 db
2200 .select("select registered from person where name='FRED'")
2201 .getAs(Date.class)
2202 .map(d -> d.getTime())
2203 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2204 .assertValue(FRED_REGISTERED_TIME)
2205 .assertComplete();
2206 }
2207 }
2208
2209 @Test
2210 public void testSelectTimestampAsInstant() {
2211 try (Database db = db()) {
2212 db
2213 .select("select registered from person where name='FRED'")
2214 .getAs(Instant.class)
2215 .map(d -> d.toEpochMilli())
2216 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2217 .assertValue(FRED_REGISTERED_TIME)
2218 .assertComplete();
2219 }
2220 }
2221
2222 @Test
2223 public void testUpdateCalendarParameter() {
2224 Calendar c = GregorianCalendar.from(ZonedDateTime.parse("2017-03-25T15:37Z"));
2225 try (Database db = db()) {
2226 db.update("update person set registered=?")
2227 .parameter(c)
2228 .counts()
2229 .test()
2230 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2231 .assertValue(3)
2232 .assertComplete();
2233 db.select("select registered from person")
2234 .getAs(Long.class)
2235 .firstOrError()
2236 .test()
2237 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2238 .assertValue(c.getTimeInMillis())
2239 .assertComplete();
2240 }
2241 }
2242
2243 @Test
2244 public void testUpdateTimeParameter() throws InterruptedException {
2245 try (Database db = db()) {
2246 Timestamp t = new Timestamp(1234);
2247 db.update("update person set registered=?")
2248 .parameter(t)
2249 .counts()
2250 .test()
2251 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2252 .assertValue(3)
2253 .assertComplete();
2254 db.select("select registered from person")
2255 .getAs(Long.class)
2256 .firstOrError()
2257 .test()
2258 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2259 .assertValue(1234L)
2260 .assertComplete();
2261 }
2262 }
2263
2264 @Test
2265 public void testUpdateTimestampParameter() {
2266 try (Database db = db()) {
2267 Timestamp t = new Timestamp(1234);
2268 db.update("update person set registered=?")
2269 .parameter(t)
2270 .counts()
2271 .test()
2272 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2273 .assertValue(3)
2274 .assertComplete();
2275 db.select("select registered from person")
2276 .getAs(Long.class)
2277 .firstOrError()
2278 .test()
2279 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2280 .assertValue(1234L)
2281 .assertComplete();
2282 }
2283 }
2284
2285 @Test
2286 public void testUpdateSqlDateParameter() {
2287 try (Database db = db()) {
2288 java.sql.Date t = new java.sql.Date(1234);
2289
2290 db.update("update person set registered=?")
2291 .parameter(t)
2292 .counts()
2293 .test()
2294 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2295 .assertValue(3)
2296 .assertComplete();
2297 db.select("select registered from person")
2298 .getAs(Long.class)
2299 .firstOrError()
2300 .test()
2301 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2302
2303 .assertValue(x -> Math.abs(x - 1234) <= TimeUnit.HOURS.toMillis(24))
2304 .assertComplete();
2305 }
2306 }
2307
2308 @Test
2309 public void testUpdateUtilDateParameter() {
2310 try (Database db = db()) {
2311 Date d = new Date(1234);
2312 db.update("update person set registered=?")
2313 .parameter(d)
2314 .counts()
2315 .test()
2316 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2317 .assertValue(3)
2318 .assertComplete();
2319 db.select("select registered from person")
2320 .getAs(Long.class)
2321 .firstOrError()
2322 .test()
2323 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2324 .assertValue(1234L)
2325 .assertComplete();
2326 }
2327 }
2328
2329 @Test
2330 public void testUpdateTimestampAsInstant() {
2331 try (Database db = db()) {
2332 db.update("update person set registered=? where name='FRED'")
2333 .parameter(Instant.ofEpochMilli(FRED_REGISTERED_TIME))
2334 .counts()
2335 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2336 .assertValue(1)
2337 .assertComplete();
2338 db.select("select registered from person where name='FRED'")
2339 .getAs(Long.class)
2340 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2341 .assertValue(FRED_REGISTERED_TIME)
2342 .assertComplete();
2343 }
2344 }
2345
2346 @Test
2347 public void testUpdateTimestampAsZonedDateTime() {
2348 try (Database db = db()) {
2349 db.update("update person set registered=? where name='FRED'")
2350 .parameter(ZonedDateTime.ofInstant(Instant.ofEpochMilli(FRED_REGISTERED_TIME),
2351 ZoneOffset.UTC.normalized()))
2352 .counts()
2353 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2354 .assertValue(1)
2355 .assertComplete();
2356 db.select("select registered from person where name='FRED'")
2357 .getAs(Long.class)
2358 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2359 .assertValue(FRED_REGISTERED_TIME)
2360 .assertComplete();
2361 }
2362 }
2363
2364 @Test
2365 public void testCompleteCompletes() {
2366 try (Database db = db(1)) {
2367 db
2368 .update("update person set score=-3 where name='FRED'")
2369 .complete()
2370 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2371 .blockingAwait();
2372
2373 int score = db.select("select score from person where name='FRED'")
2374 .getAs(Integer.class)
2375 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2376 .blockingFirst();
2377 assertEquals(-3, score);
2378 }
2379 }
2380
2381 @Test
2382 public void testComplete() throws InterruptedException {
2383 try (Database db = db(1)) {
2384 Completable a = db
2385 .update("update person set score=-3 where name='FRED'")
2386 .complete();
2387 db.update("update person set score=-4 where score = -3")
2388 .dependsOn(a)
2389 .counts()
2390 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2391 .assertValue(1)
2392 .assertComplete();
2393 }
2394 }
2395
2396 @Test
2397 public void testCountsOnlyInTransaction() {
2398 try (Database db = db()) {
2399 db.update("update person set score = -3")
2400 .transacted()
2401 .countsOnly()
2402 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2403 .assertValues(3)
2404 .assertComplete();
2405 }
2406 }
2407
2408 @Test
2409 public void testCountsInTransaction() {
2410 try (Database db = db()) {
2411 db.update("update person set score = -3")
2412 .transacted()
2413 .counts()
2414 .doOnNext(DatabaseTest::println)
2415 .toList()
2416 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2417 .assertValue(list -> list.get(0).isValue() && list.get(0).value() == 3
2418 && list.get(1).isComplete() && list.size() == 2)
2419 .assertComplete();
2420 }
2421 }
2422
2423 @Test
2424 public void testTx() throws InterruptedException {
2425 Database db = db(3);
2426 Flowable<Tx<?>> transaction = db
2427 .update("update person set score=-3 where name='FRED'")
2428 .transaction();
2429
2430 transaction
2431 .doOnCancel(() -> log.debug("disposing"))
2432 .doOnNext(DatabaseTest::println)
2433 .flatMap(tx -> {
2434 log.debug("flatmapping");
2435 return tx
2436 .update("update person set score = -4 where score = -3")
2437 .countsOnly()
2438 .doOnSubscribe(s -> log.debug("subscribed"))
2439 .doOnNext(num -> log.debug("num=" + num));
2440 })
2441 .test()
2442 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2443 .assertNoErrors()
2444 .assertValue(1)
2445 .assertComplete();
2446 }
2447
2448 @Test
2449 public void testTxAfterSelect() {
2450 Database db = db(3);
2451 Single<Tx<Integer>> transaction = db
2452 .select("select score from person where name='FRED'")
2453 .transactedValuesOnly()
2454 .getAs(Integer.class)
2455 .firstOrError();
2456
2457 transaction
2458 .doOnDispose(() -> log.debug("disposing"))
2459 .doOnSuccess(DatabaseTest::println)
2460 .flatMapPublisher(tx -> {
2461 log.debug("flatmapping");
2462 return tx
2463 .update("update person set score = -4 where score = ?")
2464 .parameter(tx.value())
2465 .countsOnly()
2466 .doOnSubscribe(s -> log.debug("subscribed"))
2467 .doOnNext(num -> log.debug("num=" + num));
2468 })
2469 .test().awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2470 .assertNoErrors()
2471 .assertValue(1)
2472 .assertComplete();
2473 }
2474
2475 @Test
2476 public void testUseTxOnComplete() {
2477 db(1)
2478 .select(Person10.class)
2479 .transacted()
2480 .get()
2481 .lastOrError()
2482 .map(tx -> tx.select("select count(*) from person")
2483 .count().blockingGet())
2484 .test()
2485 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2486 .assertError(CannotForkTransactedConnection.class)
2487 .assertValueCount(0);
2488 }
2489
2490 @Test
2491 public void testSingleFlatMap() {
2492 Single.just(1).flatMapPublisher(n -> Flowable.just(1)).test(1).assertValue(1)
2493 .assertComplete();
2494 }
2495
2496 @Test
2497 public void testAutomappedInstanceHasMeaningfulToStringMethod() {
2498 String s = Database.test()
2499 .select("select name, score from person where name=?")
2500 .parameterStream(Flowable.just("FRED"))
2501 .autoMap(Person2.class)
2502 .map(x -> x.toString())
2503 .blockingSingle();
2504 assertEquals("Person2[name=FRED, score=21]", s);
2505 }
2506
2507 @Test
2508 public void testAutomappedEquals() {
2509 boolean b = Database.test()
2510 .select("select name, score from person where name=?")
2511 .parameterStream(Flowable.just("FRED"))
2512 .autoMap(Person2.class)
2513 .map(x -> x.equals(x))
2514 .blockingSingle();
2515 assertTrue(b);
2516 }
2517
2518 @Test
2519 public void testAutomappedDoesNotEqualNull() {
2520 boolean b = Database.test()
2521 .select("select name, score from person where name=?")
2522 .parameterStream(Flowable.just("FRED"))
2523 .autoMap(Person2.class)
2524 .map(x -> x.equals(null))
2525 .blockingSingle();
2526 assertFalse(b);
2527 }
2528
2529 @Test
2530 public void testAutomappedDoesNotEqual() {
2531 boolean b = Database.test()
2532 .select("select name, score from person where name=?")
2533 .parameterStream(Flowable.just("FRED"))
2534 .autoMap(Person2.class)
2535 .map(x -> x.equals(new Object()))
2536 .blockingSingle();
2537 assertFalse(b);
2538 }
2539
2540 @Test
2541 public void testAutomappedHashCode() {
2542 Person2 p = Database.test()
2543 .select("select name, score from person where name=?")
2544 .parameterStream(Flowable.just("FRED"))
2545 .autoMap(Person2.class)
2546 .blockingSingle();
2547 assertTrue(p.hashCode() != 0);
2548 }
2549
2550 @Test
2551 public void testAutomappedWithParametersThatProvokesMoreThanOneQuery() {
2552 Database.test()
2553 .select("select name, score from person where name=?")
2554 .parameters("FRED", "FRED")
2555 .autoMap(Person2.class)
2556 .doOnNext(DatabaseTest::println)
2557 .test()
2558 .awaitDone(2000, TimeUnit.SECONDS)
2559 .assertNoErrors()
2560 .assertValueCount(2)
2561 .assertComplete();
2562 }
2563
2564 @Test
2565 public void testAutomappedObjectsEqualsAndHashCodeIsDistinctOnValues() {
2566 Database.test()
2567 .select("select name, score from person where name=?")
2568 .parameters("FRED", "FRED")
2569 .autoMap(Person2.class)
2570 .distinct()
2571 .doOnNext(DatabaseTest::println)
2572 .test()
2573 .awaitDone(2000, TimeUnit.SECONDS)
2574 .assertNoErrors()
2575 .assertValueCount(1)
2576 .assertComplete();
2577 }
2578
2579 @Test
2580 public void testAutomappedObjectsEqualsDifferentiatesDifferentInterfacesWithSameMethodNamesAndValues() {
2581 PersonDistinct1 p1 = Database.test()
2582 .select("select name, score from person where name=?")
2583 .parameters("FRED")
2584 .autoMap(PersonDistinct1.class)
2585 .blockingFirst();
2586
2587 PersonDistinct2 p2 = Database.test()
2588 .select("select name, score from person where name=?")
2589 .parameters("FRED")
2590 .autoMap(PersonDistinct2.class)
2591 .blockingFirst();
2592
2593 assertNotEquals(p1, p2);
2594 }
2595
2596 @Test
2597 public void testAutomappedObjectsWhenDefaultMethodInvoked() {
2598
2599 if (System.getProperty("java.version").startsWith("1.8.")) {
2600 PersonWithDefaultMethod p = Database.test()
2601 .select("select name, score from person where name=?")
2602 .parameters("FRED")
2603 .autoMap(PersonWithDefaultMethod.class)
2604 .blockingFirst();
2605 assertEquals("fred", p.nameLower());
2606 }
2607 }
2608
2609 @Test(expected = AutomappedInterfaceInaccessibleException.class)
2610 public void testAutomappedObjectsWhenDefaultMethodInvokedAndIsNonPublicThrows() {
2611 PersonWithDefaultMethodNonPublic p = Database.test()
2612 .select("select name, score from person where name=?")
2613 .parameters("FRED")
2614 .autoMap(PersonWithDefaultMethodNonPublic.class)
2615 .blockingFirst();
2616 p.nameLower();
2617 }
2618
2619 @Test
2620 public void testBlockingDatabase() {
2621 Database db = blocking();
2622 db.select("select score from person where name=?")
2623 .parameters("FRED", "JOSEPH")
2624 .getAs(Integer.class)
2625 .test()
2626 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2627 .assertNoErrors()
2628 .assertValues(21, 34)
2629 .assertComplete();
2630 }
2631
2632 @Test
2633 public void testBlockingDatabaseTransacted() {
2634 Database db = blocking();
2635 db.select("select score from person where name=?")
2636 .parameters("FRED", "JOSEPH")
2637 .transactedValuesOnly()
2638 .getAs(Integer.class)
2639 .map(x -> x.value())
2640 .test()
2641 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2642 .assertNoErrors()
2643 .assertValues(21, 34)
2644 .assertComplete();
2645 }
2646
2647 @Test
2648 public void testBlockingDatabaseTransactedNested() {
2649 Database db = blocking();
2650 db.select("select score from person where name=?")
2651 .parameters("FRED", "JOSEPH")
2652 .transactedValuesOnly()
2653 .getAs(Integer.class)
2654 .flatMap(tx -> tx.select("select name from person where score=?")
2655 .parameter(tx.value())
2656 .valuesOnly()
2657 .getAs(String.class))
2658 .test()
2659 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2660 .assertNoErrors()
2661 .assertValues("FRED", "JOSEPH")
2662 .assertComplete();
2663 }
2664
2665 @Test
2666 public void testUsingNormalJDBCApi() {
2667 Database db = db(1);
2668 db.apply(con -> {
2669 try (PreparedStatement stmt = con
2670 .prepareStatement("select count(*) from person where name='FRED'");
2671 ResultSet rs = stmt.executeQuery()) {
2672 rs.next();
2673 return rs.getInt(1);
2674 }
2675 })
2676 .test()
2677 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2678 .assertValue(1)
2679 .assertComplete();
2680
2681
2682 db.select("select count(*) from person where name='FRED'")
2683 .getAs(Integer.class)
2684 .test()
2685 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2686 .assertValue(1)
2687 .assertComplete();
2688 }
2689
2690 @Test
2691 public void testUsingNormalJDBCApiCompletable() {
2692 Database db = db(1);
2693 db.apply(con -> {
2694 try (PreparedStatement stmt = con
2695 .prepareStatement("select count(*) from person where name='FRED'");
2696 ResultSet rs = stmt.executeQuery()) {
2697 rs.next();
2698 }
2699 })
2700 .test()
2701 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2702 .assertComplete();
2703
2704
2705 db.select("select count(*) from person where name='FRED'")
2706 .getAs(Integer.class)
2707 .test()
2708 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2709 .assertValue(1)
2710 .assertComplete();
2711 }
2712
2713 @Test
2714 public void testCallableStatement() {
2715 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2716 assertTrue(db.apply(con -> {
2717 try (Statement stmt = con.createStatement()) {
2718 CallableStatement st = con.prepareCall("call getPersonCount(?, ?)");
2719 st.setInt(1, 0);
2720 st.registerOutParameter(2, Types.INTEGER);
2721 st.execute();
2722 assertEquals(2, st.getInt(2));
2723 }
2724 }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS));
2725 }
2726
2727 @Test
2728 public void testCallableStatementReturningResultSets() {
2729 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2730 db.apply(con -> {
2731 try (Statement stmt = con.createStatement()) {
2732 CallableStatement st = con.prepareCall("call in1out0rs2(?)");
2733 st.setInt(1, 0);
2734 st.execute();
2735 ResultSet rs1 = st.getResultSet();
2736 st.getMoreResults(Statement.KEEP_CURRENT_RESULT);
2737 ResultSet rs2 = st.getResultSet();
2738 rs1.next();
2739 assertEquals("FRED", rs1.getString(1));
2740 rs2.next();
2741 assertEquals("SARAH", rs2.getString(1));
2742 }
2743 }).blockingAwait(TIMEOUT_SECONDS, TimeUnit.SECONDS);
2744 }
2745
2746 @Test
2747 public void testCallableApiNoParameters() {
2748 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2749 db
2750 .call("call zero()")
2751 .once()
2752 .test()
2753 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2754 .assertComplete();
2755 }
2756
2757 @Test
2758 public void testCallableApiNoParametersTransacted() {
2759 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2760 db
2761 .call("call zero()")
2762 .transacted()
2763 .once()
2764 .test()
2765 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2766 .assertValueCount(1)
2767 .assertValue(x -> x.isComplete())
2768 .assertComplete();
2769 }
2770
2771 @Test
2772 public void testCallableApiOneInOutParameter() {
2773 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2774 db
2775 .call("call inout1(?)")
2776 .inOut(Type.INTEGER, Integer.class)
2777 .input(4)
2778 .test()
2779 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2780 .assertValue(5)
2781 .assertComplete();
2782 }
2783
2784 @Test
2785 public void testCallableApiOneInOutParameterTransacted() {
2786 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2787 db
2788 .call("call inout1(?)")
2789 .transacted()
2790 .inOut(Type.INTEGER, Integer.class)
2791 .input(4)
2792 .flatMap(Tx.flattenToValuesOnly())
2793 .test()
2794 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2795 .assertValue(5)
2796 .assertComplete();
2797 }
2798
2799 @Test
2800 public void testCallableApiTwoInOutParameters() {
2801 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2802 db
2803 .call("call inout2(?, ?)")
2804 .inOut(Type.INTEGER, Integer.class)
2805 .inOut(Type.INTEGER, Integer.class)
2806 .input(4, 10)
2807 .test()
2808 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2809 .assertValue(x -> x._1() == 5 && x._2() == 12)
2810 .assertComplete();
2811 }
2812
2813 @Test
2814 public void testCallableApiTwoInOutParametersTransacted() {
2815 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2816 db
2817 .call("call inout2(?, ?)")
2818 .transacted()
2819 .inOut(Type.INTEGER, Integer.class)
2820 .inOut(Type.INTEGER, Integer.class)
2821 .input(4, 10)
2822 .flatMap(Tx.flattenToValuesOnly())
2823 .test()
2824 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2825 .assertValue(x -> x._1() == 5 && x._2() == 12)
2826 .assertComplete();
2827 }
2828
2829 @Test
2830 public void testCallableApiThreeInOutParameters() {
2831 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2832 db
2833 .call("call inout3(?, ?, ?)")
2834 .inOut(Type.INTEGER, Integer.class)
2835 .inOut(Type.INTEGER, Integer.class)
2836 .inOut(Type.INTEGER, Integer.class)
2837 .input(4, 10, 13)
2838 .test()
2839 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2840 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16)
2841 .assertComplete();
2842 }
2843
2844 @Test
2845 public void testCallableApiThreeInOutParametersTransacted() {
2846 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2847 db
2848 .call("call inout3(?, ?, ?)")
2849 .transacted()
2850 .inOut(Type.INTEGER, Integer.class)
2851 .inOut(Type.INTEGER, Integer.class)
2852 .inOut(Type.INTEGER, Integer.class)
2853 .input(4, 10, 13)
2854 .flatMap(Tx.flattenToValuesOnly())
2855 .test()
2856 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2857 .assertValue(x -> x._1() == 5 && x._2() == 12 && x._3() == 16)
2858 .assertComplete();
2859 }
2860
2861 @Test
2862 public void testCallableApiReturningOneOutParameter() throws InterruptedException {
2863 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2864 db
2865 .call("call in1out1(?,?)")
2866 .in()
2867 .out(Type.INTEGER, Integer.class)
2868 .input(0, 10, 20)
2869 .test()
2870 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2871 .assertValues(0, 10, 20)
2872 .assertComplete();
2873 }
2874
2875 @Test
2876 public void testCallableApiReturningOneOutParameterTransacted() throws InterruptedException {
2877 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2878 db
2879 .call("call in1out1(?,?)")
2880 .transacted()
2881 .in()
2882 .out(Type.INTEGER, Integer.class)
2883 .input(0, 10, 20)
2884 .flatMap(Tx.flattenToValuesOnly())
2885 .test()
2886 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2887 .assertValues(0, 10, 20)
2888 .assertComplete();
2889 }
2890
2891 @Test
2892 public void testCallableApiReturningTwoOutParameters() throws InterruptedException {
2893 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2894 db
2895 .call("call in1out2(?,?,?)")
2896 .in()
2897 .out(Type.INTEGER, Integer.class)
2898 .out(Type.INTEGER, Integer.class)
2899 .input(0, 10, 20)
2900 .test()
2901 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2902 .assertValueCount(3)
2903 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1)
2904 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11)
2905 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21)
2906 .assertComplete();
2907
2908 db.call("call in1out2(?,?,?)").in().out(Type.INTEGER, Integer.class)
2909 .out(Type.INTEGER, Integer.class).input(0, 10, 20)
2910 .blockingForEach(DatabaseTest::println);
2911 }
2912
2913 @Test
2914 public void testCallableApiReturningTwoOutParametersTransacted() throws InterruptedException {
2915 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2916 db
2917 .call("call in1out2(?,?,?)")
2918 .transacted()
2919 .in()
2920 .out(Type.INTEGER, Integer.class)
2921 .out(Type.INTEGER, Integer.class)
2922 .input(0, 10, 20)
2923 .flatMap(Tx.flattenToValuesOnly())
2924 .test()
2925 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2926 .assertValueCount(3)
2927 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1)
2928 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11)
2929 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21)
2930 .assertComplete();
2931
2932 db.call("call in1out2(?,?,?)")
2933 .in()
2934 .out(Type.INTEGER, Integer.class)
2935 .out(Type.INTEGER, Integer.class)
2936 .input(0, 10, 20)
2937 .blockingForEach(DatabaseTest::println);
2938 }
2939
2940 @Test
2941 public void testCallableApiReturningThreeOutParameters() throws InterruptedException {
2942 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2943 db
2944 .call("call in1out3(?,?,?,?)")
2945 .in()
2946 .out(Type.INTEGER, Integer.class)
2947 .out(Type.INTEGER, Integer.class)
2948 .out(Type.INTEGER, Integer.class)
2949 .input(0, 10, 20)
2950 .test()
2951 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2952 .assertValueCount(3)
2953 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2)
2954 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12)
2955 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22)
2956 .assertComplete();
2957 }
2958
2959 @Test
2960 public void testCallableApiReturningThreeOutParametersTransacted() throws InterruptedException {
2961 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2962 db
2963 .call("call in1out3(?,?,?,?)")
2964 .transacted()
2965 .in()
2966 .out(Type.INTEGER, Integer.class)
2967 .out(Type.INTEGER, Integer.class)
2968 .out(Type.INTEGER, Integer.class)
2969 .input(0, 10, 20)
2970 .flatMap(Tx.flattenToValuesOnly())
2971 .test()
2972 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2973 .assertValueCount(3)
2974 .assertValueAt(0, x -> x._1() == 0 && x._2() == 1 && x._3() == 2)
2975 .assertValueAt(1, x -> x._1() == 10 && x._2() == 11 && x._3() == 12)
2976 .assertValueAt(2, x -> x._1() == 20 && x._2() == 21 && x._3() == 22)
2977 .assertComplete();
2978 }
2979
2980 @Test
2981 public void testCallableApiReturningOneResultSet() throws InterruptedException {
2982 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
2983 db
2984 .call("call in0out0rs1()")
2985 .autoMap(Person2.class)
2986 .input(0, 10, 20)
2987 .doOnNext(x -> {
2988 assertTrue(x.outs().isEmpty());
2989 })
2990 .flatMap(x -> x.results())
2991 .test()
2992 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
2993 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2994 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2995 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2996 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2997 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
2998 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
2999 .assertValueCount(6)
3000 .assertComplete();
3001 }
3002
3003 @Test
3004 public void testCallableApiReturningOneResultSetTransacted() throws InterruptedException {
3005 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3006 db
3007 .call("call in0out0rs1()")
3008 .transacted()
3009 .autoMap(Person2.class)
3010 .input(0, 10, 20)
3011 .flatMap(Tx.flattenToValuesOnly()).doOnNext(x -> {
3012 assertTrue(x.outs().isEmpty());
3013 })
3014 .flatMap(x -> x.results())
3015 .test()
3016 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3017 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3018 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3019 .assertValueAt(2, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3020 .assertValueAt(3, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3021 .assertValueAt(4, p -> "FRED".equalsIgnoreCase(p.name()) && p.score() == 24)
3022 .assertValueAt(5, p -> "SARAH".equalsIgnoreCase(p.name()) && p.score() == 26)
3023 .assertValueCount(6)
3024 .assertComplete();
3025 }
3026
3027 @Test
3028 public void testCallableApiReturningTwoResultSetsWithAutoMap() throws InterruptedException {
3029 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3030 db
3031 .call("call in1out0rs2(?)")
3032 .in()
3033 .autoMap(Person2.class)
3034 .autoMap(Person2.class)
3035 .input(0, 10, 20)
3036 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3037 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3038 .test()
3039 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3040 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3041 "SARAHFRED")
3042 .assertComplete();
3043 }
3044
3045 @Test
3046 public void testCallableApiReturningTwoResultSetsWithAutoMapTransacted()
3047 throws InterruptedException {
3048 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3049 db
3050 .call("call in1out0rs2(?)")
3051 .transacted()
3052 .in()
3053 .autoMap(Person2.class)
3054 .autoMap(Person2.class)
3055 .input(0, 10, 20)
3056 .flatMap(Tx.flattenToValuesOnly())
3057 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3058 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3059 .test()
3060 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3061 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3062 "SARAHFRED")
3063 .assertComplete();
3064 }
3065
3066 @Test
3067 public void testCallableApiReturningTwoResultSetsWithGet() throws InterruptedException {
3068 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3069 db
3070 .call("call in1out0rs2(?)")
3071 .in()
3072 .getAs(String.class, Integer.class)
3073 .getAs(String.class, Integer.class)
3074 .input(0, 10, 20)
3075 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1()))
3076 .test()
3077 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3078 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3079 "SARAHFRED")
3080 .assertComplete();
3081 }
3082
3083 @Test
3084 public void testCallableApiReturningTwoResultSetsWithGetTransacted()
3085 throws InterruptedException {
3086 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3087 db
3088 .call("call in1out0rs2(?)")
3089 .transacted()
3090 .in()
3091 .getAs(String.class, Integer.class)
3092 .getAs(String.class, Integer.class)
3093 .input(0, 10, 20)
3094 .flatMap(Tx.flattenToValuesOnly())
3095 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y._1() + z._1()))
3096 .test()
3097 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3098 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3099 "SARAHFRED")
3100 .assertComplete();
3101 }
3102
3103 @Test
3104 public void testCallableApiReturningTwoResultSetsSwitchOrder1() throws InterruptedException {
3105 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3106 db
3107 .call("call in1out0rs2(?)")
3108 .autoMap(Person2.class)
3109 .in()
3110 .autoMap(Person2.class)
3111 .input(0, 10, 20)
3112 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3113 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3114 .test()
3115 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3116 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3117 "SARAHFRED")
3118 .assertComplete();
3119 }
3120
3121 @Test
3122 public void testCallableApiReturningTwoResultSetsSwitchOrder1Transacted()
3123 throws InterruptedException {
3124 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3125 db
3126 .call("call in1out0rs2(?)")
3127 .transacted()
3128 .autoMap(Person2.class)
3129 .in()
3130 .autoMap(Person2.class)
3131 .input(0, 10, 20)
3132 .flatMap(Tx.flattenToValuesOnly())
3133 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3134 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3135 .test()
3136 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3137 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3138 "SARAHFRED")
3139 .assertComplete();
3140 }
3141
3142 @Test
3143 public void testCallableApiReturningTwoResultSetsSwitchOrder2() throws InterruptedException {
3144 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3145 db
3146 .call("call in1out0rs2(?)")
3147 .autoMap(Person2.class)
3148 .autoMap(Person2.class)
3149 .in()
3150 .input(0, 10, 20)
3151 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3152 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3153 .test()
3154 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3155 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3156 "SARAHFRED")
3157 .assertComplete();
3158 }
3159
3160 @Test
3161 public void testCallableApiReturningTwoResultSetsSwitchOrder2Transacted()
3162 throws InterruptedException {
3163 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3164 db
3165 .call("call in1out0rs2(?)")
3166 .transacted()
3167 .autoMap(Person2.class)
3168 .autoMap(Person2.class)
3169 .in()
3170 .input(0, 10, 20)
3171 .flatMap(Tx.flattenToValuesOnly())
3172 .doOnNext(x -> assertTrue(x.outs().isEmpty()))
3173 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name()))
3174 .test()
3175 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3176 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED", "FREDSARAH",
3177 "SARAHFRED")
3178 .assertComplete();
3179 }
3180
3181 @Test
3182 public void testCallableApiReturningTwoOutputThreeResultSets() throws InterruptedException {
3183 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3184 db
3185 .call("call in0out2rs3(?, ?)")
3186 .out(Type.INTEGER, Integer.class)
3187 .out(Type.INTEGER, Integer.class)
3188 .autoMap(Person2.class)
3189 .autoMap(Person2.class)
3190 .autoMap(Person2.class)
3191 .input(0, 10, 20)
3192 .doOnNext(x -> {
3193 assertEquals(2, x.outs().size());
3194 assertEquals(1, x.outs().get(0));
3195 assertEquals(2, x.outs().get(1));
3196 })
3197 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3198 .zipWith(x.results3(), (y, z) -> y + z.name()))
3199 .test()
3200 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3201 .assertNoErrors()
3202 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3203 "FREDSARAHFRED", "SARAHFREDSARAH")
3204 .assertComplete();
3205 }
3206
3207 @Test
3208 public void testCallableApiReturningTwoOutputThreeResultSetsTransacted()
3209 throws InterruptedException {
3210 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3211 db
3212 .call("call in0out2rs3(?, ?)")
3213 .transacted()
3214 .out(Type.INTEGER, Integer.class)
3215 .out(Type.INTEGER, Integer.class)
3216 .autoMap(Person2.class)
3217 .autoMap(Person2.class)
3218 .autoMap(Person2.class)
3219 .input(0, 10, 20)
3220 .flatMap(Tx.flattenToValuesOnly())
3221 .doOnNext(x -> {
3222 assertEquals(2, x.outs().size());
3223 assertEquals(1, x.outs().get(0));
3224 assertEquals(2, x.outs().get(1));
3225 })
3226 .flatMap(x -> x.results1().zipWith(x.results2(), (y, z) -> y.name() + z.name())
3227 .zipWith(x.results3(), (y, z) -> y + z.name()))
3228 .test()
3229 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3230 .assertNoErrors()
3231 .assertValues("FREDSARAHFRED", "SARAHFREDSARAH", "FREDSARAHFRED", "SARAHFREDSARAH",
3232 "FREDSARAHFRED", "SARAHFREDSARAH")
3233 .assertComplete();
3234 }
3235
3236 @Test
3237 public void testCallableApiReturningTenOutParameters() {
3238 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3239 db
3240 .call("call out10(?,?,?,?,?,?,?,?,?,?)")
3241 .out(Type.INTEGER, Integer.class)
3242 .out(Type.INTEGER, Integer.class)
3243 .out(Type.INTEGER, Integer.class)
3244 .out(Type.INTEGER, Integer.class)
3245 .out(Type.INTEGER, Integer.class)
3246 .out(Type.INTEGER, Integer.class)
3247 .out(Type.INTEGER, Integer.class)
3248 .out(Type.INTEGER, Integer.class)
3249 .out(Type.INTEGER, Integer.class)
3250 .out(Type.INTEGER, Integer.class)
3251 .input(0, 10)
3252 .test()
3253 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3254 .assertNoErrors()
3255 .assertValueCount(2)
3256 .assertComplete();
3257 }
3258
3259 @Test
3260 public void testCallableApiReturningTenOutParametersTransacted() {
3261 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3262 db
3263 .call("call out10(?,?,?,?,?,?,?,?,?,?)")
3264 .transacted()
3265 .out(Type.INTEGER, Integer.class)
3266 .out(Type.INTEGER, Integer.class)
3267 .out(Type.INTEGER, Integer.class)
3268 .out(Type.INTEGER, Integer.class)
3269 .out(Type.INTEGER, Integer.class)
3270 .out(Type.INTEGER, Integer.class)
3271 .out(Type.INTEGER, Integer.class)
3272 .out(Type.INTEGER, Integer.class)
3273 .out(Type.INTEGER, Integer.class)
3274 .out(Type.INTEGER, Integer.class)
3275 .input(0, 10)
3276 .flatMap(Tx.flattenToValuesOnly())
3277 .test()
3278 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3279 .assertNoErrors()
3280 .assertValueCount(2)
3281 .assertComplete();
3282 }
3283
3284 @Test
3285 public void testCallableApiReturningTenResultSets() {
3286 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3287 db
3288 .call("call rs10()")
3289 .autoMap(Person2.class)
3290 .autoMap(Person2.class)
3291 .autoMap(Person2.class)
3292 .autoMap(Person2.class)
3293 .autoMap(Person2.class)
3294 .autoMap(Person2.class)
3295 .autoMap(Person2.class)
3296 .autoMap(Person2.class)
3297 .autoMap(Person2.class)
3298 .autoMap(Person2.class)
3299 .input(0, 10)
3300 .doOnNext(x -> {
3301 assertEquals(0, x.outs().size());
3302 assertEquals(10, x.results().size());
3303 })
3304
3305 .flatMap(x -> x.results(0)
3306 .zipWith(x.results(9),
3307 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3308 .test()
3309 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3310 .assertNoErrors()
3311 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED")
3312 .assertComplete();
3313 }
3314
3315 @Test
3316 public void testCallableApiReturningTenResultSetsTransacted() {
3317 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3318 db
3319 .call("call rs10()")
3320 .transacted()
3321 .autoMap(Person2.class)
3322 .autoMap(Person2.class)
3323 .autoMap(Person2.class)
3324 .autoMap(Person2.class)
3325 .autoMap(Person2.class)
3326 .autoMap(Person2.class)
3327 .autoMap(Person2.class)
3328 .autoMap(Person2.class)
3329 .autoMap(Person2.class)
3330 .autoMap(Person2.class)
3331 .input(0, 10)
3332 .flatMap(Tx.flattenToValuesOnly())
3333 .doOnNext(x -> {
3334 assertEquals(0, x.outs().size());
3335 assertEquals(10, x.results().size());
3336 })
3337
3338 .flatMap(x -> x.results(0)
3339 .zipWith(x.results(9),
3340 (y, z) -> ((Person2) y).name() + ((Person2) z).name()))
3341 .test()
3342 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3343 .assertNoErrors()
3344 .assertValues("FREDSARAH", "SARAHFRED", "FREDSARAH", "SARAHFRED")
3345 .assertComplete();
3346 }
3347
3348 @Test
3349 public void testCallableApiReturningOneResultSetGetAs() throws InterruptedException {
3350 Database db = DatabaseCreator.createDerbyWithStoredProcs(1);
3351 db
3352 .call("call in0out0rs1()")
3353 .getAs(String.class, Integer.class)
3354 .input(1)
3355 .flatMap(x -> x.results())
3356 .test()
3357 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3358 .assertValueAt(0, p -> "FRED".equalsIgnoreCase(p._1()) && p._2() == 24)
3359 .assertValueAt(1, p -> "SARAH".equalsIgnoreCase(p._1()) && p._2() == 26)
3360 .assertComplete();
3361 }
3362
3363 @Test
3364 public void testH2InClauseWithoutSetArray() {
3365 db().apply(con -> {
3366 try (PreparedStatement ps = con
3367 .prepareStatement("select count(*) from person where name in (?, ?)")) {
3368 ps.setString(1, "FRED");
3369 ps.setString(2, "JOSEPH");
3370 ResultSet rs = ps.executeQuery();
3371 rs.next();
3372 return rs.getInt(1);
3373 }
3374 }).test()
3375 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete()
3376 .assertValue(2);
3377 }
3378
3379 @Test
3380 @Ignore
3381 public void testH2InClauseWithSetArray() {
3382 db().apply(con -> {
3383 try (PreparedStatement ps = con
3384 .prepareStatement("select count(*) from person where name in (?)")) {
3385 ps.setArray(1, con.createArrayOf("VARCHAR", new String[] {"FRED", "JOSEPH"}));
3386 ResultSet rs = ps.executeQuery();
3387 rs.next();
3388 return rs.getInt(1);
3389 }
3390 }).test()
3391 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS).assertComplete()
3392 .assertValue(2);
3393 }
3394
3395 @Test
3396 public void testUpdateTxPerformed() {
3397 Database db = db(1);
3398 db.update("update person set score = 1000")
3399 .transacted()
3400 .counts()
3401 .test()
3402 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3403 .assertComplete()
3404 .assertValueCount(2);
3405
3406 db.select("select count(*) from person where score=1000")
3407 .getAs(Integer.class)
3408 .test()
3409 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3410 .assertComplete()
3411 .assertValue(3);
3412
3413 }
3414
3415 @Test
3416 public void testIssue20AutoCommitEnabledAndConnectionThrowsOnCommit() {
3417 ConnectionProvider cp = DatabaseCreator.connectionProvider();
3418 Database db = Database.fromBlocking(new ConnectionProvider() {
3419
3420 @Override
3421 public Connection get() {
3422 Connection c = cp.get();
3423 try {
3424 c.setAutoCommit(true);
3425 } catch (SQLException e) {
3426 throw new SQLRuntimeException(e);
3427 }
3428 return new DelegatedConnection() {
3429
3430 @Override
3431 public Connection con() {
3432 return c;
3433 }
3434
3435 @Override
3436 public void commit() throws SQLException {
3437 log.debug("COMMITTING");
3438 if (this.getAutoCommit()) {
3439 throw new SQLException("cannot commit when autoCommit is true");
3440 } else {
3441 con().commit();
3442 }
3443 }
3444
3445 };
3446 }
3447
3448 @Override
3449 public void close() {
3450
3451 }
3452 });
3453 db.update("insert into note(text) values(?)")
3454 .parameters("HI", "THERE")
3455 .returnGeneratedKeys()
3456 .getAs(Integer.class)
3457 .test()
3458 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3459 .assertValues(1, 2)
3460 .assertComplete();
3461 }
3462
3463 @Test
3464 public void testMapInTransactionIssue35() {
3465 Database.test()
3466 .select(Person10.class)
3467 .transacted()
3468 .valuesOnly()
3469 .get()
3470 .map(p -> p.name())
3471 .blockingForEach(log::debug);
3472 }
3473
3474 private static final class Plugins {
3475
3476 private static final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
3477
3478 static void reset() {
3479 list.clear();
3480 RxJavaPlugins.setErrorHandler(e -> list.add(e));
3481 }
3482
3483 static Throwable getSingleError() {
3484 assertEquals(1, list.size());
3485 RxJavaPlugins.reset();
3486 return list.get(0);
3487 }
3488 }
3489
3490 @Test
3491 public void testIssue27ConnectionErrorReportedToRxJavaPlugins() {
3492 try (Database db = Database.nonBlocking()
3493 .url("jdbc:driverdoesnotexist://doesnotexist:1527/notThere").build()) {
3494 Plugins.reset();
3495 db.select("select count(*) from person")
3496 .getAs(Long.class)
3497 .test()
3498 .awaitDone(TIMEOUT_SECONDS / 5, TimeUnit.SECONDS)
3499 .assertNoValues()
3500 .assertNotComplete()
3501 .assertNoErrors()
3502 .cancel();
3503 Throwable e = Plugins.getSingleError();
3504 assertTrue(e instanceof UndeliverableException);
3505 assertTrue(e.getMessage().toLowerCase().contains("no suitable driver"));
3506 }
3507 }
3508
3509 @Test
3510 public void testAutoMapInTransactionIssue35() {
3511 try (Database db = Database.test()) {
3512 db.select(Person10.class)
3513 .transacted()
3514 .valuesOnly()
3515 .get(x -> x.name())
3516 .sorted()
3517 .test()
3518 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3519 .assertValues("FRED", "JOSEPH", "MARMADUKE")
3520 .assertComplete();
3521 }
3522 }
3523
3524 @Test
3525 @Ignore
3526 public void testAutoMapToEnum() {
3527 try (Database db = Database.test()) {
3528 db.select(PersonWithEnum.class)
3529 .get(x -> x.gender())
3530 .test()
3531 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3532 .assertValues(Gender.MALE, Gender.FEMALE, Gender.MALE)
3533 .assertComplete();
3534 }
3535 }
3536
3537 @Test
3538 public void testClosureOfTransactedSelectsIssue46() {
3539 try (Database db = db()) {
3540 for (int i = 0; i < 10; i++) {
3541 db
3542 .select("select name, score from person")
3543 .transacted()
3544 .valuesOnly()
3545 .autoMap(Person2.class)
3546 .doOnNext(x -> log.debug(x.toString()))
3547 .test()
3548 .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
3549 .assertValueCount(3)
3550 .assertComplete();
3551 }
3552 }
3553 }
3554
3555 public enum Gender {
3556 MALE("M"), FEMALE("F"), OTHER("O");
3557
3558 private final String code;
3559
3560 private Gender(String code) {
3561 this.code = code;
3562 }
3563
3564 public static Gender fromCode(String code) {
3565 if (code == null) {
3566 return null;
3567 }
3568 for (Gender g : Gender.values()) {
3569 if (g.code.equals(code)) {
3570 return g;
3571 }
3572 }
3573 return OTHER;
3574 }
3575 }
3576
3577 @Query("select name, gender from person order by name")
3578 public interface PersonWithEnum {
3579 @Column
3580 String name();
3581
3582 @Column("gender")
3583 String genderCharacter();
3584
3585 default Gender gender() {
3586 return Gender.fromCode(genderCharacter());
3587 }
3588 }
3589
3590 public interface PersonWithDefaultMethod {
3591 @Column
3592 String name();
3593
3594 @Column
3595 int score();
3596
3597 public default String nameLower() {
3598 return name().toLowerCase();
3599 }
3600 }
3601
3602 interface PersonWithDefaultMethodNonPublic {
3603 @Column
3604 String name();
3605
3606 @Column
3607 int score();
3608
3609 default String nameLower() {
3610 return name().toLowerCase();
3611 }
3612 }
3613
3614 interface PersonDistinct1 {
3615 @Column
3616 String name();
3617
3618 @Column
3619 int score();
3620
3621 }
3622
3623 interface PersonDistinct2 {
3624 @Column
3625 String name();
3626
3627 @Column
3628 int score();
3629
3630 }
3631
3632 interface Score {
3633 @Column
3634 int score();
3635 }
3636
3637 interface Person {
3638 @Column
3639 String name();
3640 }
3641
3642 interface Person2 {
3643 @Column
3644 String name();
3645
3646 @Column
3647 int score();
3648 }
3649
3650 interface Person3 {
3651 @Column("name")
3652 String fullName();
3653
3654 @Column("score")
3655 int examScore();
3656 }
3657
3658 interface Person4 {
3659 @Column("namez")
3660 String fullName();
3661
3662 @Column("score")
3663 int examScore();
3664 }
3665
3666 interface Person5 {
3667 @Index(1)
3668 String fullName();
3669
3670 @Index(2)
3671 int examScore();
3672 }
3673
3674 interface Person6 {
3675 @Index(1)
3676 String fullName();
3677
3678 @Index(3)
3679 int examScore();
3680 }
3681
3682 interface Person7 {
3683 @Index(1)
3684 String fullName();
3685
3686 @Index(0)
3687 int examScore();
3688 }
3689
3690 interface Person8 {
3691 @Column
3692 int name();
3693 }
3694
3695 interface Person9 {
3696 @Column
3697 String name();
3698
3699 @Index(2)
3700 int score();
3701 }
3702
3703 interface PersonNoAnnotation {
3704 String name();
3705 }
3706
3707 @Query("select name, score from person order by name")
3708 interface Person10 {
3709
3710 @Column
3711 String name();
3712
3713 @Column
3714 int score();
3715 }
3716
3717 }