View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import static com.github.davidmoten.rx.Functions.constant;
4   import static com.github.davidmoten.rx.RxUtil.log;
5   import static com.github.davidmoten.rx.RxUtil.toEmpty;
6   import static com.github.davidmoten.rx.jdbc.DatabaseCreator.connectionProvider;
7   import static com.github.davidmoten.rx.jdbc.DatabaseCreator.createDatabase;
8   import static com.github.davidmoten.rx.jdbc.DatabaseCreator.nextUrl;
9   import static com.github.davidmoten.rx.jdbc.TestingUtil.countDown;
10  import static java.util.Arrays.asList;
11  import static org.easymock.EasyMock.createMock;
12  import static org.junit.Assert.assertEquals;
13  import static org.junit.Assert.assertNull;
14  import static org.junit.Assert.assertTrue;
15  import static rx.Observable.just;
16  
17  import java.io.IOException;
18  import java.io.Reader;
19  import java.sql.Connection;
20  import java.sql.DriverManager;
21  import java.sql.PreparedStatement;
22  import java.sql.ResultSet;
23  import java.sql.SQLException;
24  import java.sql.Timestamp;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Calendar;
28  import java.util.Collections;
29  import java.util.Date;
30  import java.util.HashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.TimeZone;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.easymock.EasyMock;
40  import org.junit.Test;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import rx.Observable;
45  import rx.Observable.Operator;
46  import rx.Observer;
47  import rx.functions.Action0;
48  import rx.functions.Action1;
49  import rx.functions.Func1;
50  import rx.observables.MathObservable;
51  
52  import com.github.davidmoten.rx.RxUtil;
53  import com.github.davidmoten.rx.jdbc.annotations.Column;
54  import com.github.davidmoten.rx.jdbc.annotations.Index;
55  import com.github.davidmoten.rx.jdbc.annotations.Query;
56  import com.github.davidmoten.rx.jdbc.exceptions.TransactionAlreadyOpenException;
57  import com.github.davidmoten.rx.jdbc.tuple.Tuple2;
58  import com.github.davidmoten.rx.jdbc.tuple.Tuple3;
59  import com.github.davidmoten.rx.jdbc.tuple.Tuple4;
60  import com.github.davidmoten.rx.jdbc.tuple.Tuple5;
61  import com.github.davidmoten.rx.jdbc.tuple.Tuple6;
62  import com.github.davidmoten.rx.jdbc.tuple.Tuple7;
63  import com.github.davidmoten.rx.jdbc.tuple.TupleN;
64  import com.zaxxer.hikari.HikariDataSource;
65  
66  public abstract class DatabaseTestBase {
67  
68      /**
69       * <p>
70       * Timeout used for latch.await() and similar. If too short then short
71       * machine lockups on build server (which is known to happen on CloudBees
72       * Jenkins infrastructure) will cause undesired test failures.
73       * </p>
74       * 
75       * <p>
76       * While your test is still failing use a lower value of course so that you
77       * get rapid turnaround. However, once passing switch the timeout to this
78       * standard timeout value.
79       * </p>
80       */
81      private static final int TIMEOUT_SECONDS = 3;
82  
83      private static final Logger log = LoggerFactory.getLogger(DatabaseTestBase.class);
84  
85      private final boolean async;
86  
87      public DatabaseTestBase(boolean async) {
88          this.async = async;
89      }
90  
91      Database db() {
92          if (async)
93              return DatabaseCreator.db().asynchronous();
94          else
95              return DatabaseCreator.db();
96      }
97  
98      @Test
99      public void testSimpleExample() {
100         Observable<String> names = db().select("select name from person order by name").getAs(
101                 String.class);
102         // convert the names to a list for unit test
103         List<String> list = names.toList().toBlocking().single();
104         log.debug("list=" + list);
105         assertEquals(asList("FRED", "JOSEPH", "MARMADUKE"), list);
106     }
107 
108     @Test
109     public void testCountQuery() {
110         int count = db()
111         // select names
112                 .select("select name from person where name >?")
113                 // set name parameter
114                 .parameter("ALEX")
115                 // count results
116                 .count()
117                 // get count
118                 .first()
119                 // block till finished
120                 .toBlocking().single();
121         assertEquals(3, count);
122     }
123 
124     @Test
125     public void testTransactionUsingCount() {
126         Database db = db();
127         Func1<? super Integer, Boolean> isZero = new Func1<Integer, Boolean>() {
128             @Override
129             public Boolean call(Integer t1) {
130                 return t1 == 0;
131             }
132         };
133         Observable<Integer> existingRows = db
134         // select names
135                 .select("select name from person where name=?")
136                 // set name parameter
137                 .parameter("FRED")
138                 // is part of transaction
139                 .dependsOn(db.beginTransaction())
140                 // get result count
141                 .count()
142                 // return empty if count = 0
143                 .filter(isZero);
144         Observable<Integer> update = db
145         // insert record if does not exist
146                 .update("insert into person(name,score) values(?,0)")
147                 // get parameters from last query
148                 .parameters(existingRows.map(constant("FRED")))
149                 // return num rows affected
150                 .count();
151 
152         boolean committed = db.commit(update).toBlocking().single();
153         assertTrue(committed);
154     }
155 
156     @Test
157     public void testTransactionOnCommit() {
158         Database db = db();
159         Observable<Boolean> begin = db.beginTransaction();
160         Observable<Integer> updateCount = db
161         // set everyones score to 99
162                 .update("update person set score=?")
163                 // is within transaction
164                 .dependsOn(begin)
165                 // new score
166                 .parameter(99)
167                 // execute
168                 .count();
169         Observable<Boolean> commit = db.commit(updateCount);
170         long count = db.select("select count(*) from person where score=?")
171         // set score
172                 .parameter(99)
173                 // depends on
174                 .dependsOn(commit)
175                 // return as Long
176                 .getAs(Long.class)
177                 // log
178                 .doOnEach(RxUtil.log())
179                 // get answer
180                 .toBlocking().single();
181         assertEquals(3, count);
182     }
183 
184     @Test
185     public void testSelectErrorResetsTransactionContextInDatabaseClass() {
186         Database db = db();
187         Observable<Integer> select = db
188         // select names
189                 .select("select namez from person where name=?")
190                 // set name parameter
191                 .parameter("FRED")
192                 // is part of transaction
193                 .dependsOn(db.beginTransaction())
194                 // get result count
195                 .count();
196         final AtomicBoolean transactionClosed = new AtomicBoolean(true);
197         db.commit(select).doOnError(new Action1<Throwable>() {
198             @Override
199             public void call(Throwable t) {
200                 System.out.println(t.getMessage());
201                 if (t instanceof TransactionAlreadyOpenException)
202                     transactionClosed.set(false);
203             }
204         }).retry(1).subscribe(ignore());
205         assertTrue(transactionClosed.get());
206     }
207 
208     @Test
209     public void testUpdateErrorResetsTransactionContextInDatabaseClass() {
210         Database db = db();
211         Observable<Integer> update = db
212         // select names
213                 .update("zzz")
214                 // set name parameter
215                 .parameter("FRED")
216                 // is part of transaction
217                 .dependsOn(db.beginTransaction())
218                 // get result count
219                 .count();
220         final AtomicBoolean transactionClosed = new AtomicBoolean(true);
221         db.commit(update).doOnError(new Action1<Throwable>() {
222             @Override
223             public void call(Throwable t) {
224                 System.out.println(t.getMessage());
225                 if (t instanceof TransactionAlreadyOpenException)
226                     transactionClosed.set(false);
227             }
228         }).retry(1).subscribe(ignore());
229         assertTrue(transactionClosed.get());
230     }
231 
232     private static <T> Observer<T> ignore() {
233         return new Observer<T>() {
234 
235             @Override
236             public void onCompleted() {
237 
238             }
239 
240             @Override
241             public void onError(Throwable e) {
242 
243             }
244 
245             @Override
246             public void onNext(T t) {
247 
248             }
249         };
250     }
251 
252     @Test
253     public void testPushEmptyList() {
254         Database db = db();
255         Observable<Integer> rowsAffected = Observable
256         // generate two integers
257                 .range(1, 2)
258                 // replace the integers with empty lists
259                 .map(toEmpty())
260                 // execute the update
261                 .lift(db.update("update person set score = score + 1").parameterListOperator())
262                 // flatten
263                 .lift(RxUtil.<Integer> flatten())
264                 // total the affected records
265                 .lift(SUM_INTEGER);
266         assertIs(6, rowsAffected);
267     }
268 
269     @Test
270     public void testRunScript() {
271         Observable<String> commands = just("create table temp1(id integer)", "drop table temp1");
272         db().run(commands).count().toBlocking().single();
273     }
274 
275     @Test
276     public void testTransactionOnCommitDoesntOccurUnlessSubscribedTo() {
277         Database db = db();
278         Observable<Boolean> begin = db.beginTransaction();
279         Observable<Integer> u = db.update("update person set score=?").dependsOn(begin)
280                 .parameter(99).count();
281         db.commit(u);
282         // note that last transaction was not listed as a dependency of the next
283         // query
284         long count = db.select("select count(*) from person where score=?").parameter(99)
285                 .getAs(Long.class).toBlocking().single();
286         assertEquals(0, count);
287     }
288 
289     @Test
290     public void testTransactionOnRollback() {
291         Database db = db();
292         Observable<Boolean> begin = db.beginTransaction();
293         Observable<Integer> updateCount = db.update("update person set score=?").dependsOn(begin)
294                 .parameter(99).count();
295         db.rollback(updateCount);
296         long count = db.select("select count(*) from person where score=?").parameter(99)
297                 .dependsOnLastTransaction().getAs(Long.class).toBlocking().single();
298         assertEquals(0, count);
299     }
300 
301     @Test
302     public void testUpdateAndSelectWithTransaction() {
303         Database db = db();
304         Observable<Boolean> begin = db.beginTransaction();
305         Observable<Integer> updateCount = db
306         // update everyone's score to 99
307                 .update("update person set score=?")
308                 // in transaction
309                 .dependsOn(begin)
310                 // new score
311                 .parameter(99)
312                 // execute
313                 .count();
314         long count = db.select("select count(*) from person where score=?")
315         // where score = 99
316                 .parameter(99)
317                 // depends on
318                 .dependsOn(updateCount)
319                 // as long value
320                 .getAs(Long.class).toBlocking().single();
321 
322         assertEquals(3, count);
323     }
324 
325     @Test
326     public void testUseParameterObservable() {
327         int count = db().select("select name from person where name >?")
328                 .parameters(Observable.just("ALEX")).count().toBlocking().single();
329         assertEquals(3, count);
330     }
331 
332     @Test
333     public void testTwoParameters() {
334         List<String> list = db().select("select name from person where name > ? and name < ?")
335                 .parameter("ALEX").parameter("LOUIS").getAs(String.class).toList().toBlocking()
336                 .single();
337         assertEquals(asList("FRED", "JOSEPH"), list);
338     }
339 
340     @Test
341     public void testTakeFewerThanAvailable() {
342         int count = db().select("select name from person where name >?").parameter("ALEX")
343                 .get(new ResultSetMapper<Integer>() {
344                     @Override
345                     public Integer call(ResultSet rs) throws SQLException {
346                         return 1;
347                     }
348                 }).take(2).count().first().toBlocking().single();
349         assertEquals(2, count);
350     }
351 
352     @Test
353     public void testJdbcObservableCountLettersInAllNames() {
354         int count = MathObservable.sumInteger(db()
355         // select
356                 .select("select name from person where name >?")
357                 // set name
358                 .parameter("ALEX")
359                 // count letters
360                 .get(COUNT_LETTERS_IN_NAME))
361         // first result
362                 .first()
363                 // block and get result
364                 .toBlocking().single();
365         assertEquals(19, count);
366     }
367 
368     private static final ResultSetMapper<Integer> COUNT_LETTERS_IN_NAME = new ResultSetMapper<Integer>() {
369         @Override
370         public Integer call(ResultSet rs) throws SQLException {
371             return rs.getString("name").length();
372         }
373     };
374 
375     @Test
376     public void testTransformToTuple2AndTestActionsPrintln() {
377         Tuple2<String, Integer> tuple = db()
378                 .select("select name,score from person where name >? order by name")
379                 .parameter("ALEX").getAs(String.class, Integer.class).last().toBlocking().single();
380         assertEquals("MARMADUKE", tuple.value1());
381         assertEquals(25, (int) tuple.value2());
382     }
383 
384     @Test
385     public void testTransformToTupleN() {
386         TupleN<String> tuple = db().select("select name, lower(name) from person order by name")
387                 .getTupleN(String.class).first().toBlocking().single();
388         assertEquals("FRED", tuple.values().get(0));
389         assertEquals("fred", tuple.values().get(1));
390     }
391 
392     @Test
393     public void testMultipleSetsOfParameters() {
394         List<Integer> list = db().select("select score from person where name=?")
395         // first param
396                 .parameter("FRED")
397                 // second param
398                 .parameter("JOSEPH")
399                 // score as integer
400                 .getAs(Integer.class)
401                 // log
402                 .doOnEach(log())
403                 // sort
404                 .toSortedList()
405                 // block and get
406                 .toBlocking().single();
407         assertEquals(asList(21, 34), list);
408     }
409 
410     @Test
411     public void testNoParams() {
412         List<Tuple2<String, Integer>> tuples = db()
413                 .select("select name, score from person where name=? order by name")
414                 .getAs(String.class, Integer.class).toList().toBlocking().single();
415         assertEquals(0, tuples.size());
416     }
417 
418     @Test
419     public void testCreateFromScript() {
420         Database db = Database.from(DatabaseCreator.nextUrl());
421         Observable<Integer> create = db.run(
422                 DatabaseTestBase.class.getResourceAsStream("/db-creation-script.sql"), ";");
423         Observable<Integer> count = db.select("select name from person").dependsOn(create)
424                 .getAs(String.class).count();
425         assertIs(3, count);
426     }
427 
428     @Test
429     public void testComposition2() {
430         log.debug("running testComposition2");
431         Func1<Integer, Boolean> isZero = new Func1<Integer, Boolean>() {
432             @Override
433             public Boolean call(Integer count) {
434                 return count == 0;
435             }
436         };
437         Database db = db();
438         Observable<Integer> existingRows = db.select("select name from person where name=?")
439                 .parameter("FRED").getAs(String.class).count().filter(isZero);
440         List<Integer> counts = db.update("insert into person(name,score) values(?,?)")
441                 .parameters(existingRows).count().toList().toBlocking().single();
442         assertEquals(0, counts.size());
443     }
444 
445     @Test
446     public void testEmptyResultSet() {
447         int count = db().select("select name from person where name >?")
448                 .parameters(Observable.just("ZZTOP")).count().first().toBlocking().single();
449         assertEquals(0, count);
450     }
451 
452     @Test
453     public void testMixingExplicitAndObservableParameters() {
454         String name = db()
455                 .select("select name from person where name > ?  and score < ? order by name")
456                 .parameter("BARRY").parameters(Observable.just(100)).getAs(String.class).first()
457                 .toBlocking().single();
458         assertEquals("FRED", name);
459     }
460 
461     @Test
462     public void testInstantiateDatabaseWithUrl() throws SQLException {
463         Database db = Database.from("jdbc:h2:mem:testa1");
464         Connection con = db.queryContext().connectionProvider().get();
465         con.close();
466     }
467 
468     @Test
469     public void testComposition() {
470         // use composition to find the first person alphabetically with
471         // a score less than the person with the last name alphabetically
472         // whose name is not XAVIER. Two threads and connections will be used.
473 
474         Database db = db();
475         Observable<Integer> score = db
476                 .select("select score from person where name <> ? order by name")
477                 .parameter("XAVIER").getAs(Integer.class).last();
478         Observable<String> name = db
479                 .select("select name from person where score < ? order by name").parameters(score)
480                 .getAs(String.class).first();
481         assertIs("FRED", name);
482     }
483 
484     @Test
485     public void testCompositionUsingLift() {
486         // use composition to find the first person alphabetically with
487         // a score less than the person with the last name alphabetically
488         // whose name is not XAVIER. Two threads and connections will be used.
489 
490         Database db = db();
491         Observable<String> name = db
492                 .select("select score from person where name <> ? order by name")
493                 .parameter("XAVIER")
494                 .getAs(Integer.class)
495                 .last()
496                 .lift(db.select("select name from person where score < ? order by name")
497                         .parameterOperator().getAs(String.class)).first();
498         assertIs("FRED", name);
499     }
500 
501     @Test
502     public void testCompositionTwoLevels() {
503 
504         Database db = db();
505         Observable<String> names = db.select("select name from person order by name").getAs(
506                 String.class);
507         Observable<String> names2 = db
508                 .select("select name from person where name<>? order by name").parameters(names)
509                 .parameters(names).getAs(String.class);
510         List<String> list = db.select("select name from person where name>?").parameters(names2)
511                 .getAs(String.class).toList().toBlocking().single();
512         System.out.println(list);
513         assertEquals(12, list.size());
514     }
515 
516     @Test(expected = RuntimeException.class)
517     public void testSqlProblem() {
518         String name = db().select("select name from pperson where name >?").parameter("ALEX")
519                 .getAs(String.class).first().toBlocking().single();
520         log.debug(name);
521     }
522 
523     @Test(expected = ClassCastException.class)
524     public void testException() {
525         Integer name = db().select("select name from person where name >?").parameter("ALEX")
526                 .getAs(Integer.class).first().toBlocking().single();
527         log.debug("name=" + name);
528     }
529 
530     @Test
531     public void testAutoMapWillMapStringToStringAndIntToDouble() {
532         Person person = db().select("select name,score,dob,registered from person order by name")
533                 .autoMap(Person.class).first().toBlocking().single();
534         assertEquals("FRED", person.getName());
535         assertEquals(21, person.getScore(), 0.001);
536         assertNull(person.getDateOfBirth());
537     }
538 
539     @Test(expected = RuntimeException.class)
540     public void testAutoMapCannotFindConstructorWithEnoughParameters() {
541         db().select("select name,score,dob,registered,name from person order by name")
542                 .autoMap(Person.class).first().toBlocking().single();
543     }
544 
545     @Test
546     public void testGetTimestamp() {
547         Database db = db();
548         java.sql.Timestamp registered = new java.sql.Timestamp(100);
549         Observable<Integer> u = db.update("update person set registered=? where name=?")
550                 .parameter(registered).parameter("FRED").count();
551         Date regTime = db.select("select registered from person order by name").dependsOn(u)
552                 .getAs(Date.class).first().toBlocking().single();
553         assertEquals(100, regTime.getTime());
554     }
555 
556     @Test
557     public void insertClobAndReadAsString() throws SQLException {
558         Database db = db();
559         insertClob(db);
560         // read clob as string
561         String text = db.select("select document from person_clob").getAs(String.class).first()
562                 .toBlocking().single();
563         assertTrue(text.contains("about Fred"));
564     }
565 
566     @Test
567     public void insertNullClobAndReadAsString() throws SQLException {
568         Database db = db();
569         insertClob(db, null);
570         // read clob as string
571         String text = db.select("select document from person_clob").getAs(String.class).first()
572                 .toBlocking().single();
573         assertNull(text);
574     }
575 
576     @Test
577     public void insertNullBlobAndReadAsByteArray() throws SQLException {
578         Database db = db();
579         insertBlob(db, null);
580         // read clob as string
581         byte[] bytes = db.select("select document from person_blob").getAs(byte[].class).first()
582                 .toBlocking().single();
583         assertNull(bytes);
584     }
585 
586     private static void insertClob(Database db) {
587         insertClob(db, "A description about Fred that is rather long and needs a Clob to store it");
588     }
589 
590     private static void insertClob(Database db, String value) {
591         Observable<Integer> count = db.update("insert into person_clob(name,document) values(?,?)")
592                 .parameter("FRED").parameterClob(value).count();
593         assertIs(1, count);
594     }
595 
596     private static void insertBlob(Database db, byte[] bytes) {
597         Observable<Integer> count = db.update("insert into person_blob(name,document) values(?,?)")
598                 .parameter("FRED").parameterBlob(bytes).count();
599         assertIs(1, count);
600     }
601 
602     @Test
603     public void insertClobAndReadAsReader() throws SQLException, IOException {
604         Database db = db();
605         insertClob(db);
606         // read clob as Reader
607         String text = db.select("select document from person_clob").getAs(Reader.class)
608                 .map(Util.READER_TO_STRING).first().toBlocking().single();
609         assertTrue(text.contains("about Fred"));
610     }
611 
612     @Test
613     public void insertBlobAndReadAsByteArray() throws SQLException {
614         Database db = db();
615         insertBlob(db);
616         // read clob as string
617         byte[] bytes = db.select("select document from person_blob").getAs(byte[].class).first()
618                 .toBlocking().single();
619         assertTrue(new String(bytes).contains("about Fred"));
620     }
621 
622     @Test
623     public void testInsertNull() {
624         Observable<Integer> count = db().update("insert into person(name,score,dob) values(?,?,?)")
625                 .parameters("JACK", 42, null).count();
626         assertIs(1, count);
627     }
628 
629     @Test
630     public void testRC4() {
631         Observable.<Object> empty().concatWith(just(10, 20, 30)).buffer(3)
632                 .concatMap(new Func1<List<Object>, Observable<Object>>() {
633 
634                     @Override
635                     public Observable<Object> call(List<Object> list) {
636                         return Observable.from(list);
637                     }
638                 }).count().toBlocking().single();
639         // Observable<Integer> count = db()
640         // .update("insert into person(name,score,dob) values(?,?,?)")
641         // .parameters("JACK", 42, null).count();
642         // assertIs(1, count);
643     }
644 
645     @Test
646     public void testAutoMap() {
647         TimeZone current = TimeZone.getDefault();
648         try {
649             TimeZone.setDefault(TimeZone.getTimeZone("AEST"));
650             Database db = db();
651             Date dob = new Date(100);
652             long now = System.currentTimeMillis();
653             java.sql.Timestamp registered = new java.sql.Timestamp(now);
654             Observable<Integer> u = db.update("update person set dob=?, registered=? where name=?")
655                     .parameter(dob).parameter(registered).parameter("FRED").count();
656             Person person = db.select("select name,score,dob,registered from person order by name")
657                     .dependsOn(u).autoMap(Person.class).first().toBlocking().single();
658             assertEquals("FRED", person.getName());
659             assertEquals(21, person.getScore(), 0.001);
660             // Dates are truncated to start of day
661             assertEquals(0, (long) person.getDateOfBirth());
662             assertEquals(now, (long) person.getRegistered());
663         } finally {
664             TimeZone.setDefault(current);
665         }
666     }
667 
668     @Test
669     public void testLastTransactionWithoutTransaction() {
670         assertIs(0, db().lastTransactionResult().count());
671     }
672 
673     @Test
674     public void testTuple3() {
675         Tuple3<String, Integer, String> tuple = db()
676                 .select("select name,1,lower(name) from person order by name")
677                 .getAs(String.class, Integer.class, String.class).first().toBlocking().single();
678         assertEquals("FRED", tuple.value1());
679         assertEquals(1, (int) tuple.value2());
680         assertEquals("fred", tuple.value3());
681     }
682 
683     @Test
684     public void testTuple4() {
685         Tuple4<String, Integer, String, Integer> tuple = db()
686                 .select("select name,1,lower(name),2 from person order by name")
687                 .getAs(String.class, Integer.class, String.class, Integer.class).first()
688                 .toBlocking().single();
689         assertEquals("FRED", tuple.value1());
690         assertEquals(1, (int) tuple.value2());
691         assertEquals("fred", tuple.value3());
692         assertEquals(2, (int) tuple.value4());
693     }
694 
695     @Test
696     public void testTuple5() {
697         Tuple5<String, Integer, String, Integer, String> tuple = db()
698                 .select("select name,1,lower(name),2,name from person order by name")
699                 .getAs(String.class, Integer.class, String.class, Integer.class, String.class)
700                 .first().toBlocking().single();
701         assertEquals("FRED", tuple.value1());
702         assertEquals(1, (int) tuple.value2());
703         assertEquals("fred", tuple.value3());
704         assertEquals(2, (int) tuple.value4());
705         assertEquals("FRED", tuple.value5());
706     }
707 
708     @Test
709     public void testTuple6() {
710         Tuple6<String, Integer, String, Integer, String, Integer> tuple = db()
711                 .select("select name,1,lower(name),2,name,3 from person order by name")
712                 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
713                         Integer.class).first().toBlocking().single();
714         assertEquals("FRED", tuple.value1());
715         assertEquals(1, (int) tuple.value2());
716         assertEquals("fred", tuple.value3());
717         assertEquals(2, (int) tuple.value4());
718         assertEquals("FRED", tuple.value5());
719         assertEquals(3, (int) tuple.value6());
720     }
721 
722     @Test
723     public void testTuple7() {
724         Tuple7<String, Integer, String, Integer, String, Integer, Integer> tuple = db()
725                 .select("select name,1,lower(name),2,name,3,4 from person order by name")
726                 .getAs(String.class, Integer.class, String.class, Integer.class, String.class,
727                         Integer.class, Integer.class).first().toBlocking().single();
728         assertEquals("FRED", tuple.value1());
729         assertEquals(1, (int) tuple.value2());
730         assertEquals("fred", tuple.value3());
731         assertEquals(2, (int) tuple.value4());
732         assertEquals("FRED", tuple.value5());
733         assertEquals(3, (int) tuple.value6());
734         assertEquals(4, (int) tuple.value7());
735     }
736 
737     @Test
738     public void testAutoMapClob() {
739         Database db = db();
740         insertClob(db);
741         List<PersonClob> list = db.select("select name, document from person_clob")
742                 .autoMap(PersonClob.class).toList().toBlocking().single();
743         assertEquals(1, list.size());
744         assertEquals("FRED", list.get(0).getName());
745         assertTrue(list.get(0).getDocument().contains("rather long"));
746     }
747 
748     @Test
749     public void testAutoMapBlob() {
750         Database db = db();
751         insertBlob(db);
752         List<PersonBlob> list = db.select("select name, document from person_blob")
753                 .autoMap(PersonBlob.class).toList().toBlocking().single();
754         assertEquals(1, list.size());
755         assertEquals("FRED", list.get(0).getName());
756         assertTrue(new String(list.get(0).getDocument()).contains("rather long"));
757     }
758 
759     private void insertBlob(Database db) {
760         insertBlob(db,
761                 "A description about Fred that is rather long and needs a Clob to store it"
762                         .getBytes());
763     }
764 
765     @Test
766     public void testCalendarParameter() throws SQLException {
767         Database db = db();
768         Calendar cal = Calendar.getInstance();
769         cal.setTimeInMillis(0);
770         Observable<Integer> update = db.update("update person set registered=? where name=?")
771                 .parameters(cal, "FRED").count();
772         Timestamp t = db.select("select registered from person where name=?").parameter("FRED")
773                 .dependsOn(update).getAs(Timestamp.class).first().toBlocking().single();
774         assertEquals(0, t.getTime());
775     }
776 
777     @Test
778     public void testDatabaseBuilder() {
779         Database.builder().connectionProvider(connectionProvider())
780                 .nonTransactionalSchedulerOnCurrentThread().build();
781     }
782 
783     @Test
784     public void testConnectionPool() {
785         ConnectionProviderPooled cp = new ConnectionProviderPooled(nextUrl(), 0, 10);
786         Database db = createDatabase(cp);
787         int count = db.select("select name from person order by name").count().toBlocking()
788                 .single();
789         assertEquals(3, count);
790         cp.close();
791         // and again to test idempotentcy
792         cp.close();
793     }
794 
795     @Test(expected = RuntimeException.class)
796     public void testConnectionPoolWhenExceptionThrown() throws SQLException {
797         HikariDataSource pool = new HikariDataSource();
798         pool.setJdbcUrl("invalid");
799         new ConnectionProviderPooled(pool).get();
800     }
801 
802     @Test
803     public void testConnectionPoolDoesNotRunOutOfConnectionsWhenQueryRunRepeatedly()
804             throws SQLException {
805         ConnectionProviderPooled cp = new ConnectionProviderPooled(nextUrl(), 0, 1);
806         Database db = new Database(cp);
807         Connection con = cp.get();
808         DatabaseCreator.createDatabase(con);
809         con.close();
810         assertIs(
811                 100,
812                 db.select("select name from person where name=?")
813                         .parameters(Observable.range(0, 100).map(constant("FRED"))).count());
814     }
815 
816     // TODO add unit test to check that resources closed (connection etc) before
817     // onComplete or onError called on either select or update
818 
819     @Test
820     public void testDatabaseBuilderWithPool() {
821         Database.builder().url(nextUrl()).pool(0, 5).build().close();
822     }
823 
824     @Test
825     public void testOneConnectionOpenAndClosedAfterOneSelect() throws InterruptedException {
826         CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
827         Database db = new Database(cp);
828         db.select("select name from person").count().toBlocking().single();
829         cp.closesLatch().await();
830         cp.getsLatch().await();
831     }
832 
833     @Test
834     public void testOneConnectionOpenAndClosedAfterOneUpdate() throws InterruptedException {
835         CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
836         Database db = new Database(cp);
837         db.update("update person set score=? where name=?").parameters(23, "FRED").count()
838                 .toBlocking().single();
839         cp.closesLatch().await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
840         cp.getsLatch().await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
841     }
842 
843     @Test
844     public void testLiftWithParameters() {
845         int score = just("FRED")
846                 .lift(db().select("select score from person where name=?").parameterOperator()
847                         .getAs(Integer.class)).toBlocking().single();
848         assertEquals(21, score);
849     }
850 
851     @Test
852     public void testLiftWithManyParameters() {
853         int score = Observable
854         // range
855                 .range(1, 3)
856                 // log
857                 .doOnEach(log())
858                 // to parameter
859                 .map(constant("FRED")).lift(db()
860                 // select
861                         .select("select score from person where name=?")
862                         // push parameters
863                         .parameterOperator()
864                         // get score as integer
865                         .getAs(Integer.class))
866                 // sum values
867                 .lift(SUM_INTEGER)
868                 // block and get
869                 .toBlocking().single();
870         assertEquals(3 * 21, score);
871     }
872 
873     private final Operator<Integer, Integer> SUM_INTEGER = RxUtil
874             .toOperator(new Func1<Observable<Integer>, Observable<Integer>>() {
875                 @Override
876                 public Observable<Integer> call(Observable<Integer> source) {
877                     return MathObservable.sumInteger(source);
878                 }
879             });
880 
881     @Test
882     public void testDetector() throws InterruptedException {
883         CountDownLatch latch = new CountDownLatch(1);
884         Observable.range(1, 10).doOnUnsubscribe(countDown(latch)).take(1).toBlocking().single();
885         assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
886     }
887 
888     @Test
889     public void testUnsubscribeOfBufferAndFlatMap() throws InterruptedException {
890         CountDownLatch latch = new CountDownLatch(1);
891         Observable.interval(10, TimeUnit.MILLISECONDS).doOnUnsubscribe(countDown(latch)).buffer(2)
892                 .flatMap(constant(just(1L))).take(6).toList().toBlocking().single();
893         assertTrue(latch.await(3, TimeUnit.SECONDS));
894     }
895 
896     @Test
897     public void testParametersAreUnsubscribedIfUnsubscribedPostParameterOperatorLift()
898             throws InterruptedException {
899         CountDownLatch latch = new CountDownLatch(1);
900         Observable
901                 .interval(100, TimeUnit.MILLISECONDS)
902                 .doOnEach(log())
903                 .map(constant("FRED"))
904                 .doOnUnsubscribe(countDown(latch))
905                 .lift(db().select("select score from person where name=?").parameterOperator()
906                         .getAs(Integer.class)).take(1).subscribe(log());
907         assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
908     }
909 
910     @Test
911     public void testParametersAreUnsubscribed() throws InterruptedException {
912         CountDownLatch latch = new CountDownLatch(1);
913         Observable<String> params = Observable.interval(100, TimeUnit.MILLISECONDS).doOnEach(log())
914                 .map(constant("FRED")).doOnUnsubscribe(countDown(latch));
915         db().select("select score from person where name=?").parameters(params)
916                 .getAs(Integer.class).take(1).subscribe(log());
917         assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
918     }
919 
920     @Test
921     public void testTakeShouldNotHang() {
922         assertEquals(1, (int) Observable.<Integer> empty().concatWith(Observable.just(1)).take(1)
923                 .toBlocking().single());
924     }
925 
926     @Test
927     public void testLiftSelectWithDependencies() {
928         Database db = db();
929         Observable<Integer> count = db
930                 .update("update person set score=? where name=?")
931                 .parameters(4, "FRED")
932                 .count()
933                 .lift(db.select("select score from person where name=?").parameters("FRED")
934                         .dependsOnOperator().getAs(Integer.class));
935         assertIs(4, count);
936     }
937 
938     @Test
939     public void testLiftUpdateWithParameters() {
940         Database db = db();
941         Observable<Integer> count = just(4, "FRED").lift(
942                 db.update("update person set score=? where name=?").parameterOperator());
943         assertIs(1, count);
944     }
945 
946     @Test
947     public void testLiftUpdateWithDependencies() {
948         Database db = db();
949         Observable<Integer> score = Observable
950         // parameters for coming update
951                 .just(4, "FRED")
952                 // update Fred's score to 4
953                 .lift(db.update("update person set score=? where name=?").parameterOperator())
954                 // update everyone with score of 4 to 14
955                 .lift(db.update("update person set score=? where score=?").parameters(14, 4)
956                         .dependsOnOperator())
957                 // get Fred's score
958                 .lift(db.select("select score from person where name=?").parameters("FRED")
959                         .dependsOnOperator().getAs(Integer.class));
960         assertIs(14, score);
961     }
962 
963     static <T> void assertIs(T t, Observable<T> observable) {
964         assertEquals(t, observable.toBlocking().single());
965     }
966 
967     @Test
968     public void testTwoConnectionsOpenedAndClosedAfterTwoSelects() throws InterruptedException {
969         CountDownConnectionProvider cp = new CountDownConnectionProvider(2, 2);
970         Database db = new Database(cp);
971         db.select("select name from person").count().toBlocking().single();
972         db.select("select name from person").count().toBlocking().single();
973         assertTrue(cp.getsLatch().await(60, TimeUnit.SECONDS));
974         assertTrue(cp.closesLatch().await(60, TimeUnit.SECONDS));
975     }
976 
977     @Test
978     public void testTwoConnectionsOpenedAndClosedAfterTwoUpdates() throws InterruptedException {
979         CountDownConnectionProvider cp = new CountDownConnectionProvider(2, 2);
980         Database db = new Database(cp);
981         db.update("update person set score=? where name=?").parameters(23, "FRED").count()
982                 .toBlocking().single();
983         db.update("update person set score=? where name=?").parameters(25, "JOHN").count()
984                 .toBlocking().single();
985         assertTrue(cp.getsLatch().await(60, TimeUnit.SECONDS));
986         assertTrue(cp.closesLatch().await(60, TimeUnit.SECONDS));
987     }
988 
989     @Test
990     public void testOneConnectionOpenedAndClosedAfterTwoSelectsWithinTransaction()
991             throws InterruptedException {
992         CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
993         Database db = new Database(cp);
994         Observable<Boolean> begin = db.beginTransaction();
995         Observable<Integer> count = db.select("select name from person").dependsOn(begin).count();
996         Observable<Integer> count2 = db.select("select name from person").dependsOn(count).count();
997         int result = db.commit(count2).count().toBlocking().single();
998         log.info("committed " + result);
999         cp.getsLatch().await();
1000         log.info("gets ok");
1001         cp.closesLatch().await();
1002         log.info("closes ok");
1003     }
1004 
1005     @Test
1006     public void testOneConnectionOpenedAndClosedAfterTwoUpdatesWithinTransaction()
1007             throws InterruptedException {
1008         CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
1009         Database db = new Database(cp);
1010         Observable<Boolean> begin = db.beginTransaction();
1011         Observable<Integer> count = db.update("update person set score=? where name=?")
1012                 .dependsOn(begin).parameters(23, "FRED").count();
1013         Observable<Integer> count2 = db.update("update person set score=? where name=?")
1014                 .dependsOn(count).parameters(25, "JOHN").count();
1015         int result = db.commit(count2).count().toBlocking().single();
1016         log.info("committed " + result);
1017         cp.getsLatch().await();
1018         log.info("gets ok");
1019         cp.closesLatch().await();
1020         log.info("closes ok");
1021     }
1022 
1023     @Test
1024     public void testCloseDatabaseClosesConnectionProvider() {
1025         ConnectionProvider cp = createMock(ConnectionProvider.class);
1026         cp.close();
1027         EasyMock.expectLastCall().once();
1028         EasyMock.replay(cp);
1029         new Database(cp).close();
1030         EasyMock.verify(cp);
1031     }
1032 
1033     @Test
1034     public void testCloseAutoCommittingConnectionProviderClosesInternalConnectionProvider() {
1035         ConnectionProvider cp = createMock(ConnectionProvider.class);
1036         cp.close();
1037         EasyMock.expectLastCall().once();
1038         EasyMock.replay(cp);
1039         new ConnectionProviderAutoCommitting(cp).close();
1040         EasyMock.verify(cp);
1041     }
1042 
1043     @Test
1044     public void testCloseSingletonManualCommitConnectionProviderClosesInternalConnectionProvider() {
1045         ConnectionProvider cp = createMock(ConnectionProvider.class);
1046         cp.close();
1047         EasyMock.expectLastCall().once();
1048         EasyMock.replay(cp);
1049         new ConnectionProviderSingletonManualCommit(cp).close();
1050         EasyMock.verify(cp);
1051     }
1052 
1053     @Test
1054     public void testCloseConnectionProviderFromUrlClosesInternalConnectionProvider() {
1055         db().close();
1056     }
1057 
1058     @Test(expected = RuntimeException.class)
1059     public void testCannotPassObservableAsSingleParameter() {
1060         db().select("anything").parameter(Observable.just(123));
1061     }
1062 
1063     @Test
1064     public void testConnectionsReleasedByUpdateStatementBeforeOnNext() throws InterruptedException {
1065         final CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
1066         Database db = new Database(cp);
1067         Observable<Integer> result = db.update("update person set score = 1 where name=?")
1068                 .parameter("FRED").count();
1069 
1070         checkConnectionsReleased(cp, result);
1071     }
1072 
1073     @Test
1074     public void testConnectionsReleasedByCommitBeforeOnNext() throws InterruptedException {
1075         final CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
1076         Database db = new Database(cp);
1077         Observable<Boolean> begin = db.beginTransaction();
1078         Observable<Integer> result = db.update("update person set score = 1 where name=?")
1079                 .dependsOn(begin).parameter("FRED").count();
1080         checkConnectionsReleased(cp, db.commit(result));
1081     }
1082 
1083     @Test
1084     public void testConnectionsReleasedByRollbackBeforeOnNext() throws InterruptedException {
1085         final CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
1086         Database db = new Database(cp);
1087         Observable<Boolean> begin = db.beginTransaction();
1088         Observable<Integer> result = db.update("update person set score = 1 where name=?")
1089                 .dependsOn(begin).parameter("FRED").count();
1090         checkConnectionsReleased(cp, db.rollback(result));
1091     }
1092 
1093     private void checkConnectionsReleased(final CountDownConnectionProvider cp, Observable<?> result)
1094             throws InterruptedException {
1095         final CountDownLatch latch = new CountDownLatch(1);
1096         result.subscribe(new Action1<Object>() {
1097 
1098             @Override
1099             public void call(Object obj) {
1100                 try {
1101                     if (cp.closesLatch().await(TIMEOUT_SECONDS, TimeUnit.SECONDS))
1102                         latch.countDown();
1103                 } catch (InterruptedException e) {
1104                     e.printStackTrace();
1105                 }
1106             }
1107         });
1108         assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1109     }
1110 
1111     @Test
1112     public void testCanChainUpdateStatementsWithinTransaction() {
1113         Database db = db();
1114         Observable<Boolean> begin = db.beginTransaction();
1115         Observable<Integer> updates = Observable
1116         // set name parameter
1117                 .just("FRED")
1118                 // push into update
1119                 .lift(db.update("update person set score=1 where name=?").dependsOn(begin)
1120                         .parameterOperator())
1121                 // map num rows affected to JOHN
1122                 .map(constant("JOHN"))
1123                 // push into second update
1124                 .lift(db.update("update person set score=2 where name=?").parameterOperator());
1125         db.commit(updates).toBlocking().single();
1126     }
1127 
1128     @Test
1129     public void testCommitOperator() {
1130         Database db = db();
1131         Observable<Boolean> begin = db.beginTransaction();
1132         String name = Observable
1133         // set name parameter
1134                 .just("FRED")
1135                 // push into update
1136                 .lift(db.update("update person set score=1 where name=?").dependsOn(begin)
1137                         .parameterOperator())
1138                 // map num rows affected to JOHN
1139                 .lift(db.commitOperator())
1140                 // select query
1141                 .lift(db.select("select name from person where score=1")
1142                 // depends on commit
1143                         .dependsOnOperator()
1144                         // return names
1145                         .getAs(String.class))
1146                 // return first name
1147                 .first()
1148                 // block to get make everything run
1149                 .toBlocking().single();
1150         assertEquals("FRED", name);
1151     }
1152 
1153     @Test
1154     public void testTryCatch() {
1155         try (Connection con = DatabaseCreator.nextConnection();
1156                 PreparedStatement ps = con
1157                         .prepareStatement("select name from person where name > ? order by name");) {
1158             ps.setObject(1, "ALEX");
1159             List<String> list = new ArrayList<String>();
1160             try (ResultSet rs = ps.executeQuery()) {
1161                 while (rs.next()) {
1162                     list.add(rs.getString(1));
1163                 }
1164             }
1165             System.out.println(list);
1166         } catch (SQLException e) {
1167             throw new RuntimeException(e);
1168         }
1169     }
1170 
1171     @Test
1172     public void testChainSelectUsingOperators() {
1173         Database db = db();
1174         List<Integer> scores = db.select("select name from person")
1175         // get name
1176                 .getAs(String.class)
1177                 // push name as parameter to next select
1178                 .lift(db
1179                 // select scores
1180                 .select("select score from person where name=?")
1181                 // parameters are pushed
1182                         .parameterOperator()
1183                         // get score as integer
1184                         .getAs(Integer.class))
1185                 // sort scores
1186                 .toSortedList()
1187                 // block to get result
1188                 .toBlocking().single();
1189         assertEquals(asList(21, 25, 34), scores);
1190     }
1191 
1192     @Test
1193     public void testBeginTransactionEmitsOneItem() {
1194         Database db = db();
1195         Boolean value = db.beginTransaction().toBlocking().single();
1196         assertTrue(value);
1197     }
1198 
1199     @Test
1200     public void testCommitOnLastOperator() {
1201         Database db = db();
1202         long count = db
1203         // start transaction
1204                 .beginTransaction()
1205                 // push parameters
1206                 .concatMap(constant(just(99, 88)))
1207                 // log
1208                 .doOnEach(log())
1209                 // update twice
1210                 .lift(db.update("update person set score=?")
1211                 // push parameters
1212                         .parameterOperator())
1213                 // commit on last
1214                 .lift(db.commitOnCompleteOperator())
1215                 // get count of 88s
1216                 .lift(db.select("select count(*) from person where score=88")
1217                 // depends on previous
1218                         .dependsOnOperator()
1219                         // count as Long
1220                         .getAs(Long.class))
1221                 // block and get result
1222                 .toBlocking().single();
1223         assertEquals(3, count);
1224     }
1225 
1226     @Test
1227     public void testRollbackOnLastOperator() {
1228         Database db = db();
1229         long count = db
1230         // start transaction
1231                 .beginTransaction()
1232                 // push parameters
1233                 .concatMap(constant(just(99, 88)))
1234                 // log
1235                 .doOnEach(log())
1236                 // update twice
1237                 .lift(db.update("update person set score=?")
1238                 // push parameters
1239                         .parameterOperator())
1240                 // commit on last
1241                 .lift(db.rollbackOnCompleteOperator())
1242                 // get count of 88s
1243                 .lift(db.select("select count(*) from person where score=88")
1244                 // depends on previous
1245                         .dependsOnOperator()
1246                         // count as Long
1247                         .getAs(Long.class))
1248                 // block and get result
1249                 .toBlocking().single();
1250         assertEquals(0, count);
1251     }
1252 
1253     @Test
1254     public void testBeginTransactionOnNextForThreePasses() {
1255         Database db = db();
1256         Observable<Integer> min = Observable
1257         // do 3 times
1258                 .just(11, 12, 13)
1259                 // begin transaction for each item
1260                 .lift(db.beginTransactionOnNextOperator())
1261                 // update all scores to the item
1262                 .lift(db.update("update person set score=?").parameterOperator())
1263                 // to empty parameter list
1264                 .map(toEmpty())
1265                 // increase score
1266                 .lift(db.update("update person set score=score + 5").parameterListOperator())
1267                 // only expect one result so can flatten
1268                 .lift(RxUtil.<Integer> flatten())
1269                 // commit transaction
1270                 .lift(db.commitOnNextOperator())
1271                 // to empty lists
1272                 .map(toEmpty())
1273                 // return count
1274                 .lift(db.select("select min(score) from person").dependsOnOperator()
1275                         .getAs(Integer.class));
1276         assertIs(18, min);
1277     }
1278 
1279     @Test
1280     public void testParameterListOperator() {
1281         Database db = db();
1282         @SuppressWarnings("unchecked")
1283         int count =
1284         // parameters grouped in lists
1285         objects(objects(1), objects(2))
1286         // log
1287                 .doOnEach(log())
1288                 // begin trans
1289                 .lift(db.<Observable<Object>> beginTransactionOnNextOperator())
1290                 // log
1291                 .doOnEach(log())
1292                 // update
1293                 .lift(db.update("update person set score = ?")
1294                 // push lists of parameters
1295                         .parameterListOperator())
1296                 // log
1297                 .doOnEach(log())
1298                 // commit
1299                 .lift(db.<Integer> commitOnNextListOperator())
1300                 // total rows affected
1301                 .count()
1302                 // block and get result
1303                 .toBlocking().single();
1304         assertEquals(2, count);
1305     }
1306 
1307     @Test
1308     public void testParameterListOperatorWhenQueryNeedsTwoParameters() {
1309         Database db = db();
1310         @SuppressWarnings("unchecked")
1311         int count =
1312         // parameters grouped in lists
1313         objects(objects(1, "FRED", 3, "JOHN"), objects(2, "JOSEPH"))
1314         // log
1315                 .doOnEach(log())
1316                 // begin trans
1317                 .lift(db.<Observable<Object>> beginTransactionOnNextOperator())
1318                 // log
1319                 .doOnEach(log())
1320                 // update
1321                 .lift(db.update("update person set score = ? where name=?")
1322                 // push lists of parameters
1323                         .parameterListOperator())
1324                 // log
1325                 .doOnEach(log())
1326                 // commit
1327                 .lift(db.<Integer> commitOnNextListOperator())
1328                 // total rows affected
1329                 .count()
1330                 // block and get result
1331                 .toBlocking().single();
1332         assertEquals(2, count);
1333     }
1334 
1335     @Test
1336     public void testCanExecuteCreateSchema() {
1337         Database db = db();
1338         int count = db.update("create schema if not exists special_user").count().toBlocking()
1339                 .single();
1340         assertEquals(0, count);
1341     }
1342 
1343     @Test
1344     public void testCanExecuteCreateTable() {
1345         Database db = db();
1346         int count = db.update("create table  mytemp(name varchar2(100) primary key)").count()
1347                 .toBlocking().single();
1348         assertEquals(0, count);
1349     }
1350 
1351     private static Observable<Object> objects(Object... objects) {
1352         return Observable.from(objects);
1353     }
1354 
1355     private static Observable<Observable<Object>> objects(
1356             @SuppressWarnings("unchecked") Observable<Object>... objects) {
1357         return Observable.from(objects);
1358     }
1359 
1360     @Test
1361     public void testDatabaseFromConnectionCanUseConnectionTwiceWithoutItBeingClosedInReality()
1362             throws SQLException {
1363         ConnectionProvider cp = DatabaseCreator.connectionProvider();
1364         DatabaseCreator.createDatabase(cp);
1365         Connection con = cp.get();
1366         Database db = Database.from(con);
1367         Observable<Integer> count = db
1368         // get names
1369                 .select("select name from person")
1370                 // as string
1371                 .getAs(String.class)
1372                 // count names
1373                 .count()
1374                 // do something else
1375                 .lift(db
1376                 // get max score
1377                 .select("select max(score) from person")
1378                 // run the previous statement first
1379                         .dependsOnOperator()
1380                         // as integer
1381                         .getAs(Integer.class));
1382         assertIs(34, count);
1383         con.close();
1384     }
1385 
1386     @Test
1387     public void testNonTransactionalMultipleQueries() {
1388         // get a synchronous database
1389         Database db = DatabaseCreator.db();
1390         final Set<String> set = Collections.newSetFromMap(new HashMap<String, Boolean>());
1391         Observable<Integer> count = Observable.just(1, 2, 3, 4, 5)
1392         // select
1393                 .lift(db.select("select name from person where score >?")
1394                 // push parameters to this query
1395                         .parameterOperator()
1396                         // get name as string
1397                         .getAs(String.class))
1398                 // record thread name
1399                 .doOnNext(new Action1<String>() {
1400                     @Override
1401                     public void call(String name) {
1402                         set.add(Thread.currentThread().getName());
1403                     }
1404                 })
1405                 // count
1406                 .count();
1407         assertIs(5 * 3, count);
1408         System.out.println("threads=" + set);
1409         assertEquals(1, set.size());
1410     }
1411 
1412     @Test
1413     public void testTransaction() {
1414         Database db = db();
1415         Observable<Boolean> begin = db.beginTransaction();
1416         Observable<Integer> count = Observable
1417         // generate 1,2,3
1418                 .just(1, 2, 3)
1419                 // update score with that value
1420                 .lift(db.update("update person set score = ?")
1421                 // participates in a transaction
1422                         .dependsOn(begin)
1423                         // parameters are pushed to this update statement
1424                         .parameterOperator())
1425                 // commit transaction
1426                 .lift(db.commitOnCompleteOperator())
1427                 // count names with score 3
1428                 .lift(db.select("select count(name) from person where score=3")
1429                 // must commit first
1430                         .dependsOnOperator().getAs(Integer.class));
1431         assertIs(3, count);
1432     }
1433 
1434     @Test
1435     public void testTwoConnectionsOpenedAndClosedWhenTakeOneUsedWithSelectThatReturnsOneRow()
1436             throws InterruptedException {
1437         Action0 completed = new Action0() {
1438 
1439             @Override
1440             public void call() {
1441                 System.out.println("completed");
1442             }
1443         };
1444         CountDownConnectionProvider cp = new CountDownConnectionProvider(1, 1);
1445         Database db = new Database(cp);
1446         db.select("select count(*) from person").getAs(Long.class).doOnCompleted(completed).take(1)
1447                 .toBlocking().single();
1448         assertTrue(cp.getsLatch().await(6, TimeUnit.SECONDS));
1449         assertTrue(cp.closesLatch().await(6, TimeUnit.SECONDS));
1450     }
1451 
1452     @Test
1453     public void testAutoMapInterface() {
1454         // test dynamic proxying
1455         List<NameScore> list = db().select("select name, score from person order by name")
1456                 .autoMap(NameScore.class).toList().toBlocking().single();
1457         assertEquals(3, list.size());
1458         assertEquals("FRED", list.get(0).name());
1459         assertEquals(21, list.get(0).score());
1460         assertEquals("JOSEPH", list.get(1).name());
1461         assertEquals(34, list.get(1).score());
1462     }
1463 
1464     static interface NameScore {
1465 
1466         @Index(1)
1467         String name();
1468 
1469         @Column("score")
1470         int score();
1471     }
1472 
1473     @Test
1474     public void testAutoMapConvertsCamelCaseToUnderscoreColumnNames() {
1475         // test dynamic proxying
1476         List<Address> list = db().select("select address_id, full_address from address")
1477                 .autoMap(Address.class).toList().toBlocking().single();
1478         assertEquals(1, list.size());
1479         assertEquals(1, list.get(0).addressId());
1480         assertTrue(list.get(0).fullAddress().contains("Something"));
1481     }
1482 
1483     static interface Address {
1484 
1485         @Column
1486         int addressId();
1487 
1488         @Column
1489         String fullAddress();
1490     }
1491 
1492     @Test
1493     public void testAutoMapWithQueryAnnotation() {
1494         List<NameScore2> list = db().select().autoMap(NameScore2.class).toList().toBlocking()
1495                 .single();
1496         assertEquals(3, list.size());
1497         assertEquals("FRED", list.get(0).name());
1498         assertEquals(21, list.get(0).score());
1499         assertEquals("JOSEPH", list.get(1).name());
1500         assertEquals(34, list.get(1).score());
1501     }
1502 
1503     @Query("select name, score from person order by name")
1504     static interface NameScore2 {
1505 
1506         @Index(1)
1507         String name();
1508 
1509         @Column("score")
1510         int score();
1511     }
1512 
1513     @Test(expected = RuntimeException.class)
1514     public void testAutoMapThrowsExceptionIfMappedInterfaceColumnMethodHasParameters() {
1515         // test dynamic proxying
1516         db().select("select address_id, full_address from address").autoMap(Address2.class)
1517                 .toList().toBlocking().single();
1518     }
1519 
1520     static interface Address2 {
1521 
1522         @Column
1523         int addressId(String suburb);
1524 
1525         @Column
1526         String fullAddress();
1527     }
1528 
1529     @Test
1530     public void testCustomMapper() {
1531         String name = db().select("select name from person order by name")
1532                 .get(new ResultSetMapper<String>() {
1533                     @Override
1534                     public String call(ResultSet rs) throws SQLException {
1535                         return rs.getString(1);
1536                     }
1537                 }).first().toBlocking().single();
1538         assertEquals("FRED", name);
1539     }
1540 
1541     @Test
1542     public void testReturnGeneratedKeysForOneInsertedValue() {
1543         // h2 only returns the last generated key
1544         List<Integer> list = db()
1545         //
1546                 .update("insert into note(text) values(?)")
1547                 //
1548                 .parameters("something")
1549                 //
1550                 .returnGeneratedKeys()
1551                 //
1552                 .getAs(Integer.class)
1553                 //
1554                 .toList().toBlocking().single();
1555         assertEquals(Arrays.asList(1), list);
1556     }
1557 
1558     @Test
1559     public void testReturnGeneratedKeysForMultipleInsertedValuesInOneStatement() {
1560         // h2 only returns the last generated key
1561         List<Integer> list = db()
1562         //
1563                 .update("insert into note(text) values(?),(?)")
1564                 //
1565                 .parameters("something", "again")
1566                 //
1567                 .returnGeneratedKeys()
1568                 //
1569                 .getAs(Integer.class)
1570                 //
1571                 .toList().toBlocking().single();
1572         assertEquals(Arrays.asList(2), list);
1573     }
1574 
1575     @Test
1576     public void testReturnGeneratedKeysForMultipleCallsOfInsert() {
1577         // h2 only returns the last generated key
1578         List<Integer> list = db()
1579         //
1580                 .update("insert into note(text) values(?)")
1581                 //
1582                 .parameters("something", "again")
1583                 //
1584                 .returnGeneratedKeys()
1585                 //
1586                 .getAs(Integer.class)
1587                 //
1588                 .toList().toBlocking().single();
1589         assertEquals(Arrays.asList(1, 2), list);
1590     }
1591 
1592     @Test
1593     public void testNamedParameters() {
1594         String name = db()
1595         //
1596                 .select("select name from person where score >= :min and score <=:max")
1597                 //
1598                 .parameter("min", 24)
1599                 //
1600                 .parameter("max", 26)
1601                 //
1602                 .getAs(String.class).toBlocking().single();
1603         assertEquals("MARMADUKE", name);
1604     }
1605 
1606     @Test
1607     public void testNamedParametersWithMapParameter() {
1608         Map<String, Integer> map = new HashMap<String, Integer>();
1609         map.put("min", 24);
1610         map.put("max", 26);
1611         String name = db()
1612         //
1613                 .select("select name from person where score >= :min and score <=:max")
1614                 //
1615                 .parameters(map)
1616                 //
1617                 .getAs(String.class).toBlocking().single();
1618         assertEquals("MARMADUKE", name);
1619     }
1620 
1621     @Test
1622     public void testNamedParametersWithMapParameterInObservable() {
1623         Map<String, Integer> map = new HashMap<String, Integer>();
1624         map.put("min", 24);
1625         map.put("max", 26);
1626         String name = db()
1627         //
1628                 .select("select name from person where score >= :min and score <=:max")
1629                 //
1630                 .parameters(Observable.just(map))
1631                 //
1632                 .getAs(String.class).toBlocking().single();
1633         assertEquals("MARMADUKE", name);
1634     }
1635 
1636     @Test
1637     public void testNamedParametersWithUpdateStatement() {
1638         int count = db()
1639         //
1640                 .update("update person set score = :newScore where score >= :min and score <=:max")
1641                 //
1642                 .parameter("newScore", 25)
1643                 //
1644                 .parameter("min", 24)
1645                 //
1646                 .parameter("max", 26)
1647                 //
1648                 .count().toBlocking().single();
1649         assertEquals(1, count);
1650     }
1651 
1652     @Test(expected = RuntimeException.class)
1653     public void testNamedParametersOneMissingParameterShouldThrowException() {
1654         db().select("select name from person where name = :name and score = :score")
1655                 .parameter("name", "FRED").count().toBlocking().single();
1656     }
1657 
1658     @Test(expected = RuntimeException.class)
1659     public void testNamedParametersWithMapParameterNoNamesInSql() {
1660         Map<String, Integer> map = new HashMap<String, Integer>();
1661         map.put("min", 24);
1662         map.put("max", 26);
1663         db()
1664         //
1665         .select("select name from person where score >= ? and score <= ?")
1666         //
1667                 .parameters(Observable.just(map))
1668                 //
1669                 .getAs(String.class).toBlocking().single();
1670     }
1671 
1672     @Test
1673     public void testNoParameters() {
1674         int count = db().select("select name from person").count().toBlocking().single();
1675         assertEquals(3, count);
1676     }
1677 
1678     /********************************************************
1679      ** Utility classes
1680      ********************************************************/
1681 
1682     private static class CountDownConnectionProvider implements ConnectionProvider {
1683         private final ConnectionProvider cp;
1684         private final CountDownLatch closesLatch;
1685         private final CountDownLatch getsLatch;
1686 
1687         CountDownConnectionProvider(int expectedGets, int expectedCloses) {
1688             this.cp = connectionProvider();
1689             DatabaseCreator.createDatabase(cp.get());
1690             this.closesLatch = new CountDownLatch(expectedCloses);
1691             this.getsLatch = new CountDownLatch(expectedGets);
1692         }
1693 
1694         CountDownLatch closesLatch() {
1695             return closesLatch;
1696         }
1697 
1698         CountDownLatch getsLatch() {
1699             return getsLatch;
1700         }
1701 
1702         @Override
1703         public Connection get() {
1704             getsLatch.countDown();
1705             Connection inner = cp.get();
1706             return new CountingConnection(inner, closesLatch);
1707         }
1708 
1709         @Override
1710         public void close() {
1711             cp.close();
1712         }
1713     }
1714 
1715     static class PersonClob {
1716         private final String name;
1717         private final String document;
1718 
1719         public PersonClob(String name, String document) {
1720             this.name = name;
1721             this.document = document;
1722         }
1723 
1724         public String getName() {
1725             return name;
1726         }
1727 
1728         public String getDocument() {
1729             return document;
1730         }
1731     }
1732 
1733     static class PersonBlob {
1734         private final String name;
1735         private final byte[] document;
1736 
1737         public PersonBlob(String name, byte[] document) {
1738             this.name = name;
1739             this.document = document;
1740         }
1741 
1742         public String getName() {
1743             return name;
1744         }
1745 
1746         public byte[] getDocument() {
1747             return document;
1748         }
1749     }
1750 
1751     static class Person {
1752         private final String name;
1753         private final double score;
1754         private final Long dateOfBirthEpochMs;
1755         private final Long registered;
1756 
1757         Person(String name, double score, Long dateOfBirthEpochMs, Long registered) {
1758             this.name = name;
1759             this.score = score;
1760             this.dateOfBirthEpochMs = dateOfBirthEpochMs;
1761             this.registered = registered;
1762         }
1763 
1764         public String getName() {
1765             return name;
1766         }
1767 
1768         public double getScore() {
1769             return score;
1770         }
1771 
1772         public Long getDateOfBirth() {
1773             return dateOfBirthEpochMs;
1774         }
1775 
1776         public Long getRegistered() {
1777             return registered;
1778         }
1779 
1780         @Override
1781         public String toString() {
1782             StringBuilder builder = new StringBuilder();
1783             builder.append("Pair [name=");
1784             builder.append(name);
1785             builder.append(", score=");
1786             builder.append(score);
1787             builder.append("]");
1788             return builder.toString();
1789         }
1790     }
1791 
1792 }