1 package com.github.davidmoten.rx.jdbc;
2
3 import static com.github.davidmoten.rx.jdbc.Conditions.checkNotNull;
4 import static com.github.davidmoten.rx.jdbc.Queries.bufferedParameters;
5
6 import java.sql.ResultSet;
7 import java.util.List;
8
9 import rx.Observable;
10 import rx.Observable.Operator;
11 import rx.functions.Func1;
12
13 import com.github.davidmoten.rx.jdbc.NamedParameters.JdbcQuery;
14 import com.github.davidmoten.rx.jdbc.tuple.Tuple2;
15 import com.github.davidmoten.rx.jdbc.tuple.Tuple3;
16 import com.github.davidmoten.rx.jdbc.tuple.Tuple4;
17 import com.github.davidmoten.rx.jdbc.tuple.Tuple5;
18 import com.github.davidmoten.rx.jdbc.tuple.Tuple6;
19 import com.github.davidmoten.rx.jdbc.tuple.Tuple7;
20 import com.github.davidmoten.rx.jdbc.tuple.TupleN;
21 import com.github.davidmoten.rx.jdbc.tuple.Tuples;
22
/**
 * Always emits an {@code Observable<Integer>} of size 1 containing the number
 * of affected records.
 *
 * @param <T>
 *            type of returned observable (Integer for count, custom for
 *            returning generated keys)
 */
31 final public class QueryUpdate<T> implements Query {
32
33 private final JdbcQuery jdbcQuery;
34 private final Observable<Parameter> parameters;
35 private final QueryContext context;
36 private final Observable<?> depends;
37 // nullable!
38 private final ResultSetMapper<? extends T> returnGeneratedKeysFunction;
39
/**
 * Private constructor. Within this file it is invoked from
 * {@link Builder#count()} and from
 * {@link ReturnGeneratedKeysBuilder#get(ResultSetMapper)}.
 *
 * @param sql
 *            the sql statement; passed to {@code checkNotNull} and then
 *            parsed with {@code NamedParameters.parse}
 * @param parameters
 *            the query parameters; passed to {@code checkNotNull}
 * @param depends
 *            the dependencies of the query; passed to {@code checkNotNull}
 * @param context
 *            the query context; passed to {@code checkNotNull}
 * @param returnGeneratedKeysFunction
 *            mapper for the generated keys; nullable!
 */
private QueryUpdate(String sql, Observable<Parameter> parameters, Observable<?> depends,
        QueryContext context, ResultSetMapper<? extends T> returnGeneratedKeysFunction) {
    // all mandatory arguments are checked before any field is assigned
    checkNotNull(sql);
    checkNotNull(parameters);
    checkNotNull(depends);
    checkNotNull(context);

    this.context = context;
    this.depends = depends;
    this.parameters = parameters;
    this.jdbcQuery = NamedParameters.parse(sql);
    // not null-checked on purpose: a null mapper makes returnGeneratedKeys()
    // report false
    this.returnGeneratedKeysFunction = returnGeneratedKeysFunction;
}
62
63 @Override
64 public String sql() {
65 return jdbcQuery.sql();
66 }
67
68 @Override
69 public Observable<Parameter> parameters() {
70 return parameters;
71 }
72
73 @Override
74 public QueryContext context() {
75 return context;
76 }
77
78 @Override
79 public String toString() {
80 return "QueryUpdate [sql=" + sql() + "]";
81 }
82
83 @Override
84 public Observable<?> depends() {
85 return depends;
86 }
87
88 @Override
89 public List<String> names() {
90 return jdbcQuery.names();
91 }
92
93 /**
94 * Returns the results of an update query. Should be an {@link Observable}
95 * of size 1 containing the number of records affected by the update (or
96 * insert) statement.
97 *
98 * @param query
99 * @return
100 */
101 @SuppressWarnings("unchecked")
102 public Observable<Integer> count() {
103 return (Observable<Integer>) QueryUpdate.get(this);
104 }
105
106 public ResultSetMapper<? extends T> returnGeneratedKeysFunction() {
107 return returnGeneratedKeysFunction;
108 }
109
110 boolean returnGeneratedKeys() {
111 return returnGeneratedKeysFunction != null;
112 }
113
114 static <T> Observable<T> get(QueryUpdate<T> queryUpdate) {
115 return bufferedParameters(queryUpdate)
116 // execute query for each set of parameters
117 .concatMap(queryUpdate.executeOnce());
118 }
119
120 /**
121 * Returns a {@link Func1} that itself returns the results of pushing
122 * parameters through an update query.
123 *
124 * @param query
125 * @return
126 */
127 private Func1<List<Parameter>, Observable<T>> executeOnce() {
128 return new Func1<List<Parameter>, Observable<T>>() {
129 @Override
130 public Observable<T> call(final List<Parameter> params) {
131 if (jdbcQuery.sql().equals(QueryUpdateOnSubscribe.BEGIN_TRANSACTION)) {
132 context.beginTransactionSubscribe();
133 }
134 Observable<T> result = executeOnce(params).subscribeOn(context.scheduler());
135 if (jdbcQuery.sql().equals(QueryUpdateOnSubscribe.COMMIT)
136 || jdbcQuery.sql().equals(QueryUpdateOnSubscribe.ROLLBACK))
137 context.endTransactionSubscribe();
138 return (Observable<T>) result;
139 }
140 };
141 }
142
143 /**
144 * Returns the results of an update query. Should return an
145 * {@link Observable} of size one containing the rows affected count.
146 *
147 * @param query
148 * @param parameters
149 * @return
150 */
151 private Observable<T> executeOnce(final List<Parameter> parameters) {
152 return (Observable<T>) QueryUpdateOnSubscribe.execute(this, parameters);
153 }
154
155 /**
156 * Builds a {@link QueryUpdate}.
157 */
158 final public static class Builder {
159
160 /**
161 * Standard query builder.
162 */
163 private final QueryBuilder builder;
164
165 /**
166 * Constructor.
167 *
168 * @param sql
169 * @param db
170 */
171 public Builder(String sql, Database db) {
172 this.builder = new QueryBuilder(sql, db);
173 }
174
175 /**
176 * Appends the given parameters to the parameter list for the query. If
177 * there are more parameters than required for one execution of the
178 * query then more than one execution of the query will occur.
179 *
180 * @param parameters
181 * @return this
182 */
183 public <T> Builder parameters(Observable<T> parameters) {
184 builder.parameters(parameters);
185 return this;
186 }
187
188 /**
189 * Appends the given parameter values to the parameter list for the
190 * query. If there are more parameters than required for one execution
191 * of the query then more than one execution of the query will occur.
192 *
193 * @param objects
194 * @return this
195 */
196 public Builder parameters(Object... objects) {
197 builder.parameters(objects);
198 return this;
199 }
200
201 /**
202 * Appends a parameter to the parameter list for the query. If there are
203 * more parameters than required for one execution of the query then
204 * more than one execution of the query will occur.
205 *
206 * @param value
207 * @return this
208 */
209 public Builder parameter(Object value) {
210 builder.parameter(value);
211 return this;
212 }
213
214 /**
215 * Sets a named parameter. If name is null throws a
216 * {@link NullPointerException}. If value is instance of Observable then
217 * throws an {@link IllegalArgumentException}.
218 *
219 * @param name
220 * the parameter name. Cannot be null.
221 * @param value
222 * the parameter value
223 */
224 public Builder parameter(String name, Object value) {
225 builder.parameter(name, value);
226 return this;
227 }
228
229 /**
230 * Appends a parameter to the parameter list for the query for a CLOB
231 * parameter and handles null appropriately. If there are more
232 * parameters than required for one execution of the query then more
233 * than one execution of the query will occur.
234 *
235 * @param value
236 * the string to insert in the CLOB column
237 * @return this
238 */
239 public Builder parameterClob(String value) {
240 builder.parameter(Database.toSentinelIfNull(value));
241 return this;
242 }
243
244 /**
245 * Appends a parameter to the parameter list for the query for a CLOB
246 * parameter and handles null appropriately. If there are more
247 * parameters than required for one execution of the query then more
248 * than one execution of the query will occur.
249 *
250 * @param value
251 * @return this
252 */
253 public Builder parameterBlob(byte[] bytes) {
254 builder.parameter(Database.toSentinelIfNull(bytes));
255 return this;
256 }
257
258 /**
259 * Appends a dependency to the dependencies that have to complete their
260 * emitting before the query is executed.
261 *
262 * @param dependency
263 * @return this
264 */
265 public Builder dependsOn(Observable<?> dependency) {
266 builder.dependsOn(dependency);
267 return this;
268 }
269
270 /**
271 * Appends a dependency on the result of the last transaction (
272 * <code>true</code> for commit or <code>false</code> for rollback) to
273 * the dependencies that have to complete their emitting before the
274 * query is executed.
275 *
276 * @return this
277 */
278 public Builder dependsOnLastTransaction() {
279 builder.dependsOnLastTransaction();
280 return this;
281 }
282
283 /**
284 * Returns a builder used to specify how to process the generated keys
285 * {@link ResultSet}. Not all jdbc drivers support this functionality
286 * and some have limitations in their support (h2 for instance only
287 * returns the last generated key when multiple inserts happen in the
288 * one statement).
289 *
290 * @return a builder used to specify how to process the generated keys
291 * ResultSet
292 */
293 public ReturnGeneratedKeysBuilder returnGeneratedKeys() {
294 return new ReturnGeneratedKeysBuilder(builder);
295 }
296
297 /**
298 * Returns an {@link Observable} with the count of rows affected by the
299 * update statement.
300 *
301 * @return
302 */
303 public Observable<Integer> count() {
304 return new QueryUpdate<Integer>(builder.sql(), builder.parameters(), builder.depends(),
305 builder.context(), null).count();
306 }
307
308 /**
309 * Returns an {@link Operator} to allow the query to be pushed
310 * parameters via the {@link Observable#lift(Operator)} method.
311 *
312 * @return operator that acts on parameters
313 */
314 public Operator<Integer, Object> parameterOperator() {
315 return new QueryUpdateOperator<Object>(this, OperatorType.PARAMETER);
316 }
317
318 /**
319 * Returns an {@link Operator} to allow the query to be pushed
320 * dependencies via the {@link Observable#lift(Operator)} method.
321 *
322 * @return operator that acts on dependencies
323 */
324 public Operator<Integer, Object> dependsOnOperator() {
325 return new QueryUpdateOperator<Object>(this, OperatorType.DEPENDENCY);
326 }
327
328 /**
329 * Returns an {@link Operator} to allow the query to be run once per
330 * parameter list in the source.
331 *
332 * @return operator
333 */
334 public Operator<Observable<Integer>, Observable<Object>> parameterListOperator() {
335 return new QueryUpdateOperatorFromObservable<Object>(this);
336 }
337
338 /**
339 * Clears the parameter inputs for the query.
340 *
341 * @return the current builder
342 */
343 public Builder clearParameters() {
344 builder.clearParameters();
345 return this;
346 }
347 }
348
349 public static class ReturnGeneratedKeysBuilder {
350
351 private final QueryBuilder builder;
352
353 public ReturnGeneratedKeysBuilder(QueryBuilder builder) {
354 this.builder = builder;
355 }
356
357 /**
358 * Transforms the results using the given function.
359 *
360 * @param function
361 * @return
362 */
363 public <T> Observable<T> get(ResultSetMapper<? extends T> function) {
364 return QueryUpdate.get(new QueryUpdate<T>(builder.sql(), builder.parameters(), builder
365 .depends(), builder.context(), function));
366 }
367
368 /**
369 * <p>
370 * Transforms each row of the {@link ResultSet} into an instance of
371 * <code>T</code> using <i>automapping</i> of the ResultSet columns into
372 * corresponding constructor parameters that are assignable. Beyond
373 * normal assignable criteria (for example Integer 123 is assignable to
374 * a Double) other conversions exist to facilitate the automapping:
375 * </p>
376 * <p>
377 * They are:
378 * <ul>
379 * <li>java.sql.Blob ➟ byte[]</li>
380 * <li>java.sql.Blob ➟ java.io.InputStream</li>
381 * <li>java.sql.Clob ➟ String</li>
382 * <li>java.sql.Clob ➟ java.io.Reader</li>
383 * <li>java.sql.Date ➟ java.util.Date</li>
384 * <li>java.sql.Date ➟ Long</li>
385 * <li>java.sql.Timestamp ➟ java.util.Date</li>
386 * <li>java.sql.Timestamp ➟ Long</li>
387 * <li>java.sql.Time ➟ java.util.Date</li>
388 * <li>java.sql.Time ➟ Long</li>
389 * <li>java.math.BigInteger ➟
390 * Short,Integer,Long,Float,Double,BigDecimal</li>
391 * <li>java.math.BigDecimal ➟
392 * Short,Integer,Long,Float,Double,BigInteger</li>
393 * </p>
394 *
395 * @param cls
396 * @return
397 */
398 public <T> Observable<T> autoMap(Class<T> cls) {
399 Util.setSqlFromQueryAnnotation(cls, builder);
400 return get(Util.autoMap(cls));
401 }
402
403 /**
404 * Automaps the first column of the ResultSet into the target class
405 * <code>cls</code>.
406 *
407 * @param cls
408 * @return
409 */
410 public <T> Observable<T> getAs(Class<T> cls) {
411 return get(Tuples.single(cls));
412 }
413
414 /**
415 * Automaps all the columns of the {@link ResultSet} into the target
416 * class <code>cls</code>. See {@link #autoMap(Class) autoMap()}.
417 *
418 * @param cls
419 * @return
420 */
421 public <T> Observable<TupleN<T>> getTupleN(Class<T> cls) {
422 return get(Tuples.tupleN(cls));
423 }
424
425 /**
426 * Automaps all the columns of the {@link ResultSet} into {@link Object}
427 * . See {@link #autoMap(Class) autoMap()}.
428 *
429 * @param cls
430 * @return
431 */
432 public Observable<TupleN<Object>> getTupleN() {
433 return get(Tuples.tupleN(Object.class));
434 }
435
436 /**
437 * Automaps the columns of the {@link ResultSet} into the specified
438 * classes. See {@link #autoMap(Class) autoMap()}.
439 *
440 * @param cls1
441 * @param cls2
442 * @return
443 */
444 public <T1, T2> Observable<Tuple2<T1, T2>> getAs(Class<T1> cls1, Class<T2> cls2) {
445 return get(Tuples.tuple(cls1, cls2));
446 }
447
448 /**
449 * Automaps the columns of the {@link ResultSet} into the specified
450 * classes. See {@link #autoMap(Class) autoMap()}.
451 *
452 * @param cls1
453 * @param cls2
454 * @param cls3
455 * @return
456 */
457 public <T1, T2, T3> Observable<Tuple3<T1, T2, T3>> getAs(Class<T1> cls1, Class<T2> cls2,
458 Class<T3> cls3) {
459 return get(Tuples.tuple(cls1, cls2, cls3));
460 }
461
462 /**
463 * Automaps the columns of the {@link ResultSet} into the specified
464 * classes. See {@link #autoMap(Class) autoMap()}.
465 *
466 * @param cls1
467 * @param cls2
468 * @param cls3
469 * @param cls4
470 * @return
471 */
472 public <T1, T2, T3, T4> Observable<Tuple4<T1, T2, T3, T4>> getAs(Class<T1> cls1,
473 Class<T2> cls2, Class<T3> cls3, Class<T4> cls4) {
474 return get(Tuples.tuple(cls1, cls2, cls3, cls4));
475 }
476
477 /**
478 * Automaps the columns of the {@link ResultSet} into the specified
479 * classes. See {@link #autoMap(Class) autoMap()}.
480 *
481 * @param cls1
482 * @param cls2
483 * @param cls3
484 * @param cls4
485 * @param cls5
486 * @return
487 */
488 public <T1, T2, T3, T4, T5> Observable<Tuple5<T1, T2, T3, T4, T5>> getAs(Class<T1> cls1,
489 Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5) {
490 return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5));
491 }
492
493 /**
494 * Automaps the columns of the {@link ResultSet} into the specified
495 * classes. See {@link #autoMap(Class) autoMap()}.
496 *
497 * @param cls1
498 * @param cls2
499 * @param cls3
500 * @param cls4
501 * @param cls5
502 * @param cls6
503 * @return
504 */
505 public <T1, T2, T3, T4, T5, T6> Observable<Tuple6<T1, T2, T3, T4, T5, T6>> getAs(
506 Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5,
507 Class<T6> cls6) {
508 return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5, cls6));
509 }
510
511 /**
512 * Automaps the columns of the {@link ResultSet} into the specified
513 * classes. See {@link #autoMap(Class) autoMap()}.
514 *
515 * @param cls1
516 * @param cls2
517 * @param cls3
518 * @param cls4
519 * @param cls5
520 * @param cls6
521 * @param cls7
522 * @return
523 */
524 public <T1, T2, T3, T4, T5, T6, T7> Observable<Tuple7<T1, T2, T3, T4, T5, T6, T7>> getAs(
525 Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4, Class<T5> cls5,
526 Class<T6> cls6, Class<T7> cls7) {
527 return get(Tuples.tuple(cls1, cls2, cls3, cls4, cls5, cls6, cls7));
528 }
529
530 public Observable<Integer> count() {
531 return get(Util.toOne()).count();
532 }
533
534 }
535
536 }