1 package org.davidmoten.rx.jdbc;
2
3 import java.sql.Connection;
4 import java.sql.ResultSet;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.concurrent.atomic.AtomicReference;
8
9 import org.davidmoten.rx.jdbc.callable.CallableResultSet1;
10 import org.davidmoten.rx.jdbc.callable.CallableResultSet2;
11 import org.davidmoten.rx.jdbc.callable.CallableResultSet3;
12 import org.davidmoten.rx.jdbc.callable.CallableResultSet4;
13 import org.davidmoten.rx.jdbc.callable.CallableResultSetN;
14 import org.davidmoten.rx.jdbc.callable.internal.In;
15 import org.davidmoten.rx.jdbc.callable.internal.InOut;
16 import org.davidmoten.rx.jdbc.callable.internal.Out;
17 import org.davidmoten.rx.jdbc.callable.internal.TxGetter1;
18 import org.davidmoten.rx.jdbc.callable.internal.TxGetter2;
19 import org.davidmoten.rx.jdbc.callable.internal.TxGetter3;
20 import org.davidmoten.rx.jdbc.callable.internal.TxGetter4;
21 import org.davidmoten.rx.jdbc.callable.internal.TxGetterN;
22 import org.davidmoten.rx.jdbc.tuple.Tuple2;
23 import org.davidmoten.rx.jdbc.tuple.Tuple3;
24 import org.davidmoten.rx.jdbc.tuple.Tuple4;
25 import org.davidmoten.rx.jdbc.tuple.TupleN;
26
27 import com.github.davidmoten.guavamini.Lists;
28 import com.github.davidmoten.guavamini.Preconditions;
29
30 import io.reactivex.Flowable;
31 import io.reactivex.Notification;
32 import io.reactivex.Single;
33 import io.reactivex.functions.Function;
34
35 public final class TransactedCallableBuilder implements TxGetter1 {
36
37 private CallableBuilder b;
38
39 public TransactedCallableBuilder(CallableBuilder b) {
40 this.b = b;
41 }
42
43 public Flowable<List<Object>> parameterGroups() {
44 return b.parameterGroups();
45 }
46
47 public TransactedCallableBuilder in() {
48 b.params.add(In.IN);
49 return this;
50 }
51
52 public Single<TxWithoutValue> input(Flowable<?> f) {
53 Preconditions.checkArgument(b.inStream == null, "you can only specify in flowable once, current=" + b.inStream);
54 b.inStream = f;
55 return build();
56 }
57
58 public Single<TxWithoutValue> once() {
59 return input(1);
60 }
61
62 public Single<TxWithoutValue> input(Object... objects) {
63 return input(Flowable.fromArray(objects));
64 }
65
66 public <T> CallableBuilder1<T> inOut(Type type, Class<T> cls) {
67 b.params.add(new InOut(type, cls));
68 return new CallableBuilder1<T>(b, cls);
69 }
70
71 public <T> CallableBuilder1<T> out(Type type, Class<T> cls) {
72 b.params.add(new Out(type, cls));
73 return new CallableBuilder1<T>(b, cls);
74 }
75
76 @Override
77 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
78 return new CallableResultSets1Builder<T>(b, function);
79 }
80
81 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
82 return get(Util.autoMap(cls));
83 }
84
85 @SuppressWarnings("unchecked")
86 private Single<TxWithoutValue> build() {
87 return Single.defer(() -> {
88 AtomicReference<Connection> con = new AtomicReference<Connection>();
89
90 Single<Connection> transactedConnection = b.connection
91 .map(c -> Util.toTransactedConnection(con, c));
92 return Call
93 .createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params)
94 .materialize()
95 .filter(x -> !x.isOnNext())
96 .<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db))
97 .doOnNext(tx -> {
98 if (tx.isComplete()) {
99 ((TxImpl<Object>) tx).connection().commit();
100 }
101 })
102 .lastOrError();
103 });
104 }
105
106 public static final class CallableBuilder1<T1> implements TxGetter1 {
107
108 private final CallableBuilder b;
109 private final Class<T1> cls;
110
111 public CallableBuilder1(CallableBuilder b, Class<T1> cls) {
112 this.b = b;
113 this.cls = cls;
114 }
115
116 public CallableBuilder1<T1> in() {
117 b.in();
118 return this;
119 }
120
121 public <T2> CallableBuilder2<T1, T2> out(Type type, Class<T2> cls2) {
122 b.out(type, cls2);
123 return new CallableBuilder2<T1, T2>(b, cls, cls2);
124 }
125
126 public <T2> CallableBuilder2<T1, T2> inOut(Type type, Class<T2> cls2) {
127 b.inOut(type, cls2);
128 return new CallableBuilder2<T1, T2>(b, cls, cls2);
129 }
130
131 public Flowable<Tx<T1>> input(Flowable<?> f) {
132 b.input(f);
133 return build();
134 }
135
136 public Flowable<Tx<T1>> input(Object... objects) {
137 return input(Flowable.fromArray(objects));
138 }
139
140 @Override
141 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
142 return new CallableResultSets1Builder<T>(b, function);
143 }
144
145 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
146 return get(Util.autoMap(cls));
147 }
148
149 private Flowable<Tx<T1>> build() {
150 return inTransaction(b, con -> Call
151 .createWithOneOutParameter(con, b.sql, b.parameterGroups(), b.params, cls));
152 }
153 }
154
155 public static final class CallableBuilder2<T1, T2> implements TxGetter1 {
156
157 private final CallableBuilder b;
158 private final Class<T1> cls1;
159 private final Class<T2> cls2;
160
161 public CallableBuilder2(CallableBuilder b, Class<T1> cls1, Class<T2> cls2) {
162 this.b = b;
163 this.cls1 = cls1;
164 this.cls2 = cls2;
165 }
166
167 public <T3> CallableBuilder3<T1, T2, T3> out(Type type, Class<T3> cls3) {
168 b.out(type, cls3);
169 return new CallableBuilder3<T1, T2, T3>(b, cls1, cls2, cls3);
170 }
171
172 public Flowable<Tx<Tuple2<T1, T2>>> input(Flowable<?> f) {
173 b.input(f);
174 return build();
175 }
176
177 public CallableBuilder2<T1, T2> in() {
178 b.in();
179 return this;
180 }
181
182 public Flowable<Tx<Tuple2<T1, T2>>> input(Object... objects) {
183 return input(Flowable.fromArray(objects));
184 }
185
186 public <T3> CallableBuilder3<T1, T2, T3> inOut(Type type, Class<T3> cls3) {
187 b.inOut(type, cls3);
188 return new CallableBuilder3<T1, T2, T3>(b, cls1, cls2, cls3);
189 }
190
191 @Override
192 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
193 return new CallableResultSets1Builder<T>(b, function);
194 }
195
196 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
197 return get(Util.autoMap(cls));
198 }
199
200 private Flowable<Tx<Tuple2<T1, T2>>> build() {
201 return inTransaction(b,
202 con -> Call.createWithTwoOutParameters(con, b.sql, b.parameterGroups(), b.params, cls1, cls2));
203 }
204 }
205
206 public static final class CallableBuilder3<T1, T2, T3> implements TxGetter1 {
207
208 private final CallableBuilder b;
209 private final Class<T1> cls1;
210 private final Class<T2> cls2;
211 private final Class<T3> cls3;
212
213 public CallableBuilder3(CallableBuilder b, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3) {
214 this.b = b;
215 this.cls1 = cls1;
216 this.cls2 = cls2;
217 this.cls3 = cls3;
218 }
219
220 public <T4> CallableBuilder4<T1, T2, T3, T4> out(Type type, Class<T4> cls4) {
221 b.out(type, cls4);
222 return new CallableBuilder4<T1, T2, T3, T4>(b, cls1, cls2, cls3, cls4);
223 }
224
225 public Flowable<Tx<Tuple3<T1, T2, T3>>> input(Flowable<?> f) {
226 b.input(f);
227 return build();
228 }
229
230 public CallableBuilder3<T1, T2, T3> in() {
231 b.in();
232 return this;
233 }
234
235 public Flowable<Tx<Tuple3<T1, T2, T3>>> input(Object... objects) {
236 return input(Flowable.fromArray(objects));
237 }
238
239 public <T4> CallableBuilder4<T1, T2, T3, T4> inOut(Type type, Class<T4> cls4) {
240 b.inOut(type, cls4);
241 return new CallableBuilder4<T1, T2, T3, T4>(b, cls1, cls2, cls3, cls4);
242 }
243
244 @Override
245 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
246 return new CallableResultSets1Builder<T>(b, function);
247 }
248
249 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
250 return get(Util.autoMap(cls));
251 }
252
253 private Flowable<Tx<Tuple3<T1, T2, T3>>> build() {
254 return inTransaction(b, con -> Call.createWithThreeOutParameters(con, b.sql, b.parameterGroups(), b.params,
255 cls1, cls2, cls3));
256
257 }
258 }
259
260 public static final class CallableBuilder4<T1, T2, T3, T4> implements TxGetter1 {
261
262 private final CallableBuilder b;
263 private final Class<T1> cls1;
264 private final Class<T2> cls2;
265 private final Class<T3> cls3;
266 private final Class<T4> cls4;
267
268 public CallableBuilder4(CallableBuilder b, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4) {
269 this.b = b;
270 this.cls1 = cls1;
271 this.cls2 = cls2;
272 this.cls3 = cls3;
273 this.cls4 = cls4;
274 }
275
276 public Flowable<Tx<Tuple4<T1, T2, T3, T4>>> input(Flowable<?> f) {
277 b.input(f);
278 return build();
279 }
280
281 public CallableBuilder4<T1, T2, T3, T4> in() {
282 b.in();
283 return this;
284 }
285
286 public Flowable<Tx<Tuple4<T1, T2, T3, T4>>> input(Object... objects) {
287 return input(Flowable.fromArray(objects));
288 }
289
290 public CallableBuilderN inOut(Type type, Class<T3> cls5) {
291 b.inOut(type, cls5);
292 return new CallableBuilderN(b, Lists.newArrayList(cls1, cls2, cls3, cls4, cls5));
293 }
294
295 public CallableBuilderN out(Type type, Class<?> cls5) {
296 b.out(type, cls5);
297 return new CallableBuilderN(b, Lists.newArrayList(cls1, cls2, cls3, cls4, cls5));
298 }
299
300 @Override
301 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
302 return new CallableResultSets1Builder<T>(b, function);
303 }
304
305 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
306 return get(Util.autoMap(cls));
307 }
308
309 private Flowable<Tx<Tuple4<T1, T2, T3, T4>>> build() {
310 return inTransaction(b, con -> Call.createWithFourOutParameters(con, b.sql, b.parameterGroups(), b.params,
311 cls1, cls2, cls3, cls4));
312 }
313 }
314
315 public static final class CallableBuilderN implements TxGetter1 {
316
317 private final CallableBuilder b;
318 private final List<Class<?>> outClasses;
319
320 public CallableBuilderN(CallableBuilder b, List<Class<?>> outClasses) {
321 this.b = b;
322 this.outClasses = outClasses;
323 }
324
325 public Flowable<Tx<TupleN<Object>>> input(Flowable<?> f) {
326 b.input(f);
327 return build();
328 }
329
330 public CallableBuilderN in() {
331 b.in();
332 return this;
333 }
334
335 public Flowable<Tx<TupleN<Object>>> input(Object... objects) {
336 return input(Flowable.fromArray(objects));
337 }
338
339 public CallableBuilderN out(Type type, Class<?> cls) {
340 b.out(type, cls);
341 return new CallableBuilderN(b, createList(outClasses, cls));
342 }
343
344 @Override
345 public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
346 return new CallableResultSets1Builder<T>(b, function);
347 }
348
349 public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
350 return get(Util.autoMap(cls));
351 }
352
353 private Flowable<Tx<TupleN<Object>>> build() {
354 return inTransaction(b,
355 con -> Call.createWithNParameters(con, b.sql, b.parameterGroups(), b.params, outClasses));
356 }
357 }
358
359 public static final class CallableResultSets1Builder<T1> implements TxGetter2<T1> {
360
361 private final CallableBuilder b;
362 private final Function<? super ResultSet, ? extends T1> f1;
363
364 CallableResultSets1Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> function) {
365 this.b = b;
366 this.f1 = function;
367 }
368
369 public CallableResultSets1Builder<T1> out(Type type, Class<?> cls5) {
370 b.out(type, cls5);
371 return this;
372 }
373
374 public <T2> CallableResultSets2Builder<T1, T2> autoMap(Class<T2> cls) {
375 return get(Util.autoMap(cls));
376 }
377
378 public <T2> CallableResultSets2Builder<T1, T2> get(Function<? super ResultSet, ? extends T2> f2) {
379 return new CallableResultSets2Builder<T1, T2>(b, f1, f2);
380 }
381
382 public Flowable<Tx<CallableResultSet1<T1>>> input(Flowable<?> f) {
383 b.input(f);
384 return build();
385 }
386
387 public CallableResultSets1Builder<T1> in() {
388 b.in();
389 return this;
390 }
391
392 public Flowable<Tx<CallableResultSet1<T1>>> input(Object... objects) {
393 return input(Flowable.fromArray(objects));
394 }
395
396 public CallableResultSets1Builder<T1> inOut(Type type, Class<?> cls) {
397 b.inOut(type, cls);
398 return this;
399 }
400
401 private Flowable<Tx<CallableResultSet1<T1>>> build() {
402 return inTransaction(b,
403 con -> Call.createWithOneResultSet(con, b.sql, b.parameterGroups(), b.params, f1, 0));
404 }
405
406 }
407
408 public static final class CallableResultSets2Builder<T1, T2> implements TxGetter3<T1, T2> {
409
410 private final CallableBuilder b;
411 private final Function<? super ResultSet, ? extends T1> f1;
412 private final Function<? super ResultSet, ? extends T2> f2;
413
414 CallableResultSets2Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
415 Function<? super ResultSet, ? extends T2> f2) {
416 this.b = b;
417 this.f1 = f1;
418 this.f2 = f2;
419 }
420
421 public CallableResultSets2Builder<T1, T2> out(Type type, Class<?> cls5) {
422 b.out(type, cls5);
423 return this;
424 }
425
426 public Flowable<Tx<CallableResultSet2<T1, T2>>> input(Flowable<?> f) {
427 b.input(f);
428 return build();
429 }
430
431 public CallableResultSets2Builder<T1, T2> in() {
432 b.in();
433 return this;
434 }
435
436 public Flowable<Tx<CallableResultSet2<T1, T2>>> input(Object... objects) {
437 return input(Flowable.fromArray(objects));
438 }
439
440 public CallableResultSets2Builder<T1, T2> inOut(Type type, Class<?> cls) {
441 b.inOut(type, cls);
442 return this;
443 }
444
445 public <T3> CallableResultSets3Builder<T1, T2, T3> autoMap(Class<T3> cls) {
446 return get(Util.autoMap(cls));
447 }
448
449 public <T3> CallableResultSets3Builder<T1, T2, T3> get(Function<? super ResultSet, ? extends T3> f3) {
450 return new CallableResultSets3Builder<T1, T2, T3>(b, f1, f2, f3);
451 }
452
453 private Flowable<Tx<CallableResultSet2<T1, T2>>> build() {
454 return inTransaction(b,
455 con -> Call.createWithTwoResultSets(con, b.sql, b.parameterGroups(), b.params, f1, f2, 0));
456 }
457 }
458
459 public static final class CallableResultSets3Builder<T1, T2, T3> implements TxGetter4<T1, T2, T3> {
460
461 private final CallableBuilder b;
462 private final Function<? super ResultSet, ? extends T1> f1;
463 private final Function<? super ResultSet, ? extends T2> f2;
464 private final Function<? super ResultSet, ? extends T3> f3;
465
466 CallableResultSets3Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
467 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3) {
468 this.b = b;
469 this.f1 = f1;
470 this.f2 = f2;
471 this.f3 = f3;
472 }
473
474 public CallableResultSets3Builder<T1, T2, T3> out(Type type, Class<?> cls5) {
475 b.out(type, cls5);
476 return this;
477 }
478
479 public CallableResultSets3Builder<T1, T2, T3> in() {
480 b.in();
481 return this;
482 }
483
484 public Flowable<Tx<CallableResultSet3<T1, T2, T3>>> input(Flowable<?> f) {
485 b.input(f);
486 return build();
487 }
488
489 public Flowable<Tx<CallableResultSet3<T1, T2, T3>>> input(Object... objects) {
490 return input(Flowable.fromArray(objects));
491 }
492
493 public CallableResultSets3Builder<T1, T2, T3> inOut(Type type, Class<?> cls) {
494 b.inOut(type, cls);
495 return this;
496 }
497
498 public <T4> CallableResultSets4Builder<T1, T2, T3, T4> autoMap(Class<T4> cls) {
499 return get(Util.autoMap(cls));
500 }
501
502 public <T4> CallableResultSets4Builder<T1, T2, T3, T4> get(Function<? super ResultSet, ? extends T4> f4) {
503 return new CallableResultSets4Builder<T1, T2, T3, T4>(b, f1, f2, f3, f4);
504 }
505
506 private Flowable<Tx<CallableResultSet3<T1, T2, T3>>> build() {
507 return inTransaction(b, con -> Call.createWithThreeResultSets(con, b.sql, b.parameterGroups(),
508 b.params, f1, f2, f3, 0));
509 }
510 }
511
512 public static final class CallableResultSets4Builder<T1, T2, T3, T4> implements TxGetterN {
513
514 private final CallableBuilder b;
515 private final Function<? super ResultSet, ? extends T1> f1;
516 private final Function<? super ResultSet, ? extends T2> f2;
517 private final Function<? super ResultSet, ? extends T3> f3;
518 private final Function<? super ResultSet, ? extends T4> f4;
519
520 CallableResultSets4Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
521 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
522 Function<? super ResultSet, ? extends T4> f4) {
523 this.b = b;
524 this.f1 = f1;
525 this.f2 = f2;
526 this.f3 = f3;
527 this.f4 = f4;
528 }
529
530 public CallableResultSets4Builder<T1, T2, T3, T4> out(Type type, Class<?> cls5) {
531 b.out(type, cls5);
532 return this;
533 }
534
535 public CallableResultSets4Builder<T1, T2, T3, T4> in() {
536 b.in();
537 return this;
538 }
539
540 public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> input(Flowable<?> f) {
541 b.input(f);
542 return build();
543 }
544
545 public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> input(Object... objects) {
546 return input(Flowable.fromArray(objects));
547 }
548
549 public CallableResultSets4Builder<T1, T2, T3, T4> inOut(Type type, Class<?> cls) {
550 b.inOut(type, cls);
551 return this;
552 }
553
554 public <T> CallableResultSetsNBuilder autoMap(Class<T> cls) {
555 return get(Util.autoMap(cls));
556 }
557
558 @SuppressWarnings("unchecked")
559 public CallableResultSetsNBuilder get(Function<? super ResultSet, ?> f5) {
560 return new CallableResultSetsNBuilder(b, Lists.newArrayList(f1, f2, f3, f4, f5));
561 }
562
563 public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> build() {
564 return inTransaction(b,
565 con -> Call.createWithFourResultSets(con, b.sql, b.parameterGroups(), b.params, f1, f2, f3, f4, 0));
566 }
567 }
568
569 public static final class CallableResultSetsNBuilder {
570
571 private final CallableBuilder b;
572 private final List<Function<? super ResultSet, ?>> functions;
573
574 CallableResultSetsNBuilder(CallableBuilder b, List<Function<? super ResultSet, ?>> functions) {
575 this.b = b;
576 this.functions = functions;
577 }
578
579 public CallableResultSetsNBuilder in() {
580 b.in();
581 return this;
582 }
583
584 public Flowable<Tx<CallableResultSetN>> input(Flowable<?> f) {
585 b.input(f);
586 return build();
587 }
588
589 public Flowable<Tx<CallableResultSetN>> input(Object... objects) {
590 return input(Flowable.fromArray(objects));
591 }
592
593 public CallableResultSetsNBuilder inOut(Type type, Class<?> cls) {
594 b.inOut(type, cls);
595 return this;
596 }
597
598 public <T> CallableResultSetsNBuilder autoMap(Class<T> cls) {
599 return get(Util.autoMap(cls));
600 }
601
602 public CallableResultSetsNBuilder get(Function<? super ResultSet, ?> f) {
603 functions.add(f);
604 return this;
605 }
606
607 private Flowable<Tx<CallableResultSetN>> build() {
608 return inTransaction(b,
609 con -> Call.createWithNResultSets(con, b.sql, b.parameterGroups(), b.params, functions, 0)
610 );
611 }
612 }
613
614 private static <T> List<T> createList(List<T> list, T t) {
615 ArrayList<T> r = new ArrayList<>(list);
616 r.add(t);
617 return r;
618 }
619
620 private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
621 Function<Single<Connection>, Flowable<Notification<T>>> f) {
622 return Flowable.defer(() -> {
623 AtomicReference<Connection> con = new AtomicReference<Connection>();
624
625 Single<Connection> transactedConnection = b.connection
626 .map(c -> Util.toTransactedConnection(con, c));
627 return f.apply(transactedConnection)
628 .<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db))
629 .doOnNext(tx -> {
630 if (tx.isComplete()) {
631 ((TxImpl<T>) tx).connection().commit();
632 }
633 });
634 });
635 }
636
637 }