View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import java.sql.ResultSet;
4   import java.sql.SQLException;
5   import java.util.List;
6   
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   
10  import rx.Observable;
11  import rx.Observable.OnSubscribe;
12  import rx.Subscriber;
13  import rx.functions.Action0;
14  import rx.subscriptions.Subscriptions;
15  
16  /**
17   * OnSubscribe create method for a select query.
18   */
19  final class QuerySelectOnSubscribe<T> implements OnSubscribe<T> {
20  
21      private static final Logger log = LoggerFactory.getLogger(QuerySelectOnSubscribe.class);
22  
23      /**
24       * Returns an Observable of the results of pushing one set of parameters
25       * through a select query.
26       * 
27       * @param params
28       *            one set of parameters to be run with the query
29       * @return
30       */
31      static <T> Observable<T> execute(QuerySelect query, List<Parameter> parameters,
32              ResultSetMapper<? extends T> function) {
33          return Observable.create(new QuerySelectOnSubscribe<T>(query, parameters, function));
34      }
35  
36      private final ResultSetMapper<? extends T> function;
37      private final QuerySelect query;
38      private final List<Parameter> parameters;
39      private final boolean stateProvided;
40  
41      /**
42       * Constructor.
43       * 
44       * @param query
45       * @param parameters
46       */
47      private QuerySelectOnSubscribe(QuerySelect query, List<Parameter> parameters,
48              ResultSetMapper<? extends T> function) {
49          this.query = query;
50          this.parameters = parameters;
51          this.function = function;
52          this.stateProvided = query.sql().equals(QuerySelect.RETURN_GENERATED_KEYS);
53      }
54  
55      @Override
56      public void call(Subscriber<? super T> subscriber) {
57          State state = null;
58          try {
59              if (stateProvided) {
60                  state = (State) parameters.get(0).value();
61                  setupUnsubscription(subscriber, state);
62              } else {
63                  state = new State();
64                  connectAndPrepareStatement(subscriber, state);
65                  setupUnsubscription(subscriber, state);
66                  executeQuery(subscriber, state);
67              }
68              subscriber.setProducer(new QuerySelectProducer<T>(function, subscriber, state.con,
69                      state.ps, state.rs));
70          } catch (Exception e) {
71              query.context().endTransactionObserve();
72              query.context().endTransactionSubscribe();
73              try {
74                  if (state != null)
75                      closeQuietly(state);
76              } finally {
77                  handleException(e, subscriber);
78              }
79          }
80      }
81  
82      private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
83          subscriber.add(Subscriptions.create(new Action0() {
84              @Override
85              public void call() {
86                  closeQuietly(state);
87              }
88          }));
89      }
90  
91      /**
92       * Obtains connection, creates prepared statement and assigns parameters to
93       * the prepared statement.
94       * 
95       * @param subscriber
96       * @param state
97       * 
98       * @throws SQLException
99       */
100     private void connectAndPrepareStatement(Subscriber<? super T> subscriber, State state)
101             throws SQLException {
102         log.debug("connectionProvider={}", query.context().connectionProvider());
103         if (!subscriber.isUnsubscribed()) {
104             log.debug("getting connection");
105             state.con = query.context().connectionProvider().get();
106             log.debug("preparing statement,sql={}", query.sql());
107             state.ps = state.con.prepareStatement(query.sql(),ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
108             log.debug("setting parameters");
109             Util.setParameters(state.ps, parameters, query.names());
110         }
111     }
112 
113     
114 
115     /**
116      * Executes the prepared statement.
117      * 
118      * @param subscriber
119      * @param state
120      * 
121      * @throws SQLException
122      */
123     private void executeQuery(Subscriber<? super T> subscriber, State state) throws SQLException {
124         if (!subscriber.isUnsubscribed()) {
125             try {
126                 log.debug("executing ps");
127                 state.rs = state.ps.executeQuery();
128                 log.debug("executed ps={}", state.ps);
129             } catch (SQLException e) {
130                 throw new SQLException("failed to run sql=" + query.sql(), e);
131             }
132         }
133     }
134 
135     /**
136      * Tells observer about exception.
137      * 
138      * @param e
139      * @param subscriber
140      */
141     private void handleException(Exception e, Subscriber<? super T> subscriber) {
142         log.debug("onError: " + e.getMessage());
143         if (subscriber.isUnsubscribed())
144             log.debug("unsubscribed");
145         else {
146             subscriber.onError(e);
147         }
148     }
149 
150     /**
151      * Closes connection resources (connection, prepared statement and result
152      * set).
153      * 
154      * @param state
155      */
156     private static void closeQuietly(State state) {
157         // ensure only closed once and avoid race conditions
158         if (state.closed.compareAndSet(false, true)) {
159             // set the state fields to null after closing for garbage
160             // collection purposes
161             log.debug("closing rs");
162             Util.closeQuietly(state.rs);
163             log.debug("closing ps");
164             Util.closeQuietly(state.ps);
165             log.debug("closing con");
166             Util.closeQuietlyIfAutoCommit(state.con);
167             log.debug("closed");
168         }
169     }
170 
171 }