View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.Connection;
4   import java.sql.ResultSet;
5   import java.util.List;
6   import java.util.concurrent.atomic.AtomicReference;
7   
8   import javax.annotation.Nonnull;
9   
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import io.reactivex.Flowable;
14  import io.reactivex.Single;
15  
16  public final class TransactedUpdateBuilder implements DependsOn<TransactedUpdateBuilder> {
17  
18      private static final Logger log = LoggerFactory.getLogger(TransactedUpdateBuilder.class);
19  
20      final UpdateBuilder updateBuilder;
21      private final Database db;
22      private boolean valuesOnly;
23  
24      TransactedUpdateBuilder(UpdateBuilder b, Database db) {
25          this.updateBuilder = b;
26          this.db = db;
27      }
28  
29      public TransactedUpdateBuilder parameterStream(@Nonnull Flowable<?> values) {
30          updateBuilder.parameterStream(values);
31          return this;
32      }
33  
34      public TransactedUpdateBuilder parameterListStream(@Nonnull Flowable<List<?>> valueLists) {
35          updateBuilder.parameterListStream(valueLists);
36          return this;
37      }
38  
39      public TransactedUpdateBuilder parameters(@Nonnull List<?> values) {
40          updateBuilder.parameters(values);
41          return this;
42      }
43  
44      public TransactedUpdateBuilder parameter(@Nonnull String name, Object value) {
45          updateBuilder.parameter(name, value);
46          return this;
47      }
48  
49      public TransactedUpdateBuilder parameter(Object value) {
50          return parameters(value);
51      }
52  
53      public TransactedUpdateBuilder parameters(@Nonnull Object... values) {
54          updateBuilder.parameters(values);
55          return this;
56      }
57  
58      @Override
59      public TransactedUpdateBuilder dependsOn(@Nonnull Flowable<?> dependency) {
60          updateBuilder.dependsOn(dependency);
61          return this;
62      }
63  
64      public TransactedUpdateBuilder batchSize(int batchSize) {
65          updateBuilder.batchSize(batchSize);
66          return this;
67      }
68  
69      /**
70       * Returns a builder used to specify how to process the generated keys
71       * {@link ResultSet}. Not all jdbc drivers support this functionality and some
72       * have limitations in their support (h2 for instance only returns the last
73       * generated key when multiple inserts happen in the one statement).
74       * 
75       * @return a builder used to specify how to process the generated keys ResultSet
76       */
77      public TransactedReturnGeneratedKeysBuilder returnGeneratedKeys() {
78          return new TransactedReturnGeneratedKeysBuilder(this, db);
79      }
80  
81      public TransactedUpdateBuilder transactedValuesOnly() {
82          this.valuesOnly = true;
83          return this;
84      }
85  
86      public TransactedUpdateBuilderValuesOnly valuesOnly() {
87          return new TransactedUpdateBuilderValuesOnly(this, db);
88      }
89  
90      public static final class TransactedUpdateBuilderValuesOnly {
91          private final TransactedUpdateBuilder b;
92          private final Database db;
93  
94          TransactedUpdateBuilderValuesOnly(TransactedUpdateBuilder b, Database db) {
95              this.b = b;
96              this.db = db;
97          }
98  
99          // TODO add other methods e.g. parameter setting methods? Lots of
100         // copy-and-paste not attractive here so may accept restricting
101         // functionality once valuesOnly() called
102 
103         public Flowable<Integer> counts() {
104             return createFlowable(b.updateBuilder, db) //
105                     .flatMap(Tx.flattenToValuesOnly());
106         }
107     }
108 
109     public Flowable<Tx<Integer>> counts() {
110         Flowable<Tx<Integer>> o = createFlowable(updateBuilder, db);
111         if (valuesOnly) {
112             return o.filter(tx -> tx.isValue());
113         } else {
114             return o;
115         }
116     }
117 
118     public Flowable<Integer> countsOnly() {
119         return valuesOnly().counts();
120     }
121 
122     @SuppressWarnings("unchecked")
123     public Flowable<Tx<?>> tx() {
124         return (Flowable<Tx<?>>) (Flowable<?>) createFlowable(updateBuilder, db) //
125                 .filter(x -> x.isValue());
126     }
127 
128     private static Flowable<Tx<Integer>> createFlowable(UpdateBuilder ub, Database db) {
129         return Flowable.defer(() -> {
130             log.debug("creating deferred flowable");
131             AtomicReference<Connection> connection = new AtomicReference<Connection>();
132             Single<Connection> con = ub.connections //
133                     .map(c -> Util.toTransactedConnection(connection, c));
134             TxImpl<?>[] t = new TxImpl[1];
135             return ub.startWithDependency( //
136                     Update.create(con, //
137                             ub.parameterGroupsToFlowable(), //
138                             ub.sql, //
139                             ub.batchSize, //
140                             false, //
141                             ub.queryTimeoutSec) //
142                             .flatMap(n -> Tx.toTx(n, connection.get(), db)) //
143                             .doOnNext(tx -> {
144                                 t[0] = ((TxImpl<Integer>) tx);
145                             }) //
146                             .doOnComplete(() -> {
147                                 TxImpl<?> tx = t[0];
148                                 if (tx.isComplete()) {
149                                     tx.connection().commit();
150                                 }
151                                 Util.closeSilently(tx.connection());
152                             }));
153         });
154     }
155 
156     public Flowable<List<Object>> parameterGroupsToFlowable() {
157         return updateBuilder.parameterGroupsToFlowable();
158     }
159 }