View Javadoc
1   package org.davidmoten.rxjava3.jdbc;
2   
3   import java.sql.CallableStatement;
4   import java.sql.Connection;
5   import java.sql.PreparedStatement;
6   import java.sql.ResultSet;
7   import java.sql.SQLException;
8   import java.sql.Statement;
9   import java.util.ArrayList;
10  import java.util.List;
11  
12  import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet1;
13  import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet2;
14  import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet3;
15  import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet4;
16  import org.davidmoten.rxjava3.jdbc.callable.CallableResultSetN;
17  import org.davidmoten.rxjava3.jdbc.callable.internal.InParameterPlaceholder;
18  import org.davidmoten.rxjava3.jdbc.callable.internal.OutParameterPlaceholder;
19  import org.davidmoten.rxjava3.jdbc.callable.internal.ParameterPlaceholder;
20  import org.davidmoten.rxjava3.jdbc.internal.Functions;
21  import org.davidmoten.rxjava3.jdbc.tuple.Tuple2;
22  import org.davidmoten.rxjava3.jdbc.tuple.Tuple3;
23  import org.davidmoten.rxjava3.jdbc.tuple.Tuple4;
24  import org.davidmoten.rxjava3.jdbc.tuple.TupleN;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import com.github.davidmoten.guavamini.Lists;
29  
30  import io.reactivex.rxjava3.core.Emitter;
31  import io.reactivex.rxjava3.core.Flowable;
32  import io.reactivex.rxjava3.core.Notification;
33  import io.reactivex.rxjava3.core.Single;
34  import io.reactivex.rxjava3.functions.BiConsumer;
35  import io.reactivex.rxjava3.functions.BiFunction;
36  import io.reactivex.rxjava3.functions.Consumer;
37  import io.reactivex.rxjava3.functions.Function;
38  import io.reactivex.rxjava3.functions.Supplier;
39  
40  final class Call {
41  
42      private static final Logger log = LoggerFactory.getLogger(Call.class);
43  
44      private Call() {
45          // prevent instantiation
46      }
47  
48      /////////////////////////
49      // No Parameters
50      /////////////////////////
51  
52      static Flowable<Integer> createWithZeroOutParameters(Single<Connection> connection, String sql,
53              Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders) {
54          return connection.toFlowable()
55                  .flatMap(con -> Call.<Integer>createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
56                          (stmt, parameters) -> createWithZeroOutParameters(stmt, parameters, parameterPlaceholders)))
57                  .dematerialize(Functions.identity());
58      }
59  
60      private static Single<Integer> createWithZeroOutParameters(NamedCallableStatement stmt, List<Object> parameters,
61              List<ParameterPlaceholder> parameterPlaceholders) {
62          return Single.fromCallable(() -> {
63              CallableStatement st = stmt.stmt;
64              execute(stmt, parameters, parameterPlaceholders, 0, st);
65              return 1;
66          });
67      }
68  
69      /////////////////////////
70      // One Out Parameter
71      /////////////////////////
72  
73      static <T1> Flowable<Notification<T1>> createWithOneOutParameter(Single<Connection> connection, String sql,
74              Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls) {
75          return connection.toFlowable()
76                  .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
77                          (stmt, parameters) -> createWithOneParameter(stmt, parameters, parameterPlaceholders, cls)));
78      }
79  
80      private static <T> Single<T> createWithOneParameter(NamedCallableStatement stmt, List<Object> parameters,
81              List<ParameterPlaceholder> parameterPlaceholders, Class<T> cls1) {
82          return Single.fromCallable(() -> {
83              CallableStatement st = stmt.stmt;
84              List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
85              return Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
86          });
87      }
88  
89      /////////////////////////
90      // Two Out Parameters
91      /////////////////////////
92  
93      static <T1, T2> Flowable<Notification<Tuple2<T1, T2>>> createWithTwoOutParameters(Single<Connection> connection,
94              String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
95              Class<T1> cls1, Class<T2> cls2) {
96          return connection.toFlowable().flatMap(con -> createWithParameters(con, sql, parameterGroups,
97                  parameterPlaceholders,
98                  (stmt, parameters) -> createWithTwoParameters(stmt, parameters, parameterPlaceholders, cls1, cls2)));
99      }
100 
101     private static <T1, T2> Single<Tuple2<T1, T2>> createWithTwoParameters(NamedCallableStatement stmt,
102             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2) {
103         return Single.fromCallable(() -> {
104             CallableStatement st = stmt.stmt;
105             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
106             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
107             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
108             return Tuple2.create(o1, o2);
109         });
110     }
111 
112     private static final class PlaceAndType {
113         final int pos;
114         final Type type;
115 
116         PlaceAndType(int pos, Type type) {
117             this.pos = pos;
118             this.type = type;
119         }
120 
121     }
122 
123     /////////////////////////
124     // Three Out Parameters
125     /////////////////////////
126 
127     static <T1, T2, T3> Flowable<Notification<Tuple3<T1, T2, T3>>> createWithThreeOutParameters(
128             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
129             List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3) {
130         return connection.toFlowable()
131                 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
132                         (stmt, parameters) -> createWithThreeParameters(stmt, parameters, parameterPlaceholders, cls1,
133                                 cls2, cls3)));
134     }
135 
136     private static <T1, T2, T3> Single<Tuple3<T1, T2, T3>> createWithThreeParameters(NamedCallableStatement stmt,
137             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
138             Class<T3> cls3) {
139         return Single.fromCallable(() -> {
140             CallableStatement st = stmt.stmt;
141             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 3, st);
142             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
143             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
144             T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
145             return Tuple3.create(o1, o2, o3);
146         });
147     }
148 
149     /////////////////////////
150     // Four Out Parameters
151     /////////////////////////
152 
153     static <T1, T2, T3, T4> Flowable<Notification<Tuple4<T1, T2, T3, T4>>> createWithFourOutParameters(
154             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
155             List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3,
156             Class<T4> cls4) {
157         return connection.toFlowable()
158                 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
159                         (stmt, parameters) -> createWithFourParameters(stmt, parameters, parameterPlaceholders, cls1,
160                                 cls2, cls3, cls4)));
161     }
162 
163     private static <T1, T2, T3, T4> Single<Tuple4<T1, T2, T3, T4>> createWithFourParameters(NamedCallableStatement stmt,
164             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
165             Class<T3> cls3, Class<T4> cls4) {
166         return Single.fromCallable(() -> {
167             CallableStatement st = stmt.stmt;
168             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 4, st);
169             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
170             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
171             T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
172             T4 o4 = Util.mapObject(st, cls4, outs.get(3).pos, outs.get(3).type);
173             return Tuple4.create(o1, o2, o3, o4);
174         });
175     }
176 
177     /////////////////////////
178     // N Out Parameters
179     /////////////////////////
180 
181     static Flowable<Notification<TupleN<Object>>> createWithNParameters( //
182             Single<Connection> connection, //
183             String sql, //
184             Flowable<List<Object>> parameterGroups, //
185             List<ParameterPlaceholder> parameterPlaceholders, //
186             List<Class<?>> outClasses) {
187         return connection //
188                 .toFlowable() //
189                 .flatMap( //
190                         con -> createWithParameters( //
191                                 con, //
192                                 sql, //
193                                 parameterGroups, //
194                                 parameterPlaceholders, //
195                                 (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders,
196                                         outClasses)));
197     }
198 
199     private static Single<TupleN<Object>> createWithNParameters( //
200             NamedCallableStatement stmt, //
201             List<Object> parameters, //
202             List<ParameterPlaceholder> parameterPlaceholders, //
203             List<Class<?>> outClasses) {
204         return Single.fromCallable(() -> {
205             CallableStatement st = stmt.stmt;
206             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, st);
207             Object[] outputs = new Object[outClasses.size()];
208             for (int i = 0; i < outClasses.size(); i++) {
209                 outputs[i] = Util.mapObject(st, outClasses.get(i), outs.get(i).pos, outs.get(i).type);
210             }
211             return TupleN.create(outputs);
212         });
213     }
214 
215     /////////////////////////
216     // One ResultSet
217     /////////////////////////
218 
219     static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Single<Connection> connection,
220             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
221             Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
222         return connection.toFlowable().flatMap(
223                 con -> createWithOneResultSet(con, sql, parameterGroups, parameterPlaceholders, f1, fetchSize));
224     }
225 
226     private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
227             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
228             Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
229         log.debug("Update.create {}", sql);
230         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
231         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory = //
232                 stmt -> parameterGroups //
233                         .flatMap(parameters -> {
234                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
235                                     parameters);
236                             Flowable<T1> flowable1 = createFlowable(stmt, f1);
237                             return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable();
238                         }) //
239                         .materialize() //
240                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
241                         .doOnError(e -> Util.rollback(stmt.stmt));
242         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
243         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
244     }
245 
246     /////////////////////////
247     // Two ResultSets
248     /////////////////////////
249 
250     static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(
251             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
252             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
253             Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
254         return connection.toFlowable().flatMap(
255                 con -> createWithTwoResultSets(con, sql, parameterGroups, parameterPlaceholders, f1, f2, fetchSize));
256     }
257 
258     private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
259             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
260             Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
261         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
262         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory = //
263                 stmt -> parameterGroups //
264                         .flatMap(parameters -> {
265                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
266                                     parameters);
267                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
268                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
269                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
270                             return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2))
271                                     .toFlowable();
272                         }) //
273                         .materialize() //
274                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
275                         .doOnError(e -> Util.rollback(stmt.stmt));
276         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
277         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
278     }
279 
280     /////////////////////////
281     // Three ResultSets
282     /////////////////////////
283 
284     static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
285             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
286             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
287             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
288         return connection.toFlowable().flatMap(con -> createWithThreeResultSets(con, sql, parameterGroups,
289                 parameterPlaceholders, f1, f2, f3, fetchSize));
290     }
291 
292     private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
293             Connection con, String sql, Flowable<List<Object>> parameterGroups,
294             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
295             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
296         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
297         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory = //
298                 stmt -> parameterGroups //
299                         .flatMap(parameters -> {
300                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
301                                     parameters);
302                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
303                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
304                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
305                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
306                             final Flowable<T3> flowable3 = createFlowable(stmt, f3);
307                             return Single.just(
308                                     new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3))
309                                     .toFlowable();
310                         }) //
311                         .materialize() //
312                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
313                         .doOnError(e -> Util.rollback(stmt.stmt));
314         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
315         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
316     }
317     /////////////////////////
318     // Four ResultSets
319     /////////////////////////
320 
321     static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
322             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
323             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
324             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
325             Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
326         return connection.toFlowable().flatMap(con -> createWithFourResultSets(con, sql, parameterGroups,
327                 parameterPlaceholders, f1, f2, f3, f4, fetchSize));
328     }
329 
330     private static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
331             Connection con, String sql, Flowable<List<Object>> parameterGroups,
332             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
333             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
334             Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
335         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
336         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>>> flowableFactory = //
337                 stmt -> parameterGroups //
338                         .flatMap(parameters -> {
339                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
340                                     parameters);
341                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
342                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
343                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
344                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
345                             final Flowable<T3> flowable3 = createFlowable(stmt, f3);
346                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
347                             final Flowable<T4> flowable4 = createFlowable(stmt, f4);
348                             return Single.just(new CallableResultSet4<T1, T2, T3, T4>(outputValues, flowable1,
349                                     flowable2, flowable3, flowable4)).toFlowable();
350                         }) //
351                         .materialize() //
352                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
353                         .doOnError(e -> Util.rollback(stmt.stmt));
354         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
355         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
356     }
357 
358     /////////////////////////
359     // N ResultSets
360     /////////////////////////
361 
362     static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Single<Connection> connection, String sql,
363             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
364             List<Function<? super ResultSet, ?>> functions, int fetchSize) {
365         return connection.toFlowable().flatMap(
366                 con -> createWithNResultSets(con, sql, parameterGroups, parameterPlaceholders, functions, fetchSize));
367     }
368 
369     private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
370             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
371             List<Function<? super ResultSet, ?>> functions, int fetchSize) {
372         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
373         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory = //
374                 stmt -> parameterGroups //
375                         .flatMap(parameters -> {
376                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
377                                     parameters);
378                             List<Flowable<?>> flowables = Lists.newArrayList();
379                             int i = 0;
380                             do {
381                                 Function<? super ResultSet, ?> f = functions.get(i);
382                                 flowables.add(createFlowable(stmt, f));
383                                 i++;
384                             } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT));
385                             return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable();
386                         }) //
387                         .materialize() //
388                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
389                         .doOnError(e -> Util.rollback(stmt.stmt));
390         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
391         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
392     }
393 
394     ////////////////////////////////////
395     // Utilty Methods
396     ///////////////////////////////////
397 
398     private static <T> Flowable<Notification<T>> createWithParameters(Connection con, String sql,
399             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
400             BiFunction<NamedCallableStatement, List<Object>, Single<T>> single) {
401         Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
402         final Function<NamedCallableStatement, Flowable<Notification<T>>> flowableFactory = //
403                 stmt -> parameterGroups //
404                         .flatMap(parameters -> single.apply(stmt, parameters).toFlowable()) //
405                         .materialize() //
406                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
407                         .doOnError(e -> Util.rollback(stmt.stmt));
408         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
409         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
410     }
411 
412     static PreparedStatement setParameters(PreparedStatement ps, List<Object> parameters,
413             List<ParameterPlaceholder> parameterPlaceholders, List<String> names) throws SQLException {
414         // TODO handle Parameter objects (named)
415         if (names.isEmpty()) {
416             int i = 0;
417             for (int j = 0; j < parameterPlaceholders.size() && i < parameters.size(); j++) {
418                 ParameterPlaceholder p = parameterPlaceholders.get(j);
419                 if (p instanceof InParameterPlaceholder) {
420                     Util.setParameter(ps, j + 1, parameters.get(i));
421                     i++;
422                 }
423             }
424         } else {
425             // TODO
426             throw new RuntimeException("named paramters not implemented yet for CallableStatement yet");
427             // Util.setNamedParameters(ps, params, names);
428         }
429         return ps;
430     }
431 
432     private static List<PlaceAndType> execute(NamedCallableStatement stmt, List<Object> parameters,
433             List<ParameterPlaceholder> parameterPlaceholders, int outCount, CallableStatement st) throws SQLException {
434         Util.incrementCounter(st.getConnection());
435         setParameters(st, parameters, parameterPlaceholders, stmt.names);
436         int initialSize = outCount == Integer.MAX_VALUE ? 16 : outCount;
437         List<PlaceAndType> outs = new ArrayList<PlaceAndType>(initialSize);
438         for (int j = 0; j < parameterPlaceholders.size(); j++) {
439             ParameterPlaceholder p = parameterPlaceholders.get(j);
440             if (p instanceof OutParameterPlaceholder) {
441                 outs.add(new PlaceAndType(j + 1, ((OutParameterPlaceholder) p).type()));
442                 if (outs.size() == outCount) {
443                     break;
444                 }
445             }
446         }
447         st.execute();
448         return outs;
449     }
450 
451     private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
452             Function<? super ResultSet, ? extends T> f) throws SQLException {
453         ResultSet rsActual = stmt.stmt.getResultSet();
454         Supplier<ResultSet> initialState = () -> rsActual;
455         BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
456             log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
457             if (rs.next()) {
458                 T v = f.apply(rs);
459                 log.debug("emitting {}", v);
460                 emitter.onNext(v);
461             } else {
462                 log.debug("completed");
463                 emitter.onComplete();
464             }
465         };
466         Consumer<ResultSet> disposeState = Util::closeSilently;
467         return Flowable.generate(initialState, generator, disposeState);
468     }
469 
470     private static List<Object> executeAndReturnOutputValues(List<ParameterPlaceholder> parameterPlaceholders,
471             NamedCallableStatement stmt, List<Object> parameters) throws SQLException {
472         List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, stmt.stmt);
473         List<Object> list = new ArrayList<>(outs.size());
474         for (PlaceAndType p : outs) {
475             // TODO convert to a desired return type?
476             list.add(stmt.stmt.getObject(p.pos));
477         }
478         return list;
479     }
480 
481 }