1 package org.davidmoten.rxjava3.jdbc;
2
3 import java.sql.CallableStatement;
4 import java.sql.Connection;
5 import java.sql.PreparedStatement;
6 import java.sql.ResultSet;
7 import java.sql.SQLException;
8 import java.sql.Statement;
9 import java.util.ArrayList;
10 import java.util.List;
11
12 import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet1;
13 import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet2;
14 import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet3;
15 import org.davidmoten.rxjava3.jdbc.callable.CallableResultSet4;
16 import org.davidmoten.rxjava3.jdbc.callable.CallableResultSetN;
17 import org.davidmoten.rxjava3.jdbc.callable.internal.InParameterPlaceholder;
18 import org.davidmoten.rxjava3.jdbc.callable.internal.OutParameterPlaceholder;
19 import org.davidmoten.rxjava3.jdbc.callable.internal.ParameterPlaceholder;
20 import org.davidmoten.rxjava3.jdbc.internal.Functions;
21 import org.davidmoten.rxjava3.jdbc.tuple.Tuple2;
22 import org.davidmoten.rxjava3.jdbc.tuple.Tuple3;
23 import org.davidmoten.rxjava3.jdbc.tuple.Tuple4;
24 import org.davidmoten.rxjava3.jdbc.tuple.TupleN;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.github.davidmoten.guavamini.Lists;
29
30 import io.reactivex.rxjava3.core.Emitter;
31 import io.reactivex.rxjava3.core.Flowable;
32 import io.reactivex.rxjava3.core.Notification;
33 import io.reactivex.rxjava3.core.Single;
34 import io.reactivex.rxjava3.functions.BiConsumer;
35 import io.reactivex.rxjava3.functions.BiFunction;
36 import io.reactivex.rxjava3.functions.Consumer;
37 import io.reactivex.rxjava3.functions.Function;
38 import io.reactivex.rxjava3.functions.Supplier;
39
40 final class Call {
41
42 private static final Logger log = LoggerFactory.getLogger(Call.class);
43
44 private Call() {
45
46 }
47
48
49
50
51
52 static Flowable<Integer> createWithZeroOutParameters(Single<Connection> connection, String sql,
53 Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders) {
54 return connection.toFlowable()
55 .flatMap(con -> Call.<Integer>createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
56 (stmt, parameters) -> createWithZeroOutParameters(stmt, parameters, parameterPlaceholders)))
57 .dematerialize(Functions.identity());
58 }
59
60 private static Single<Integer> createWithZeroOutParameters(NamedCallableStatement stmt, List<Object> parameters,
61 List<ParameterPlaceholder> parameterPlaceholders) {
62 return Single.fromCallable(() -> {
63 CallableStatement st = stmt.stmt;
64 execute(stmt, parameters, parameterPlaceholders, 0, st);
65 return 1;
66 });
67 }
68
69
70
71
72
73 static <T1> Flowable<Notification<T1>> createWithOneOutParameter(Single<Connection> connection, String sql,
74 Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls) {
75 return connection.toFlowable()
76 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
77 (stmt, parameters) -> createWithOneParameter(stmt, parameters, parameterPlaceholders, cls)));
78 }
79
80 private static <T> Single<T> createWithOneParameter(NamedCallableStatement stmt, List<Object> parameters,
81 List<ParameterPlaceholder> parameterPlaceholders, Class<T> cls1) {
82 return Single.fromCallable(() -> {
83 CallableStatement st = stmt.stmt;
84 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
85 return Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
86 });
87 }
88
89
90
91
92
93 static <T1, T2> Flowable<Notification<Tuple2<T1, T2>>> createWithTwoOutParameters(Single<Connection> connection,
94 String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
95 Class<T1> cls1, Class<T2> cls2) {
96 return connection.toFlowable().flatMap(con -> createWithParameters(con, sql, parameterGroups,
97 parameterPlaceholders,
98 (stmt, parameters) -> createWithTwoParameters(stmt, parameters, parameterPlaceholders, cls1, cls2)));
99 }
100
101 private static <T1, T2> Single<Tuple2<T1, T2>> createWithTwoParameters(NamedCallableStatement stmt,
102 List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2) {
103 return Single.fromCallable(() -> {
104 CallableStatement st = stmt.stmt;
105 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 2, st);
106 T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
107 T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
108 return Tuple2.create(o1, o2);
109 });
110 }
111
112 private static final class PlaceAndType {
113 final int pos;
114 final Type type;
115
116 PlaceAndType(int pos, Type type) {
117 this.pos = pos;
118 this.type = type;
119 }
120
121 }
122
123
124
125
126
127 static <T1, T2, T3> Flowable<Notification<Tuple3<T1, T2, T3>>> createWithThreeOutParameters(
128 Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
129 List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3) {
130 return connection.toFlowable()
131 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
132 (stmt, parameters) -> createWithThreeParameters(stmt, parameters, parameterPlaceholders, cls1,
133 cls2, cls3)));
134 }
135
136 private static <T1, T2, T3> Single<Tuple3<T1, T2, T3>> createWithThreeParameters(NamedCallableStatement stmt,
137 List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
138 Class<T3> cls3) {
139 return Single.fromCallable(() -> {
140 CallableStatement st = stmt.stmt;
141 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 3, st);
142 T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
143 T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
144 T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
145 return Tuple3.create(o1, o2, o3);
146 });
147 }
148
149
150
151
152
153 static <T1, T2, T3, T4> Flowable<Notification<Tuple4<T1, T2, T3, T4>>> createWithFourOutParameters(
154 Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
155 List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2, Class<T3> cls3,
156 Class<T4> cls4) {
157 return connection.toFlowable()
158 .flatMap(con -> createWithParameters(con, sql, parameterGroups, parameterPlaceholders,
159 (stmt, parameters) -> createWithFourParameters(stmt, parameters, parameterPlaceholders, cls1,
160 cls2, cls3, cls4)));
161 }
162
163 private static <T1, T2, T3, T4> Single<Tuple4<T1, T2, T3, T4>> createWithFourParameters(NamedCallableStatement stmt,
164 List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
165 Class<T3> cls3, Class<T4> cls4) {
166 return Single.fromCallable(() -> {
167 CallableStatement st = stmt.stmt;
168 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, 4, st);
169 T1 o1 = Util.mapObject(st, cls1, outs.get(0).pos, outs.get(0).type);
170 T2 o2 = Util.mapObject(st, cls2, outs.get(1).pos, outs.get(1).type);
171 T3 o3 = Util.mapObject(st, cls3, outs.get(2).pos, outs.get(2).type);
172 T4 o4 = Util.mapObject(st, cls4, outs.get(3).pos, outs.get(3).type);
173 return Tuple4.create(o1, o2, o3, o4);
174 });
175 }
176
177
178
179
180
181 static Flowable<Notification<TupleN<Object>>> createWithNParameters(
182 Single<Connection> connection,
183 String sql,
184 Flowable<List<Object>> parameterGroups,
185 List<ParameterPlaceholder> parameterPlaceholders,
186 List<Class<?>> outClasses) {
187 return connection
188 .toFlowable()
189 .flatMap(
190 con -> createWithParameters(
191 con,
192 sql,
193 parameterGroups,
194 parameterPlaceholders,
195 (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders,
196 outClasses)));
197 }
198
199 private static Single<TupleN<Object>> createWithNParameters(
200 NamedCallableStatement stmt,
201 List<Object> parameters,
202 List<ParameterPlaceholder> parameterPlaceholders,
203 List<Class<?>> outClasses) {
204 return Single.fromCallable(() -> {
205 CallableStatement st = stmt.stmt;
206 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, st);
207 Object[] outputs = new Object[outClasses.size()];
208 for (int i = 0; i < outClasses.size(); i++) {
209 outputs[i] = Util.mapObject(st, outClasses.get(i), outs.get(i).pos, outs.get(i).type);
210 }
211 return TupleN.create(outputs);
212 });
213 }
214
215
216
217
218
219 static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Single<Connection> connection,
220 String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
221 Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
222 return connection.toFlowable().flatMap(
223 con -> createWithOneResultSet(con, sql, parameterGroups, parameterPlaceholders, f1, fetchSize));
224 }
225
226 private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
227 String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
228 Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
229 log.debug("Update.create {}", sql);
230 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
231 final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory =
232 stmt -> parameterGroups
233 .flatMap(parameters -> {
234 List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
235 parameters);
236 Flowable<T1> flowable1 = createFlowable(stmt, f1);
237 return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable();
238 })
239 .materialize()
240 .doOnComplete(() -> Util.commit(stmt.stmt))
241 .doOnError(e -> Util.rollback(stmt.stmt));
242 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
243 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
244 }
245
246
247
248
249
250 static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(
251 Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
252 List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
253 Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
254 return connection.toFlowable().flatMap(
255 con -> createWithTwoResultSets(con, sql, parameterGroups, parameterPlaceholders, f1, f2, fetchSize));
256 }
257
258 private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
259 String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
260 Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
261 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
262 final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory =
263 stmt -> parameterGroups
264 .flatMap(parameters -> {
265 List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
266 parameters);
267 final Flowable<T1> flowable1 = createFlowable(stmt, f1);
268 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
269 final Flowable<T2> flowable2 = createFlowable(stmt, f2);
270 return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2))
271 .toFlowable();
272 })
273 .materialize()
274 .doOnComplete(() -> Util.commit(stmt.stmt))
275 .doOnError(e -> Util.rollback(stmt.stmt));
276 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
277 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
278 }
279
280
281
282
283
284 static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
285 Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
286 List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
287 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
288 return connection.toFlowable().flatMap(con -> createWithThreeResultSets(con, sql, parameterGroups,
289 parameterPlaceholders, f1, f2, f3, fetchSize));
290 }
291
292 private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
293 Connection con, String sql, Flowable<List<Object>> parameterGroups,
294 List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
295 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
296 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
297 final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory =
298 stmt -> parameterGroups
299 .flatMap(parameters -> {
300 List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
301 parameters);
302 final Flowable<T1> flowable1 = createFlowable(stmt, f1);
303 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
304 final Flowable<T2> flowable2 = createFlowable(stmt, f2);
305 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
306 final Flowable<T3> flowable3 = createFlowable(stmt, f3);
307 return Single.just(
308 new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3))
309 .toFlowable();
310 })
311 .materialize()
312 .doOnComplete(() -> Util.commit(stmt.stmt))
313 .doOnError(e -> Util.rollback(stmt.stmt));
314 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
315 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
316 }
317
318
319
320
321 static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
322 Single<Connection> connection, String sql, Flowable<List<Object>> parameterGroups,
323 List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
324 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
325 Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
326 return connection.toFlowable().flatMap(con -> createWithFourResultSets(con, sql, parameterGroups,
327 parameterPlaceholders, f1, f2, f3, f4, fetchSize));
328 }
329
330 private static <T1, T2, T3, T4> Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>> createWithFourResultSets(
331 Connection con, String sql, Flowable<List<Object>> parameterGroups,
332 List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
333 Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3,
334 Function<? super ResultSet, ? extends T4> f4, int fetchSize) {
335 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
336 final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet4<T1, T2, T3, T4>>>> flowableFactory =
337 stmt -> parameterGroups
338 .flatMap(parameters -> {
339 List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
340 parameters);
341 final Flowable<T1> flowable1 = createFlowable(stmt, f1);
342 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
343 final Flowable<T2> flowable2 = createFlowable(stmt, f2);
344 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
345 final Flowable<T3> flowable3 = createFlowable(stmt, f3);
346 stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
347 final Flowable<T4> flowable4 = createFlowable(stmt, f4);
348 return Single.just(new CallableResultSet4<T1, T2, T3, T4>(outputValues, flowable1,
349 flowable2, flowable3, flowable4)).toFlowable();
350 })
351 .materialize()
352 .doOnComplete(() -> Util.commit(stmt.stmt))
353 .doOnError(e -> Util.rollback(stmt.stmt));
354 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
355 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
356 }
357
358
359
360
361
362 static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Single<Connection> connection, String sql,
363 Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
364 List<Function<? super ResultSet, ?>> functions, int fetchSize) {
365 return connection.toFlowable().flatMap(
366 con -> createWithNResultSets(con, sql, parameterGroups, parameterPlaceholders, functions, fetchSize));
367 }
368
369 private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
370 Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
371 List<Function<? super ResultSet, ?>> functions, int fetchSize) {
372 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, fetchSize, sql, parameterPlaceholders);
373 final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory =
374 stmt -> parameterGroups
375 .flatMap(parameters -> {
376 List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
377 parameters);
378 List<Flowable<?>> flowables = Lists.newArrayList();
379 int i = 0;
380 do {
381 Function<? super ResultSet, ?> f = functions.get(i);
382 flowables.add(createFlowable(stmt, f));
383 i++;
384 } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT));
385 return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable();
386 })
387 .materialize()
388 .doOnComplete(() -> Util.commit(stmt.stmt))
389 .doOnError(e -> Util.rollback(stmt.stmt));
390 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
391 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
392 }
393
394
395
396
397
398 private static <T> Flowable<Notification<T>> createWithParameters(Connection con, String sql,
399 Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
400 BiFunction<NamedCallableStatement, List<Object>, Single<T>> single) {
401 Supplier<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
402 final Function<NamedCallableStatement, Flowable<Notification<T>>> flowableFactory =
403 stmt -> parameterGroups
404 .flatMap(parameters -> single.apply(stmt, parameters).toFlowable())
405 .materialize()
406 .doOnComplete(() -> Util.commit(stmt.stmt))
407 .doOnError(e -> Util.rollback(stmt.stmt));
408 Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
409 return Flowable.using(resourceFactory, flowableFactory, disposer, true);
410 }
411
412 static PreparedStatement setParameters(PreparedStatement ps, List<Object> parameters,
413 List<ParameterPlaceholder> parameterPlaceholders, List<String> names) throws SQLException {
414
415 if (names.isEmpty()) {
416 int i = 0;
417 for (int j = 0; j < parameterPlaceholders.size() && i < parameters.size(); j++) {
418 ParameterPlaceholder p = parameterPlaceholders.get(j);
419 if (p instanceof InParameterPlaceholder) {
420 Util.setParameter(ps, j + 1, parameters.get(i));
421 i++;
422 }
423 }
424 } else {
425
426 throw new RuntimeException("named paramters not implemented yet for CallableStatement yet");
427
428 }
429 return ps;
430 }
431
432 private static List<PlaceAndType> execute(NamedCallableStatement stmt, List<Object> parameters,
433 List<ParameterPlaceholder> parameterPlaceholders, int outCount, CallableStatement st) throws SQLException {
434 Util.incrementCounter(st.getConnection());
435 setParameters(st, parameters, parameterPlaceholders, stmt.names);
436 int initialSize = outCount == Integer.MAX_VALUE ? 16 : outCount;
437 List<PlaceAndType> outs = new ArrayList<PlaceAndType>(initialSize);
438 for (int j = 0; j < parameterPlaceholders.size(); j++) {
439 ParameterPlaceholder p = parameterPlaceholders.get(j);
440 if (p instanceof OutParameterPlaceholder) {
441 outs.add(new PlaceAndType(j + 1, ((OutParameterPlaceholder) p).type()));
442 if (outs.size() == outCount) {
443 break;
444 }
445 }
446 }
447 st.execute();
448 return outs;
449 }
450
451 private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
452 Function<? super ResultSet, ? extends T> f) throws SQLException {
453 ResultSet rsActual = stmt.stmt.getResultSet();
454 Supplier<ResultSet> initialState = () -> rsActual;
455 BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
456 log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
457 if (rs.next()) {
458 T v = f.apply(rs);
459 log.debug("emitting {}", v);
460 emitter.onNext(v);
461 } else {
462 log.debug("completed");
463 emitter.onComplete();
464 }
465 };
466 Consumer<ResultSet> disposeState = Util::closeSilently;
467 return Flowable.generate(initialState, generator, disposeState);
468 }
469
470 private static List<Object> executeAndReturnOutputValues(List<ParameterPlaceholder> parameterPlaceholders,
471 NamedCallableStatement stmt, List<Object> parameters) throws SQLException {
472 List<PlaceAndType> outs = execute(stmt, parameters, parameterPlaceholders, Integer.MAX_VALUE, stmt.stmt);
473 List<Object> list = new ArrayList<>(outs.size());
474 for (PlaceAndType p : outs) {
475
476 list.add(stmt.stmt.getObject(p.pos));
477 }
478 return list;
479 }
480
481 }