View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.CallableStatement;
4   import java.sql.Connection;
5   import java.sql.PreparedStatement;
6   import java.sql.ResultSet;
7   import java.sql.SQLException;
8   import java.sql.Statement;
9   import java.util.ArrayList;
10  import java.util.List;
11  import java.util.concurrent.Callable;
12  
13  import org.davidmoten.rx.jdbc.callable.CallableResultSet1;
14  import org.davidmoten.rx.jdbc.callable.CallableResultSet2;
15  import org.davidmoten.rx.jdbc.callable.CallableResultSet3;
16  import org.davidmoten.rx.jdbc.callable.CallableResultSet4;
17  import org.davidmoten.rx.jdbc.callable.CallableResultSetN;
18  import org.davidmoten.rx.jdbc.callable.internal.InParameterPlaceholder;
19  import org.davidmoten.rx.jdbc.callable.internal.OutParameterPlaceholder;
20  import org.davidmoten.rx.jdbc.callable.internal.ParameterPlaceholder;
21  import org.davidmoten.rx.jdbc.tuple.Tuple2;
22  import org.davidmoten.rx.jdbc.tuple.Tuple3;
23  import org.davidmoten.rx.jdbc.tuple.Tuple4;
24  import org.davidmoten.rx.jdbc.tuple.TupleN;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import com.github.davidmoten.guavamini.Lists;
29  
30  import io.reactivex.Emitter;
31  import io.reactivex.Flowable;
32  import io.reactivex.Notification;
33  import io.reactivex.Single;
34  import io.reactivex.functions.BiConsumer;
35  import io.reactivex.functions.BiFunction;
36  import io.reactivex.functions.Consumer;
37  import io.reactivex.functions.Function;
38  
39  final class Call {
40  
41      private static final Logger log = LoggerFactory.getLogger(Call.class);
42  
43      private Call() {
44          // prevent instantiation
45      }
46  
47      /////////////////////////
48      // No Parameters
49      /////////////////////////
50  
51      static Flowable<Integer> createWithZeroOutParameters(Single<Connection> connection, String sql,
52              Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders) {
53          return connection.toFlowable()
54                  .flatMap(con -> Call.<Integer>createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
55                          (stmt, parameters) -> createWithZeroOutParameters(stmt, parameters, parameterPlaceholders)))
56                  .dematerialize();
57      }
58  
59      private static Single<Integer> createWithZeroOutParameters(NamedCallableStatement stmt, List<Object> parameters,
60              List<ParameterPlaceholder> parameterPlaceholders) {
61          return Single.fromCallable(() -> {
62              CallableStatement st = stmt.stmt;
63              execute(stmt, parameters, parameterPlaceholders, 0, st);
64              return 1;
65          });
66      }
67  
68      /////////////////////////
69      // One Out Parameter
70      /////////////////////////
71  
72      static <T1> Flowable<Notification<T1>> createWithOneOutParameter(Single<Connection> connection, String sql,
73              Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls) {
74          return connection.toFlowable()
75                  .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
76                          (stmt, parameters) -> createWithOneParameter(stmt, parameters, parameterPlaceholders, cls)));
77      }
78  
79      private static <T> Single<T> createWithOneParameter(NamedCallableStatement stmt, List<Object> parameters,
80              List<ParameterPlaceholder> parameterPlaceholders, Class<T> cls1) {
81          return Single.fromCallable(() -> {
82              CallableStatement st = stmt.stmt;
83              List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
84              return Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
85          });
86      }
87  
88      /////////////////////////
89      // Two Out Parameters
90      /////////////////////////
91  
92      static <T1, T2> Flowable<Notification<Tuple2<T1, T2>>> createWithTwoOutParameters(Single<Connection> connection,
93              String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
94              Class<T1> cls1, Class<T2> cls2) {
95          return connection.toFlowable().flatMap(con -> createWithParameters(con, sql, parameterGroups,
96                  parameterPlaceholders,
97                  (stmt, parameters) -> createWithTwoParameters(stmt, parameters, parameterPlaceholders, cls1, cls2)));
98      }
99  
100     private static <T1, T2> Single<Tuple2<T1, T2>> createWithTwoParameters(NamedCallableStatement stmt,
101             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2) {
102         return Single.fromCallable(() -> {
103             CallableStatement st = stmt.stmt;
104             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
105             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
106             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
107             return Tuple2.create(o1, o2);
108         });
109     }
110 
111     private static final class PlaceAndType {
112         final int pos;
113         final Type type;
114 
115         PlaceAndType(int pos, Type type) {
116             this.pos = pos;
117             this.type = type;
118         }
119 
120     }
121 
122     /////////////////////////
123     // Three Out Parameters
124     /////////////////////////
125 
126     static <T1, T2, T3> Flowable<Notification<Tuple3<T1, T2, T3>>> createWithThreeOutParameters(
127             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
128             List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3) {
129         return connection.toFlowable()
130                 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
131                         (stmt, parameters) -> createWithThreeParameters(stmt, parameters, parameterPlaceholders, cls1,
132                                 cls2, cls3)));
133     }
134 
135     private static <T1, T2, T3> Single<Tuple3<T1, T2, T3>> createWithThreeParameters(NamedCallableStatement stmt,
136             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
137             Class<T3> cls3) {
138         return Single.fromCallable(() -> {
139             CallableStatement st = stmt.stmt;
140             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 3, st);
141             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
142             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
143             T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
144             return Tuple3.create(o1, o2, o3);
145         });
146     }
147 
148     /////////////////////////
149     // Four Out Parameters
150     /////////////////////////
151 
152     static <T1, T2, T3, T4> Flowable<Notification<Tuple4<T1, T2, T3, T4>>> createWithFourOutParameters(
153             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
154             List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3,
155             Class<T4> cls4) {
156         return connection.toFlowable()
157                 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
158                         (stmt, parameters) -> createWithFourParameters(stmt, parameters, parameterPlaceholders, cls1,
159                                 cls2, cls3, cls4)));
160     }
161 
162     private static <T1, T2, T3, T4> Single<Tuple4<T1, T2, T3, T4>> createWithFourParameters(NamedCallableStatement stmt,
163             List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
164             Class<T3> cls3, Class<T4> cls4) {
165         return Single.fromCallable(() -> {
166             CallableStatement st = stmt.stmt;
167             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 4, st);
168             T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
169             T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
170             T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
171             T4 o4 = Util.mapObject(st, cls4, outs.get(3).pos, outs.get(3).type);
172             return Tuple4.create(o1, o2, o3, o4);
173         });
174     }
175 
176     /////////////////////////
177     // N Out Parameters
178     /////////////////////////
179 
180     static Flowable<Notification<TupleN<Object>>> createWithNParameters( //
181             Single<Connection> connection, //
182             String sql, //
183             Flowable<List<Object>> parameterGroups, //
184             List<ParameterPlaceholder> parameterPlaceholders, //
185             List<Class<?>> outClasses) {
186         return connection //
187                 .toFlowable() //
188                 .flatMap( //
189                         con -> createWithParameters( //
190                                 con, //
191                                 sql, //
192                                 parameterGroups, //
193                                 parameterPlaceholders, //
194                                 (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders,
195                                         outClasses)));
196     }
197 
198     private static Single<TupleN<Object>> createWithNParameters( //
199             NamedCallableStatement stmt, //
200             List<Object> parameters, //
201             List<ParameterPlaceholder> parameterPlaceholders, //
202             List<Class<?>> outClasses) {
203         return Single.fromCallable(() -> {
204             CallableStatement st = stmt.stmt;
205             List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, st);
206             Object[] outputs = new Object[outClasses.size()];
207             for (int i = 0; i < outClasses.size(); i++) {
208                 outputs[i] = Util.mapObject(st, outClasses.get(i), outs.get(i).pos, outs.get(i).type);
209             }
210             return TupleN.create(outputs);
211         });
212     }
213 
214     /////////////////////////
215     // One ResultSet
216     /////////////////////////
217 
218     static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Single<Connection> connection,
219             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
220             Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
221         return connection.toFlowable().flatMap(
222                 con -> createWithOneResultSet(con, sql, parameterGroups, parameterPlaceholders, f1, fetchSize));
223     }
224 
225     private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
226             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
227             Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
228         log.debug("Update.create {}", sql);
229         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
230         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory = //
231                 stmt -> parameterGroups //
232                         .flatMap(parameters -> {
233                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
234                                     parameters);
235                             Flowable<T1> flowable1 = createFlowable(stmt, f1);
236                             return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable();
237                         }) //
238                         .materialize() //
239                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
240                         .doOnError(e -> Util.rollback(stmt.stmt));
241         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
242         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
243     }
244 
245     /////////////////////////
246     // Two ResultSets
247     /////////////////////////
248 
249     static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(
250             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
251             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
252             Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
253         return connection.toFlowable().flatMap(
254                 con -> createWithTwoResultSets(con, sql, parameterGroups, parameterPlaceholders, f1, f2, fetchSize));
255     }
256 
257     private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
258             String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
259             Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
260         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
261         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory = //
262                 stmt -> parameterGroups //
263                         .flatMap(parameters -> {
264                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
265                                     parameters);
266                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
267                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
268                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
269                             return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2))
270                                     .toFlowable();
271                         }) //
272                         .materialize() //
273                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
274                         .doOnError(e -> Util.rollback(stmt.stmt));
275         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
276         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
277     }
278 
279     /////////////////////////
280     // Three ResultSets
281     /////////////////////////
282 
283     static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
284             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
285             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
286             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
287         return connection.toFlowable().flatMap(con -> createWithThreeResultSets(con, sql, parameterGroups,
288                 parameterPlaceholders, f1, f2, f3, fetchSize));
289     }
290 
291     private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
292             Connection con, String sql, Flowable<List<Object>> parameterGroups,
293             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
294             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
295         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
296         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory = //
297                 stmt -> parameterGroups //
298                         .flatMap(parameters -> {
299                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
300                                     parameters);
301                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
302                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
303                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
304                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
305                             final Flowable<T3> flowable3 = createFlowable(stmt, f3);
306                             return Single.just(
307                                     new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3))
308                                     .toFlowable();
309                         }) //
310                         .materialize() //
311                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
312                         .doOnError(e -> Util.rollback(stmt.stmt));
313         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
314         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
315     }
316     /////////////////////////
317     // Four ResultSets
318     /////////////////////////
319 
320     static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
321             Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
322             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
323             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
324             Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
325         return connection.toFlowable().flatMap(con -> createWithFourResultSets(con, sql, parameterGroups,
326                 parameterPlaceholders, f1, f2, f3, f4, fetchSize));
327     }
328 
329     private static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
330             Connection con, String sql, Flowable<List<Object>> parameterGroups,
331             List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
332             Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
333             Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
334         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
335         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>>> flowableFactory = //
336                 stmt -> parameterGroups //
337                         .flatMap(parameters -> {
338                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
339                                     parameters);
340                             final Flowable<T1> flowable1 = createFlowable(stmt, f1);
341                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
342                             final Flowable<T2> flowable2 = createFlowable(stmt, f2);
343                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
344                             final Flowable<T3> flowable3 = createFlowable(stmt, f3);
345                             stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
346                             final Flowable<T4> flowable4 = createFlowable(stmt, f4);
347                             return Single.just(new CallableResultSet4<T1, T2, T3, T4>(outputValues, flowable1,
348                                     flowable2, flowable3, flowable4)).toFlowable();
349                         }) //
350                         .materialize() //
351                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
352                         .doOnError(e -> Util.rollback(stmt.stmt));
353         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
354         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
355     }
356 
357     /////////////////////////
358     // N ResultSets
359     /////////////////////////
360 
361     static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Single<Connection> connection, String sql,
362             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
363             List<Function<? super ResultSet, ?>> functions, int fetchSize) {
364         return connection.toFlowable().flatMap(
365                 con -> createWithNResultSets(con, sql, parameterGroups, parameterPlaceholders, functions, fetchSize));
366     }
367 
368     private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
369             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
370             List<Function<? super ResultSet, ?>> functions, int fetchSize) {
371         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
372         final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory = //
373                 stmt -> parameterGroups //
374                         .flatMap(parameters -> {
375                             List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
376                                     parameters);
377                             List<Flowable<?>> flowables = Lists.newArrayList();
378                             int i = 0;
379                             do {
380                                 Function<? super ResultSet, ?> f = functions.get(i);
381                                 flowables.add(createFlowable(stmt, f));
382                                 i++;
383                             } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT));
384                             return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable();
385                         }) //
386                         .materialize() //
387                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
388                         .doOnError(e -> Util.rollback(stmt.stmt));
389         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
390         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
391     }
392 
393     ////////////////////////////////////
394     // Utility Methods
395     ///////////////////////////////////
396 
397     private static <T> Flowable<Notification<T>> createWithParameters(Connection con, String sql,
398             Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
399             BiFunction<NamedCallableStatement, List<Object>, Single<T>> single) {
400         Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
401         final Function<NamedCallableStatement, Flowable<Notification<T>>> flowableFactory = //
402                 stmt -> parameterGroups //
403                         .flatMap(parameters -> single.apply(stmt, parameters).toFlowable()) //
404                         .materialize() //
405                         .doOnComplete(() -> Util.commit(stmt.stmt)) //
406                         .doOnError(e -> Util.rollback(stmt.stmt));
407         Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
408         return Flowable.using(resourceFactory, flowableFactory, disposer, true);
409     }
410 
411     static PreparedStatement setParameters(PreparedStatement ps, List<Object> parameters,
412             List<ParameterPlaceholder> parameterPlaceholders, List<String> names) throws SQLException {
413         // TODO handle Parameter objects (named)
414         if (names.isEmpty()) {
415             int i = 0;
416             for (int j = 0; j < parameterPlaceholders.size() && i < parameters.size(); j++) {
417                 ParameterPlaceholder p = parameterPlaceholders.get(j);
418                 if (p instanceof InParameterPlaceholder) {
419                     Util.setParameter(ps, j + 1, parameters.get(i));
420                     i++;
421                 }
422             }
423         } else {
424             // TODO
425             throw new RuntimeException("named paramters not implemented yet for CallableStatement yet");
426             // Util.setNamedParameters(ps, params, names);
427         }
428         return ps;
429     }
430 
431     private static List<PlaceAndType> execute(NamedCallableStatement stmt, List<Object> parameters,
432             List<ParameterPlaceholder> parameterPlaceholders, int outCount, CallableStatement st) throws SQLException {
433         Util.incrementCounter(st.getConnection());
434         setParameters(st, parameters, parameterPlaceholders, stmt.names);
435         int initialSize = outCount == Integer.MAX_VALUE ? 16 : outCount;
436         List<PlaceAndType> outs = new ArrayList<PlaceAndType>(initialSize);
437         for (int j = 0; j < parameterPlaceholders.size(); j++) {
438             ParameterPlaceholder p = parameterPlaceholders.get(j);
439             if (p instanceof OutParameterPlaceholder) {
440                 outs.add(new PlaceAndType(j + 1, ((OutParameterPlaceholder) p).type()));
441                 if (outs.size() == outCount) {
442                     break;
443                 }
444             }
445         }
446         st.execute();
447         return outs;
448     }
449 
450     private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
451             Function<? super ResultSet, ? extends T> f) throws SQLException {
452         ResultSet rsActual = stmt.stmt.getResultSet();
453         Callable<ResultSet> initialState = () -> rsActual;
454         BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
455             log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
456             if (rs.next()) {
457                 T v = f.apply(rs);
458                 log.debug("emitting {}", v);
459                 emitter.onNext(v);
460             } else {
461                 log.debug("completed");
462                 emitter.onComplete();
463             }
464         };
465         Consumer<ResultSet> disposeState = Util::closeSilently;
466         return Flowable.generate(initialState, generator, disposeState);
467     }
468 
469     private static List<Object> executeAndReturnOutputValues(List<ParameterPlaceholder> parameterPlaceholders,
470             NamedCallableStatement stmt, List<Object> parameters) throws SQLException {
471         List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, stmt.stmt);
472         List<Object> list = new ArrayList<>(outs.size());
473         for (PlaceAndType p : outs) {
474             // TODO convert to a desired return type?
475             list.add(stmt.stmt.getObject(p.pos));
476         }
477         return list;
478     }
479 
480 }