View Javadoc
1   package org.davidmoten.rx.jdbc;
2   
3   import java.sql.Connection;
4   import java.util.List;
5   import java.util.concurrent.atomic.AtomicReference;
6   
7   import javax.annotation.Nonnull;
8   
9   import io.reactivex.Flowable;
10  import io.reactivex.Single;
11  
12  public final class TransactedSelectBuilder implements DependsOn<TransactedSelectBuilder>, GetterTx {
13  
14      private final SelectBuilder selectBuilder;
15  
16      private boolean valuesOnly = false;
17  
18      private final Database db;
19  
20      TransactedSelectBuilder(SelectBuilder selectBuilder, Database db) {
21          this.selectBuilder = selectBuilder;
22          this.db = db;
23      }
24  
25      public TransactedSelectBuilder parameters(@Nonnull Flowable<List<Object>> parameters) {
26          selectBuilder.parameters(parameters);
27          return this;
28      }
29  
30      public TransactedSelectBuilder parameters(@Nonnull List<?> values) {
31          selectBuilder.parameters(values);
32          return this;
33      }
34  
35      public TransactedSelectBuilder parameter(@Nonnull String name, Object value) {
36          selectBuilder.parameter(name, value);
37          return this;
38      }
39  
40      public TransactedSelectBuilder parameters(@Nonnull Object... values) {
41          selectBuilder.parameters(values);
42          return this;
43      }
44  
45      public TransactedSelectBuilder parameter(Object value) {
46          return parameters(value);
47      }
48  
49      public TransactedSelectBuilder fetchSize(int size) {
50          selectBuilder.fetchSize(size);
51          return this;
52      }
53  
54      public TransactedSelectBuilder transactedValuesOnly() {
55          this.valuesOnly = true;
56          return this;
57      }
58  
59      @Override
60      public TransactedSelectBuilder dependsOn(@Nonnull Flowable<?> flowable) {
61          selectBuilder.dependsOn(flowable);
62          return this;
63      }
64  
65      public TransactedSelectBuilderValuesOnly valuesOnly() {
66          return new TransactedSelectBuilderValuesOnly(this, db);
67      }
68  
69      public static final class TransactedSelectBuilderValuesOnly implements Getter {
70          private final TransactedSelectBuilder b;
71          private final Database db;
72  
73          TransactedSelectBuilderValuesOnly(TransactedSelectBuilder b, Database db) {
74              this.b = b;
75              this.db = db;
76          }
77  
78          public <T> Flowable<T> get(@Nonnull ResultSetMapper<? extends T> function) {
79              return createFlowable(b.selectBuilder, function, db) //
80                      .flatMap(Tx.flattenToValuesOnly());
81          }
82  
83      }
84  
85      @Override
86      public <T> Flowable<Tx<T>> get(ResultSetMapper<? extends T> function) {
87          Flowable<Tx<T>> o = createFlowable(selectBuilder, function, db);
88          if (valuesOnly) {
89              return o.filter(tx -> tx.isValue());
90          } else {
91              return o;
92          }
93      }
94  
95      @SuppressWarnings("unchecked")
96      private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
97              ResultSetMapper<? extends T> mapper, Database db) {
98          return (Flowable<Tx<T>>) (Flowable<?>) Flowable.defer(() -> {
99              //Select.<T>create(connection, pg, sql, fetchSize, mapper, true);
100             AtomicReference<Connection> connection = new AtomicReference<Connection>();
101             Single<Connection> con = sb.connection //
102                     .map(c -> Util.toTransactedConnection(connection, c));
103             return Select.create(con, //
104                     sb.parameterGroupsToFlowable(), //
105                     sb.sql, //
106                     sb.fetchSize, //
107                     mapper, //
108                     false, //
109                     sb.queryTimeoutSec) //
110                     .materialize() //
111                     .flatMap(n -> Tx.toTx(n, connection.get(), db)) //
112                     .doOnNext(tx -> {
113                         if (tx.isComplete()) {
114                             ((TxImpl<T>) tx).connection().commit();
115                         }
116                     });
117         });
118     }
119 
120 }