1 package org.davidmoten.rx.jdbc;
2
3 import java.sql.Connection;
4 import java.util.List;
5 import java.util.concurrent.atomic.AtomicReference;
6
7 import javax.annotation.Nonnull;
8
9 import io.reactivex.Flowable;
10 import io.reactivex.Single;
11
12 public final class TransactedSelectBuilder implements DependsOn<TransactedSelectBuilder>, GetterTx {
13
14 private final SelectBuilder selectBuilder;
15
16 private boolean valuesOnly = false;
17
18 private final Database db;
19
20 TransactedSelectBuilder(SelectBuilder selectBuilder, Database db) {
21 this.selectBuilder = selectBuilder;
22 this.db = db;
23 }
24
25 public TransactedSelectBuilder parameters(@Nonnull Flowable<List<Object>> parameters) {
26 selectBuilder.parameters(parameters);
27 return this;
28 }
29
30 public TransactedSelectBuilder parameters(@Nonnull List<?> values) {
31 selectBuilder.parameters(values);
32 return this;
33 }
34
35 public TransactedSelectBuilder parameter(@Nonnull String name, Object value) {
36 selectBuilder.parameter(name, value);
37 return this;
38 }
39
40 public TransactedSelectBuilder parameters(@Nonnull Object... values) {
41 selectBuilder.parameters(values);
42 return this;
43 }
44
45 public TransactedSelectBuilder parameter(Object value) {
46 return parameters(value);
47 }
48
49 public TransactedSelectBuilder fetchSize(int size) {
50 selectBuilder.fetchSize(size);
51 return this;
52 }
53
54 public TransactedSelectBuilder transactedValuesOnly() {
55 this.valuesOnly = true;
56 return this;
57 }
58
59 @Override
60 public TransactedSelectBuilder dependsOn(@Nonnull Flowable<?> flowable) {
61 selectBuilder.dependsOn(flowable);
62 return this;
63 }
64
65 public TransactedSelectBuilderValuesOnly valuesOnly() {
66 return new TransactedSelectBuilderValuesOnly(this, db);
67 }
68
69 public static final class TransactedSelectBuilderValuesOnly implements Getter {
70 private final TransactedSelectBuilder b;
71 private final Database db;
72
73 TransactedSelectBuilderValuesOnly(TransactedSelectBuilder b, Database db) {
74 this.b = b;
75 this.db = db;
76 }
77
78 public <T> Flowable<T> get(@Nonnull ResultSetMapper<? extends T> function) {
79 return createFlowable(b.selectBuilder, function, db)
80 .flatMap(Tx.flattenToValuesOnly());
81 }
82
83 }
84
85 @Override
86 public <T> Flowable<Tx<T>> get(ResultSetMapper<? extends T> function) {
87 Flowable<Tx<T>> o = createFlowable(selectBuilder, function, db);
88 if (valuesOnly) {
89 return o.filter(tx -> tx.isValue());
90 } else {
91 return o;
92 }
93 }
94
95 @SuppressWarnings("unchecked")
96 private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
97 ResultSetMapper<? extends T> mapper, Database db) {
98 return (Flowable<Tx<T>>) (Flowable<?>) Flowable.defer(() -> {
99
100 AtomicReference<Connection> connection = new AtomicReference<Connection>();
101 Single<Connection> con = sb.connection
102 .map(c -> Util.toTransactedConnection(connection, c));
103 return Select.create(con,
104 sb.parameterGroupsToFlowable(),
105 sb.sql,
106 sb.fetchSize,
107 mapper,
108 false,
109 sb.queryTimeoutSec)
110 .materialize()
111 .flatMap(n -> Tx.toTx(n, connection.get(), db))
112 .doOnNext(tx -> {
113 if (tx.isComplete()) {
114 ((TxImpl<T>) tx).connection().commit();
115 }
116 });
117 });
118 }
119
120 }