View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import static com.github.davidmoten.rx.jdbc.Conditions.checkNotNull;
4   import static com.github.davidmoten.rx.jdbc.Queries.bufferedParameters;
5   
6   import java.sql.ResultSet;
7   import java.util.List;
8   
9   import rx.Observable;
10  import rx.Observable.Operator;
11  import rx.functions.Func1;
12  
13  import com.github.davidmoten.rx.jdbc.NamedParameters.JdbcQuery;
14  import com.github.davidmoten.rx.jdbc.tuple.Tuple2;
15  import com.github.davidmoten.rx.jdbc.tuple.Tuple3;
16  import com.github.davidmoten.rx.jdbc.tuple.Tuple4;
17  import com.github.davidmoten.rx.jdbc.tuple.Tuple5;
18  import com.github.davidmoten.rx.jdbc.tuple.Tuple6;
19  import com.github.davidmoten.rx.jdbc.tuple.Tuple7;
20  import com.github.davidmoten.rx.jdbc.tuple.TupleN;
21  import com.github.davidmoten.rx.jdbc.tuple.Tuples;
22  
23  /**
24   * Always emits an Observable<Integer> of size 1 containing the number of
25   * affected records.
26   * 
27   * @param <T>
28   *            type of returned observable (Integer for count, custom for
29   *            returning generated keys)
30   */
31  final public class QueryUpdate<T> implements Query {
32  
33      private final JdbcQuery jdbcQuery;
34      private final Observable<Parameter> parameters;
35      private final QueryContext context;
36      private final Observable<?> depends;
37      // nullable!
38      private final ResultSetMapper<? extends T> returnGeneratedKeysFunction;
39  
40      /**
41       * Private constructor.
42       * 
43       * @param sql
44       * @param parameters
45       * @param depends
46       * @param context
47       * @param returnGeneratedKeysFunction
48       *            nullable!
49       */
50      private QueryUpdate(String sql, Observable<Parameter> parameters, Observable<?> depends,
51              QueryContext context, ResultSetMapper<? extends T> returnGeneratedKeysFunction) {
52          checkNotNull(sql);
53          checkNotNull(parameters);
54          checkNotNull(depends);
55          checkNotNull(context);
56          this.jdbcQuery = NamedParameters.parse(sql);
57          this.parameters = parameters;
58          this.depends = depends;
59          this.context = context;
60          this.returnGeneratedKeysFunction = returnGeneratedKeysFunction;
61      }
62  
63      @Override
64      public String sql() {
65          return jdbcQuery.sql();
66      }
67  
68      @Override
69      public Observable<Parameter> parameters() {
70          return parameters;
71      }
72  
73      @Override
74      public QueryContext context() {
75          return context;
76      }
77  
78      @Override
79      public String toString() {
80          return "QueryUpdate [sql=" + sql() + "]";
81      }
82  
83      @Override
84      public Observable<?> depends() {
85          return depends;
86      }
87  
88      @Override
89      public List<String> names() {
90          return jdbcQuery.names();
91      }
92  
93      /**
94       * Returns the results of an update query. Should be an {@link Observable}
95       * of size 1 containing the number of records affected by the update (or
96       * insert) statement.
97       * 
98       * @param query
99       * @return
100      */
101     @SuppressWarnings("unchecked")
102     public Observable<Integer> count() {
103         return (Observable<Integer>) QueryUpdate.get(this);
104     }
105 
106     public ResultSetMapper<? extends T> returnGeneratedKeysFunction() {
107         return returnGeneratedKeysFunction;
108     }
109 
110     boolean returnGeneratedKeys() {
111         return returnGeneratedKeysFunction != null;
112     }
113 
114     static <T> Observable<T> get(QueryUpdate<T> queryUpdate) {
115         return bufferedParameters(queryUpdate)
116         // execute query for each set of parameters
117                 .concatMap(queryUpdate.executeOnce());
118     }
119 
120     /**
121      * Returns a {@link Func1} that itself returns the results of pushing
122      * parameters through an update query.
123      * 
124      * @param query
125      * @return
126      */
127     private Func1<List<Parameter>, Observable<T>> executeOnce() {
128         return new Func1<List<Parameter>, Observable<T>>() {
129             @Override
130             public Observable<T> call(final List<Parameter> params) {
131                 if (jdbcQuery.sql().equals(QueryUpdateOnSubscribe.BEGIN_TRANSACTION)) {
132                     context.beginTransactionSubscribe();
133                 }
134                 Observable<T> result = executeOnce(params).subscribeOn(context.scheduler());
135                 if (jdbcQuery.sql().equals(QueryUpdateOnSubscribe.COMMIT)
136                         || jdbcQuery.sql().equals(QueryUpdateOnSubscribe.ROLLBACK))
137                     context.endTransactionSubscribe();
138                 return (Observable<T>) result;
139             }
140         };
141     }
142 
143     /**
144      * Returns the results of an update query. Should return an
145      * {@link Observable} of size one containing the rows affected count.
146      * 
147      * @param query
148      * @param parameters
149      * @return
150      */
151     private Observable<T> executeOnce(final List<Parameter> parameters) {
152         return (Observable<T>) QueryUpdateOnSubscribe.execute(this, parameters);
153     }
154 
155     /**
156      * Builds a {@link QueryUpdate}.
157      */
158     final public static class Builder {
159 
160         /**
161          * Standard query builder.
162          */
163         private final QueryBuilder builder;
164 
165         /**
166          * Constructor.
167          * 
168          * @param sql
169          * @param db
170          */
171         public Builder(String sql, Database db) {
172             this.builder = new QueryBuilder(sql, db);
173         }
174 
175         /**
176          * Appends the given parameters to the parameter list for the query. If
177          * there are more parameters than required for one execution of the
178          * query then more than one execution of the query will occur.
179          * 
180          * @param parameters
181          * @return this
182          */
183         public <T> Builder parameters(Observable<T> parameters) {
184             builder.parameters(parameters);
185             return this;
186         }
187 
188         /**
189          * Appends the given parameter values to the parameter list for the
190          * query. If there are more parameters than required for one execution
191          * of the query then more than one execution of the query will occur.
192          * 
193          * @param objects
194          * @return this
195          */
196         public Builder parameters(Object... objects) {
197             builder.parameters(objects);
198             return this;
199         }
200 
201         /**
202          * Appends a parameter to the parameter list for the query. If there are
203          * more parameters than required for one execution of the query then
204          * more than one execution of the query will occur.
205          * 
206          * @param value
207          * @return this
208          */
209         public Builder parameter(Object value) {
210             builder.parameter(value);
211             return this;
212         }
213 
214         /**
215          * Sets a named parameter. If name is null throws a
216          * {@link NullPointerException}. If value is instance of Observable then
217          * throws an {@link IllegalArgumentException}.
218          * 
219          * @param name
220          *            the parameter name. Cannot be null.
221          * @param value
222          *            the parameter value
223          */
224         public Builder parameter(String name, Object value) {
225             builder.parameter(name, value);
226             return this;
227         }
228 
229         /**
230          * Appends a parameter to the parameter list for the query for a CLOB
231          * parameter and handles null appropriately. If there are more
232          * parameters than required for one execution of the query then more
233          * than one execution of the query will occur.
234          * 
235          * @param value
236          *            the string to insert in the CLOB column
237          * @return this
238          */
239         public Builder parameterClob(String value) {
240             builder.parameter(Database.toSentinelIfNull(value));
241             return this;
242         }
243 
244         /**
245          * Appends a parameter to the parameter list for the query for a CLOB
246          * parameter and handles null appropriately. If there are more
247          * parameters than required for one execution of the query then more
248          * than one execution of the query will occur.
249          * 
250          * @param value
251          * @return this
252          */
253         public Builder parameterBlob(byte[] bytes) {
254             builder.parameter(Database.toSentinelIfNull(bytes));
255             return this;
256         }
257 
258         /**
259          * Appends a dependency to the dependencies that have to complete their
260          * emitting before the query is executed.
261          * 
262          * @param dependency
263          * @return this
264          */
265         public Builder dependsOn(Observable<?> dependency) {
266             builder.dependsOn(dependency);
267             return this;
268         }
269 
270         /**
271          * Appends a dependency on the result of the last transaction (
272          * <code>true</code> for commit or <code>false</code> for rollback) to
273          * the dependencies that have to complete their emitting before the
274          * query is executed.
275          * 
276          * @return this
277          */
278         public Builder dependsOnLastTransaction() {
279             builder.dependsOnLastTransaction();
280             return this;
281         }
282 
283         /**
284          * Returns a builder used to specify how to process the generated keys
285          * {@link ResultSet}. Not all jdbc drivers support this functionality
286          * and some have limitations in their support (h2 for instance only
287          * returns the last generated key when multiple inserts happen in the
288          * one statement).
289          * 
290          * @return a builder used to specify how to process the generated keys
291          *         ResultSet
292          */
293         public ReturnGeneratedKeysBuilder returnGeneratedKeys() {
294             return new ReturnGeneratedKeysBuilder(builder);
295         }
296 
297         /**
298          * Returns an {@link Observable} with the count of rows affected by the
299          * update statement.
300          * 
301          * @return
302          */
303         public Observable<Integer> count() {
304             return new QueryUpdate<Integer>(builder.sql(), builder.parameters(), builder.depends(),
305                     builder.context(), null).count();
306         }
307 
308         /**
309          * Returns an {@link Operator} to allow the query to be pushed
310          * parameters via the {@link Observable#lift(Operator)} method.
311          * 
312          * @return operator that acts on parameters
313          */
314         public Operator<Integer, Object> parameterOperator() {
315             return new QueryUpdateOperator<Object>(this, OperatorType.PARAMETER);
316         }
317 
318         /**
319          * Returns an {@link Operator} to allow the query to be pushed
320          * dependencies via the {@link Observable#lift(Operator)} method.
321          * 
322          * @return operator that acts on dependencies
323          */
324         public Operator<Integer, Object> dependsOnOperator() {
325             return new QueryUpdateOperator<Object>(this, OperatorType.DEPENDENCY);
326         }
327 
328         /**
329          * Returns an {@link Operator} to allow the query to be run once per
330          * parameter list in the source.
331          * 
332          * @return operator
333          */
334         public Operator<Observable<Integer>, Observable<Object>> parameterListOperator() {
335             return new QueryUpdateOperatorFromObservable<Object>(this);
336         }
337 
338         /**
339          * Clears the parameter inputs for the query.
340          * 
341          * @return the current builder
342          */
343         public Builder clearParameters() {
344             builder.clearParameters();
345             return this;
346         }
347     }
348 
349     public static class ReturnGeneratedKeysBuilder {
350 
351         private final QueryBuilder builder;
352 
353         public ReturnGeneratedKeysBuilder(QueryBuilder builder) {
354             this.builder = builder;
355         }
356 
357         /**
358          * Transforms the results using the given function.
359          *
360          * @param function
361          * @return
362          */
363         public <T> Observable<T> get(ResultSetMapper<? extends T> function) {
364             return QueryUpdate.get(new QueryUpdate<T>(builder.sql(), builder.parameters(), builder
365                     .depends(), builder.context(), function));
366         }
367 
368         /**
369          * <p>
370          * Transforms each row of the {@link ResultSet} into an instance of
371          * <code>T</code> using <i>automapping</i> of the ResultSet columns into
372          * corresponding constructor parameters that are assignable. Beyond
373          * normal assignable criteria (for example Integer 123 is assignable to
374          * a Double) other conversions exist to facilitate the automapping:
375          * </p>
376          * <p>
377          * They are:
378          * <ul>
379          * <li>java.sql.Blob &#10143; byte[]</li>
380          * <li>java.sql.Blob &#10143; java.io.InputStream</li>
381          * <li>java.sql.Clob &#10143; String</li>
382          * <li>java.sql.Clob &#10143; java.io.Reader</li>
383          * <li>java.sql.Date &#10143; java.util.Date</li>
384          * <li>java.sql.Date &#10143; Long</li>
385          * <li>java.sql.Timestamp &#10143; java.util.Date</li>
386          * <li>java.sql.Timestamp &#10143; Long</li>
387          * <li>java.sql.Time &#10143; java.util.Date</li>
388          * <li>java.sql.Time &#10143; Long</li>
389          * <li>java.math.BigInteger &#10143;
390          * Short,Integer,Long,Float,Double,BigDecimal</li>
391          * <li>java.math.BigDecimal &#10143;
392          * Short,Integer,Long,Float,Double,BigInteger</li>
393          * </p>
394          * 
395          * @param cls
396          * @return
397          */
398         public <T> Observable<T> autoMap(Class<T> cls) {
399             Util.setSqlFromQueryAnnotation(cls, builder);
400             return get(Util.autoMap(cls));
401         }
402 
403         /**
404          * Automaps the first column of the ResultSet into the target class
405          * <code>cls</code>.
406          * 
407          * @param cls
408          * @return
409          */
410         public <T> Observable<T> getAs(Class<T> cls) {
411             return get(Tuples.single(cls));
412         }
413 
414         /**
415          * Automaps all the columns of the {@link ResultSet} into the target
416          * class <code>cls</code>. See {@link #autoMap(Class) autoMap()}.
417          * 
418          * @param cls
419          * @return
420          */
421         public <T> Observable<TupleN<T>> getTupleN(Class<T> cls) {
422             return get(Tuples.tupleN(cls));
423         }
424 
425         /**
426          * Automaps all the columns of the {@link ResultSet} into {@link Object}
427          * . See {@link #autoMap(Class) autoMap()}.
428          * 
429          * @param cls
430          * @return
431          */
432         public Observable<TupleN<Object>> getTupleN() {
433             return get(Tuples.tupleN(Object.class));
434         }
435 
436         /**
437          * Automaps the columns of the {@link ResultSet} into the specified
438          * classes. See {@link #autoMap(Class) autoMap()}.
439          * 
440          * @param cls1
441          * @param cls2
442          * @return
443          */
444         public <T1, T2> Observable<Tuple2<T1, T2>> getAs(Class<T1> cls1, Class<T2> cls2) {
445             return get(Tuples.tuple(cls1, cls2));
446         }
447 
448         /**
449          * Automaps the columns of the {@link ResultSet} into the specified
450          * classes. See {@link #autoMap(Class) autoMap()}.
451          * 
452          * @param cls1
453          * @param cls2
454          * @param cls3
455          * @return
456          */
457         public <T1, T2, T3> Observable<Tuple3<T1, T2, T3>> getAs(Class<T1> cls1, Class<T2> cls2,
458                 Class<T3> cls3) {
459             return get(Tuples.tuple(cls1, cls2, cls3));
460         }
461 
462         /**
463          * Automaps the columns of the {@link ResultSet} into the specified
464          * classes. See {@link #autoMap(Class) autoMap()}.
465          * 
466          * @param cls1
467          * @param cls2
468          * @param cls3
469          * @param cls4
470          * @return
471          */
472         public <T1, T2, T3, T4> Observable<Tuple4<T1, T2, T3, T4>> getAs(Class<T1> cls1,
473                 Class<T2> cls2, Class<T3> cls3, Class<T4> cls4) {
474             return get(Tuples.tuple(cls1, cls2, cls3, cls4));
475         }
476 
477         /**
478          * Automaps the columns of the {@link ResultSet} into the specified
479          * classes. See {@link #autoMap(Class) autoMap()}.
480          * 
481          * @param cls1
482          * @param cls2
483          * @param cls3
484          * @param cls4
485          * @param cls5
486          * @return
487          */
488         public <T1, T2, T3, T4, T5> Observable<Tuple5<T1, T2, T3, T4, T5>> getAs(Class<T1> cls1,
489                 Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5) {
490             return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5));
491         }
492 
493         /**
494          * Automaps the columns of the {@link ResultSet} into the specified
495          * classes. See {@link #autoMap(Class) autoMap()}.
496          * 
497          * @param cls1
498          * @param cls2
499          * @param cls3
500          * @param cls4
501          * @param cls5
502          * @param cls6
503          * @return
504          */
505         public <T1, T2, T3, T4, T5, T6> Observable<Tuple6<T1, T2, T3, T4, T5, T6>> getAs(
506                 Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5,
507                 Class<T6> cls6) {
508             return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5, cls6));
509         }
510 
511         /**
512          * Automaps the columns of the {@link ResultSet} into the specified
513          * classes. See {@link #autoMap(Class) autoMap()}.
514          * 
515          * @param cls1
516          * @param cls2
517          * @param cls3
518          * @param cls4
519          * @param cls5
520          * @param cls6
521          * @param cls7
522          * @return
523          */
524         public <T1, T2, T3, T4, T5, T6, T7> Observable<Tuple7<T1, T2, T3, T4, T5, T6, T7>> getAs(
525                 Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5,
526                 Class<T6> cls6, Class<T7> cls7) {
527             return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5, cls6, cls7));
528         }
529 
530         public Observable<Integer> count() {
531             return get(Util.toOne()).count();
532         }
533 
534     }
535 
536 }