1 package org.davidmoten.rx.jdbc;
2
3 import java.sql.Connection;
4 import java.sql.ResultSet;
5 import java.util.List;
6 import java.util.concurrent.atomic.AtomicReference;
7
8 import javax.annotation.Nonnull;
9
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import io.reactivex.Flowable;
14 import io.reactivex.Single;
15
16 public final class TransactedUpdateBuilder implements DependsOn<TransactedUpdateBuilder> {
17
18 private static final Logger log = LoggerFactory.getLogger(TransactedUpdateBuilder.class);
19
20 final UpdateBuilder updateBuilder;
21 private final Database db;
22 private boolean valuesOnly;
23
24 TransactedUpdateBuilder(UpdateBuilder b, Database db) {
25 this.updateBuilder = b;
26 this.db = db;
27 }
28
29 public TransactedUpdateBuilder parameterStream(@Nonnull Flowable<?> values) {
30 updateBuilder.parameterStream(values);
31 return this;
32 }
33
34 public TransactedUpdateBuilder parameterListStream(@Nonnull Flowable<List<?>> valueLists) {
35 updateBuilder.parameterListStream(valueLists);
36 return this;
37 }
38
39 public TransactedUpdateBuilder parameters(@Nonnull List<?> values) {
40 updateBuilder.parameters(values);
41 return this;
42 }
43
44 public TransactedUpdateBuilder parameter(@Nonnull String name, Object value) {
45 updateBuilder.parameter(name, value);
46 return this;
47 }
48
49 public TransactedUpdateBuilder parameter(Object value) {
50 return parameters(value);
51 }
52
53 public TransactedUpdateBuilder parameters(@Nonnull Object... values) {
54 updateBuilder.parameters(values);
55 return this;
56 }
57
58 @Override
59 public TransactedUpdateBuilder dependsOn(@Nonnull Flowable<?> dependency) {
60 updateBuilder.dependsOn(dependency);
61 return this;
62 }
63
64 public TransactedUpdateBuilder batchSize(int batchSize) {
65 updateBuilder.batchSize(batchSize);
66 return this;
67 }
68
69
70
71
72
73
74
75
76
77 public TransactedReturnGeneratedKeysBuilder returnGeneratedKeys() {
78 return new TransactedReturnGeneratedKeysBuilder(this, db);
79 }
80
81 public TransactedUpdateBuilder transactedValuesOnly() {
82 this.valuesOnly = true;
83 return this;
84 }
85
86 public TransactedUpdateBuilderValuesOnly valuesOnly() {
87 return new TransactedUpdateBuilderValuesOnly(this, db);
88 }
89
90 public static final class TransactedUpdateBuilderValuesOnly {
91 private final TransactedUpdateBuilder b;
92 private final Database db;
93
94 TransactedUpdateBuilderValuesOnly(TransactedUpdateBuilder b, Database db) {
95 this.b = b;
96 this.db = db;
97 }
98
99
100
101
102
103 public Flowable<Integer> counts() {
104 return createFlowable(b.updateBuilder, db)
105 .flatMap(Tx.flattenToValuesOnly());
106 }
107 }
108
109 public Flowable<Tx<Integer>> counts() {
110 Flowable<Tx<Integer>> o = createFlowable(updateBuilder, db);
111 if (valuesOnly) {
112 return o.filter(tx -> tx.isValue());
113 } else {
114 return o;
115 }
116 }
117
118 public Flowable<Integer> countsOnly() {
119 return valuesOnly().counts();
120 }
121
122 @SuppressWarnings("unchecked")
123 public Flowable<Tx<?>> tx() {
124 return (Flowable<Tx<?>>) (Flowable<?>) createFlowable(updateBuilder, db)
125 .filter(x -> x.isValue());
126 }
127
128 private static Flowable<Tx<Integer>> createFlowable(UpdateBuilder ub, Database db) {
129 return Flowable.defer(() -> {
130 log.debug("creating deferred flowable");
131 AtomicReference<Connection> connection = new AtomicReference<Connection>();
132 Single<Connection> con = ub.connections
133 .map(c -> Util.toTransactedConnection(connection, c));
134 TxImpl<?>[] t = new TxImpl[1];
135 return ub.startWithDependency(
136 Update.create(con,
137 ub.parameterGroupsToFlowable(),
138 ub.sql,
139 ub.batchSize,
140 false,
141 ub.queryTimeoutSec)
142 .flatMap(n -> Tx.toTx(n, connection.get(), db))
143 .doOnNext(tx -> {
144 t[0] = ((TxImpl<Integer>) tx);
145 })
146 .doOnComplete(() -> {
147 TxImpl<?> tx = t[0];
148 if (tx.isComplete()) {
149 tx.connection().commit();
150 }
151 Util.closeSilently(tx.connection());
152 }));
153 });
154 }
155
156 public Flowable<List<Object>> parameterGroupsToFlowable() {
157 return updateBuilder.parameterGroupsToFlowable();
158 }
159 }