View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.Connection;
4   import java.sql.ResultSet;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.concurrent.atomic.AtomicReference;
8   
9   import org.davidmoten.rx.jdbc.callable.CallableResultSet1;
10  import org.davidmoten.rx.jdbc.callable.CallableResultSet2;
11  import org.davidmoten.rx.jdbc.callable.CallableResultSet3;
12  import org.davidmoten.rx.jdbc.callable.CallableResultSet4;
13  import org.davidmoten.rx.jdbc.callable.CallableResultSetN;
14  import org.davidmoten.rx.jdbc.callable.internal.In;
15  import org.davidmoten.rx.jdbc.callable.internal.InOut;
16  import org.davidmoten.rx.jdbc.callable.internal.Out;
17  import org.davidmoten.rx.jdbc.callable.internal.TxGetter1;
18  import org.davidmoten.rx.jdbc.callable.internal.TxGetter2;
19  import org.davidmoten.rx.jdbc.callable.internal.TxGetter3;
20  import org.davidmoten.rx.jdbc.callable.internal.TxGetter4;
21  import org.davidmoten.rx.jdbc.callable.internal.TxGetterN;
22  import org.davidmoten.rx.jdbc.tuple.Tuple2;
23  import org.davidmoten.rx.jdbc.tuple.Tuple3;
24  import org.davidmoten.rx.jdbc.tuple.Tuple4;
25  import org.davidmoten.rx.jdbc.tuple.TupleN;
26  
27  import com.github.davidmoten.guavamini.Lists;
28  import com.github.davidmoten.guavamini.Preconditions;
29  
30  import io.reactivex.Flowable;
31  import io.reactivex.Notification;
32  import io.reactivex.Single;
33  import io.reactivex.functions.Function;
34  
35  public final class TransactedCallableBuilder implements TxGetter1 {
36  
37      private CallableBuilder b;
38  
39      public TransactedCallableBuilder(CallableBuilder b) {
40          this.b = b;
41      }
42  
43      public Flowable<List<Object>> parameterGroups() {
44          return b.parameterGroups();
45      }
46  
47      public TransactedCallableBuilder in() {
48          b.params.add(In.IN);
49          return this;
50      }
51  
52      public Single<TxWithoutValue> input(Flowable<?> f) {
53          Preconditions.checkArgument(b.inStream == null, "you can only specify in flowable once, current=" + b.inStream);
54          b.inStream = f;
55          return build();
56      }
57  
58      public Single<TxWithoutValue> once() {
59          return input(1);
60      }
61  
62      public Single<TxWithoutValue> input(Object... objects) {
63          return input(Flowable.fromArray(objects));
64      }
65  
66      public <T> CallableBuilder1<T> inOut(Type type, Class<T> cls) {
67          b.params.add(new InOut(type, cls));
68          return new CallableBuilder1<T>(b, cls);
69      }
70  
71      public <T> CallableBuilder1<T> out(Type type, Class<T> cls) {
72          b.params.add(new Out(type, cls));
73          return new CallableBuilder1<T>(b, cls);
74      }
75  
76      @Override
77      public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
78          return new CallableResultSets1Builder<T>(b, function);
79      }
80  
81      public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
82          return get(Util.autoMap(cls));
83      }
84  
85      @SuppressWarnings("unchecked")
86      private Single<TxWithoutValue> build() {
87          return Single.defer(() -> {
88              AtomicReference<Connection> con = new AtomicReference<Connection>();
89              // set the atomic reference when transactedConnection emits
90              Single<Connection> transactedConnection = b.connection //
91                      .map(c -> Util.toTransactedConnection(con, c));
92              return Call //
93                      .createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params) //
94                      .materialize() //
95                      .filter(x -> !x.isOnNext()) //
96                      .<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
97                      .doOnNext(tx -> {
98                          if (tx.isComplete()) {
99                              ((TxImpl<Object>) tx).connection().commit();
100                         }
101                     }) //
102                     .lastOrError();
103         });
104     }
105 
106     public static final class CallableBuilder1<T1> implements TxGetter1 {
107 
108         private final CallableBuilder b;
109         private final Class<T1> cls;
110 
111         public CallableBuilder1(CallableBuilder b, Class<T1> cls) {
112             this.b = b;
113             this.cls = cls;
114         }
115 
116         public CallableBuilder1<T1> in() {
117             b.in();
118             return this;
119         }
120 
121         public <T2> CallableBuilder2<T1, T2> out(Type type, Class<T2> cls2) {
122             b.out(type, cls2);
123             return new CallableBuilder2<T1, T2>(b, cls, cls2);
124         }
125 
126         public <T2> CallableBuilder2<T1, T2> inOut(Type type, Class<T2> cls2) {
127             b.inOut(type, cls2);
128             return new CallableBuilder2<T1, T2>(b, cls, cls2);
129         }
130 
131         public Flowable<Tx<T1>> input(Flowable<?> f) {
132             b.input(f);
133             return build();
134         }
135 
136         public Flowable<Tx<T1>> input(Object... objects) {
137             return input(Flowable.fromArray(objects));
138         }
139 
140         @Override
141         public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
142             return new CallableResultSets1Builder<T>(b, function);
143         }
144 
145         public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
146             return get(Util.autoMap(cls));
147         }
148 
149         private Flowable<Tx<T1>> build() {
150             return inTransaction(b, con -> Call //
151                     .createWithOneOutParameter(con, b.sql, b.parameterGroups(), b.params, cls));
152         }
153     }
154 
155     public static final class CallableBuilder2<T1, T2> implements TxGetter1 {
156 
157         private final CallableBuilder b;
158         private final Class<T1> cls1;
159         private final Class<T2> cls2;
160 
161         public CallableBuilder2(CallableBuilder b, Class<T1> cls1, Class<T2> cls2) {
162             this.b = b;
163             this.cls1 = cls1;
164             this.cls2 = cls2;
165         }
166 
167         public <T3> CallableBuilder3<T1, T2, T3> out(Type type, Class<T3> cls3) {
168             b.out(type, cls3);
169             return new CallableBuilder3<T1, T2, T3>(b, cls1, cls2, cls3);
170         }
171 
172         public Flowable<Tx<Tuple2<T1, T2>>> input(Flowable<?> f) {
173             b.input(f);
174             return build();
175         }
176 
177         public CallableBuilder2<T1, T2> in() {
178             b.in();
179             return this;
180         }
181 
182         public Flowable<Tx<Tuple2<T1, T2>>> input(Object... objects) {
183             return input(Flowable.fromArray(objects));
184         }
185 
186         public <T3> CallableBuilder3<T1, T2, T3> inOut(Type type, Class<T3> cls3) {
187             b.inOut(type, cls3);
188             return new CallableBuilder3<T1, T2, T3>(b, cls1, cls2, cls3);
189         }
190 
191         @Override
192         public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
193             return new CallableResultSets1Builder<T>(b, function);
194         }
195 
196         public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
197             return get(Util.autoMap(cls));
198         }
199 
200         private Flowable<Tx<Tuple2<T1, T2>>> build() {
201             return inTransaction(b,
202                     con -> Call.createWithTwoOutParameters(con, b.sql, b.parameterGroups(), b.params, cls1, cls2));
203         }
204     }
205 
206     public static final class CallableBuilder3<T1, T2, T3> implements TxGetter1 {
207 
208         private final CallableBuilder b;
209         private final Class<T1> cls1;
210         private final Class<T2> cls2;
211         private final Class<T3> cls3;
212 
213         public CallableBuilder3(CallableBuilder b, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3) {
214             this.b = b;
215             this.cls1 = cls1;
216             this.cls2 = cls2;
217             this.cls3 = cls3;
218         }
219 
220         public <T4> CallableBuilder4<T1, T2, T3, T4> out(Type type, Class<T4> cls4) {
221             b.out(type, cls4);
222             return new CallableBuilder4<T1, T2, T3, T4>(b, cls1, cls2, cls3, cls4);
223         }
224 
225         public Flowable<Tx<Tuple3<T1, T2, T3>>> input(Flowable<?> f) {
226             b.input(f);
227             return build();
228         }
229 
230         public CallableBuilder3<T1, T2, T3> in() {
231             b.in();
232             return this;
233         }
234 
235         public Flowable<Tx<Tuple3<T1, T2, T3>>> input(Object... objects) {
236             return input(Flowable.fromArray(objects));
237         }
238 
239         public <T4> CallableBuilder4<T1, T2, T3, T4> inOut(Type type, Class<T4> cls4) {
240             b.inOut(type, cls4);
241             return new CallableBuilder4<T1, T2, T3, T4>(b, cls1, cls2, cls3, cls4);
242         }
243 
244         @Override
245         public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
246             return new CallableResultSets1Builder<T>(b, function);
247         }
248 
249         public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
250             return get(Util.autoMap(cls));
251         }
252 
253         private Flowable<Tx<Tuple3<T1, T2, T3>>> build() {
254             return inTransaction(b, con -> Call.createWithThreeOutParameters(con, b.sql, b.parameterGroups(), b.params,
255                     cls1, cls2, cls3));
256 
257         }
258     }
259 
260     public static final class CallableBuilder4<T1, T2, T3, T4> implements TxGetter1 {
261 
262         private final CallableBuilder b;
263         private final Class<T1> cls1;
264         private final Class<T2> cls2;
265         private final Class<T3> cls3;
266         private final Class<T4> cls4;
267 
268         public CallableBuilder4(CallableBuilder b, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3, Class<T4> cls4) {
269             this.b = b;
270             this.cls1 = cls1;
271             this.cls2 = cls2;
272             this.cls3 = cls3;
273             this.cls4 = cls4;
274         }
275 
276         public Flowable<Tx<Tuple4<T1, T2, T3, T4>>> input(Flowable<?> f) {
277             b.input(f);
278             return build();
279         }
280 
281         public CallableBuilder4<T1, T2, T3, T4> in() {
282             b.in();
283             return this;
284         }
285 
286         public Flowable<Tx<Tuple4<T1, T2, T3, T4>>> input(Object... objects) {
287             return input(Flowable.fromArray(objects));
288         }
289 
290         public CallableBuilderN inOut(Type type, Class<T3> cls5) {
291             b.inOut(type, cls5);
292             return new CallableBuilderN(b, Lists.newArrayList(cls1, cls2, cls3, cls4, cls5));
293         }
294 
295         public CallableBuilderN out(Type type, Class<?> cls5) {
296             b.out(type, cls5);
297             return new CallableBuilderN(b, Lists.newArrayList(cls1, cls2, cls3, cls4, cls5));
298         }
299 
300         @Override
301         public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
302             return new CallableResultSets1Builder<T>(b, function);
303         }
304 
305         public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
306             return get(Util.autoMap(cls));
307         }
308 
309         private Flowable<Tx<Tuple4<T1, T2, T3, T4>>> build() {
310             return inTransaction(b, con -> Call.createWithFourOutParameters(con, b.sql, b.parameterGroups(), b.params,
311                     cls1, cls2, cls3, cls4));
312         }
313     }
314 
315     public static final class CallableBuilderN implements TxGetter1 {
316 
317         private final CallableBuilder b;
318         private final List<Class<?>> outClasses;
319 
320         public CallableBuilderN(CallableBuilder b, List<Class<?>> outClasses) {
321             this.b = b;
322             this.outClasses = outClasses;
323         }
324 
325         public Flowable<Tx<TupleN<Object>>> input(Flowable<?> f) {
326             b.input(f);
327             return build();
328         }
329 
330         public CallableBuilderN in() {
331             b.in();
332             return this;
333         }
334 
335         public Flowable<Tx<TupleN<Object>>> input(Object... objects) {
336             return input(Flowable.fromArray(objects));
337         }
338 
339         public CallableBuilderN out(Type type, Class<?> cls) {
340             b.out(type, cls);
341             return new CallableBuilderN(b, createList(outClasses, cls));
342         }
343 
344         @Override
345         public <T> CallableResultSets1Builder<T> get(Function<? super ResultSet, ? extends T> function) {
346             return new CallableResultSets1Builder<T>(b, function);
347         }
348 
349         public <T> CallableResultSets1Builder<T> autoMap(Class<T> cls) {
350             return get(Util.autoMap(cls));
351         }
352 
353         private Flowable<Tx<TupleN<Object>>> build() {
354             return inTransaction(b,
355                     con -> Call.createWithNParameters(con, b.sql, b.parameterGroups(), b.params, outClasses));
356         }
357     }
358 
359     public static final class CallableResultSets1Builder<T1> implements TxGetter2<T1> {
360 
361         private final CallableBuilder b;
362         private final Function<? super ResultSet, ? extends T1> f1;
363 
364         CallableResultSets1Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> function) {
365             this.b = b;
366             this.f1 = function;
367         }
368 
369         public CallableResultSets1Builder<T1> out(Type type, Class<?> cls5) {
370             b.out(type, cls5);
371             return this;
372         }
373 
374         public <T2> CallableResultSets2Builder<T1, T2> autoMap(Class<T2> cls) {
375             return get(Util.autoMap(cls));
376         }
377 
378         public <T2> CallableResultSets2Builder<T1, T2> get(Function<? super ResultSet, ? extends T2> f2) {
379             return new CallableResultSets2Builder<T1, T2>(b, f1, f2);
380         }
381 
382         public Flowable<Tx<CallableResultSet1<T1>>> input(Flowable<?> f) {
383             b.input(f);
384             return build();
385         }
386 
387         public CallableResultSets1Builder<T1> in() {
388             b.in();
389             return this;
390         }
391 
392         public Flowable<Tx<CallableResultSet1<T1>>> input(Object... objects) {
393             return input(Flowable.fromArray(objects));
394         }
395 
396         public CallableResultSets1Builder<T1> inOut(Type type, Class<?> cls) {
397             b.inOut(type, cls);
398             return this;
399         }
400 
401         private Flowable<Tx<CallableResultSet1<T1>>> build() {
402             return inTransaction(b,
403                     con -> Call.createWithOneResultSet(con, b.sql, b.parameterGroups(), b.params, f1, 0));
404         }
405 
406     }
407 
408     public static final class CallableResultSets2Builder<T1, T2> implements TxGetter3<T1, T2> {
409 
410         private final CallableBuilder b;
411         private final Function<? super ResultSet, ? extends T1> f1;
412         private final Function<? super ResultSet, ? extends T2> f2;
413 
414         CallableResultSets2Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
415                 Function<? super ResultSet, ? extends T2> f2) {
416             this.b = b;
417             this.f1 = f1;
418             this.f2 = f2;
419         }
420 
421         public CallableResultSets2Builder<T1, T2> out(Type type, Class<?> cls5) {
422             b.out(type, cls5);
423             return this;
424         }
425 
426         public Flowable<Tx<CallableResultSet2<T1, T2>>> input(Flowable<?> f) {
427             b.input(f);
428             return build();
429         }
430 
431         public CallableResultSets2Builder<T1, T2> in() {
432             b.in();
433             return this;
434         }
435 
436         public Flowable<Tx<CallableResultSet2<T1, T2>>> input(Object... objects) {
437             return input(Flowable.fromArray(objects));
438         }
439 
440         public CallableResultSets2Builder<T1, T2> inOut(Type type, Class<?> cls) {
441             b.inOut(type, cls);
442             return this;
443         }
444 
445         public <T3> CallableResultSets3Builder<T1, T2, T3> autoMap(Class<T3> cls) {
446             return get(Util.autoMap(cls));
447         }
448 
449         public <T3> CallableResultSets3Builder<T1, T2, T3> get(Function<? super ResultSet, ? extends T3> f3) {
450             return new CallableResultSets3Builder<T1, T2, T3>(b, f1, f2, f3);
451         }
452 
453         private Flowable<Tx<CallableResultSet2<T1, T2>>> build() {
454             return inTransaction(b,
455                     con -> Call.createWithTwoResultSets(con, b.sql, b.parameterGroups(), b.params, f1, f2, 0));
456         }
457     }
458 
459     public static final class CallableResultSets3Builder<T1, T2, T3> implements TxGetter4<T1, T2, T3> {
460 
461         private final CallableBuilder b;
462         private final Function<? super ResultSet, ? extends T1> f1;
463         private final Function<? super ResultSet, ? extends T2> f2;
464         private final Function<? super ResultSet, ? extends T3> f3;
465 
466         CallableResultSets3Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
467                 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3) {
468             this.b = b;
469             this.f1 = f1;
470             this.f2 = f2;
471             this.f3 = f3;
472         }
473 
474         public CallableResultSets3Builder<T1, T2, T3> out(Type type, Class<?> cls5) {
475             b.out(type, cls5);
476             return this;
477         }
478 
479         public CallableResultSets3Builder<T1, T2, T3> in() {
480             b.in();
481             return this;
482         }
483 
484         public Flowable<Tx<CallableResultSet3<T1, T2, T3>>> input(Flowable<?> f) {
485             b.input(f);
486             return build();
487         }
488 
489         public Flowable<Tx<CallableResultSet3<T1, T2, T3>>> input(Object... objects) {
490             return input(Flowable.fromArray(objects));
491         }
492 
493         public CallableResultSets3Builder<T1, T2, T3> inOut(Type type, Class<?> cls) {
494             b.inOut(type, cls);
495             return this;
496         }
497 
498         public <T4> CallableResultSets4Builder<T1, T2, T3, T4> autoMap(Class<T4> cls) {
499             return get(Util.autoMap(cls));
500         }
501 
502         public <T4> CallableResultSets4Builder<T1, T2, T3, T4> get(Function<? super ResultSet, ? extends T4> f4) {
503             return new CallableResultSets4Builder<T1, T2, T3, T4>(b, f1, f2, f3, f4);
504         }
505 
506         private Flowable<Tx<CallableResultSet3<T1, T2, T3>>> build() {
507             return inTransaction(b, con -> Call.createWithThreeResultSets(con, b.sql, b.parameterGroups(),
508                     b.params, f1, f2, f3, 0));
509         }
510     }
511 
512     public static final class CallableResultSets4Builder<T1, T2, T3, T4> implements TxGetterN {
513 
514         private final CallableBuilder b;
515         private final Function<? super ResultSet, ? extends T1> f1;
516         private final Function<? super ResultSet, ? extends T2> f2;
517         private final Function<? super ResultSet, ? extends T3> f3;
518         private final Function<? super ResultSet, ? extends T4> f4;
519 
520         CallableResultSets4Builder(CallableBuilder b, Function<? super ResultSet, ? extends T1> f1,
521                 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
522                 Function<? super ResultSet, ? extends T4> f4) {
523             this.b = b;
524             this.f1 = f1;
525             this.f2 = f2;
526             this.f3 = f3;
527             this.f4 = f4;
528         }
529 
530         public CallableResultSets4Builder<T1, T2, T3, T4> out(Type type, Class<?> cls5) {
531             b.out(type, cls5);
532             return this;
533         }
534 
535         public CallableResultSets4Builder<T1, T2, T3, T4> in() {
536             b.in();
537             return this;
538         }
539 
540         public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> input(Flowable<?> f) {
541             b.input(f);
542             return build();
543         }
544 
545         public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> input(Object... objects) {
546             return input(Flowable.fromArray(objects));
547         }
548 
549         public CallableResultSets4Builder<T1, T2, T3, T4> inOut(Type type, Class<?> cls) {
550             b.inOut(type, cls);
551             return this;
552         }
553 
554         public <T> CallableResultSetsNBuilder autoMap(Class<T> cls) {
555             return get(Util.autoMap(cls));
556         }
557 
558         @SuppressWarnings("unchecked")
559         public CallableResultSetsNBuilder get(Function<? super ResultSet, ?> f5) {
560             return new CallableResultSetsNBuilder(b, Lists.newArrayList(f1, f2, f3, f4, f5));
561         }
562 
563         public Flowable<Tx<CallableResultSet4<T1, T2, T3, T4>>> build() {
564             return inTransaction(b,
565                     con -> Call.createWithFourResultSets(con, b.sql, b.parameterGroups(), b.params, f1, f2, f3, f4, 0));
566         }
567     }
568 
569     public static final class CallableResultSetsNBuilder {
570 
571         private final CallableBuilder b;
572         private final List<Function<? super ResultSet, ?>> functions;
573 
574         CallableResultSetsNBuilder(CallableBuilder b, List<Function<? super ResultSet, ?>> functions) {
575             this.b = b;
576             this.functions = functions;
577         }
578 
579         public CallableResultSetsNBuilder in() {
580             b.in();
581             return this;
582         }
583 
584         public Flowable<Tx<CallableResultSetN>> input(Flowable<?> f) {
585             b.input(f);
586             return build();
587         }
588 
589         public Flowable<Tx<CallableResultSetN>> input(Object... objects) {
590             return input(Flowable.fromArray(objects));
591         }
592 
593         public CallableResultSetsNBuilder inOut(Type type, Class<?> cls) {
594             b.inOut(type, cls);
595             return this;
596         }
597 
598         public <T> CallableResultSetsNBuilder autoMap(Class<T> cls) {
599             return get(Util.autoMap(cls));
600         }
601 
602         public CallableResultSetsNBuilder get(Function<? super ResultSet, ?> f) {
603             functions.add(f);
604             return this;
605         }
606 
607         private Flowable<Tx<CallableResultSetN>> build() {
608             return inTransaction(b,
609                     con -> Call.createWithNResultSets(con, b.sql, b.parameterGroups(), b.params, functions, 0) //
610             );
611         }
612     }
613 
614     private static <T> List<T> createList(List<T> list, T t) {
615         ArrayList<T> r = new ArrayList<>(list);
616         r.add(t);
617         return r;
618     }
619 
620     private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
621             Function<Single<Connection>, Flowable<Notification<T>>> f) {
622         return Flowable.defer(() -> {
623             AtomicReference<Connection> con = new AtomicReference<Connection>();
624             // set the atomic reference when transactedConnection emits
625             Single<Connection> transactedConnection = b.connection //
626                     .map(c -> Util.toTransactedConnection(con, c));
627             return f.apply(transactedConnection) //
628                     .<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
629                     .doOnNext(tx -> {
630                         if (tx.isComplete()) {
631                             ((TxImpl<T>) tx).connection().commit();
632                         }
633                     });
634         });
635     }
636 
637 }