View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import java.sql.Connection;
4   import java.sql.PreparedStatement;
5   import java.sql.ResultSet;
6   import java.sql.SQLException;
7   import java.util.concurrent.atomic.AtomicLong;
8   
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import rx.Producer;
13  import rx.Subscriber;
14  
15  import com.github.davidmoten.rx.RxUtil;
16  
17  class QuerySelectProducer<T> implements Producer {
18  
19      private static final Logger log = LoggerFactory.getLogger(QuerySelectProducer.class);
20  
21      private final ResultSetMapper<? extends T> function;
22      private final Subscriber<? super T> subscriber;
23      private final Connection con;
24      private final PreparedStatement ps;
25      private final ResultSet rs;
26      private volatile boolean keepGoing = true;
27  
28      private final AtomicLong requested = new AtomicLong(0);
29  
30      QuerySelectProducer(ResultSetMapper<? extends T> function, Subscriber<? super T> subscriber,
31              Connection con, PreparedStatement ps, ResultSet rs) {
32          this.function = function;
33          this.subscriber = subscriber;
34          this.con = con;
35          this.ps = ps;
36          this.rs = rs;
37      }
38  
39      @Override
40      public void request(long n) {
41          if (requested.get() == Long.MAX_VALUE)
42              // already started with fast path
43              return;
44          else if (n == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
45              requestAll();
46          } else if (n > 0) {
47              requestSome(n);
48          }
49      }
50  
51      private void requestAll() {
52          // fast path
53          try {
54              while (keepGoing) {
55                  processRow(subscriber);
56              }
57              closeQuietly();
58              complete(subscriber);
59          } catch (Exception e) {
60              closeAndHandleException(e);
61          }
62      }
63  
64      private void requestSome(long n) {
65          // back pressure path
66          // this algorithm copied generally from OnSubscribeFromIterable.java
67          long previousCount = RxUtil.getAndAddRequest(requested, n);
68          if (previousCount == 0) {
69              try {
70                  while (true) {
71                      long r = requested.get();
72                      long numToEmit = r;
73  
74                      while (keepGoing && --numToEmit >= 0) {
75                          processRow(subscriber);
76                      }
77                      if (keepGoing) {
78                          if (requested.addAndGet(-r) == 0) {
79                              return;
80                          }
81                      } else {
82                          closeQuietly();
83                          complete(subscriber);
84                          return;
85                      }
86                  }
87              } catch (Exception e) {
88                  closeAndHandleException(e);
89              }
90          }
91      }
92  
93      private void closeAndHandleException(Exception e) {
94          try {
95              closeQuietly();
96          } finally {
97              handleException(e, subscriber);
98          }
99      }
100 
101     /**
102      * Processes each row of the {@link ResultSet}.
103      * 
104      * @param subscriber
105      * 
106      * @throws SQLException
107      */
108     private void processRow(Subscriber<? super T> subscriber) throws SQLException {
109         checkSubscription(subscriber);
110         if (!keepGoing)
111             return;
112         if (rs.next()) {
113             log.trace("onNext");
114             subscriber.onNext(function.call(rs));
115         } else
116             keepGoing = false;
117     }
118 
119     /**
120      * Tells observer that stream is complete and closes resources.
121      * 
122      * @param subscriber
123      */
124     private void complete(Subscriber<? super T> subscriber) {
125         if (subscriber.isUnsubscribed()) {
126             log.debug("unsubscribed");
127         } else {
128             log.debug("onCompleted");
129             subscriber.onCompleted();
130         }
131     }
132 
133     /**
134      * Tells observer about exception.
135      * 
136      * @param e
137      * @param subscriber
138      */
139     private void handleException(Exception e, Subscriber<? super T> subscriber) {
140         log.debug("onError: " + e.getMessage());
141         if (subscriber.isUnsubscribed())
142             log.debug("unsubscribed");
143         else {
144             subscriber.onError(e);
145         }
146     }
147 
148     /**
149      * Closes connection resources (connection, prepared statement and result
150      * set).
151      */
152     private void closeQuietly() {
153         log.debug("closing rs");
154         Util.closeQuietly(rs);
155         log.debug("closing ps");
156         Util.closeQuietly(ps);
157         log.debug("closing con");
158         Util.closeQuietlyIfAutoCommit(con);
159         log.debug("closed");
160     }
161 
162     /**
163      * If subscribe unsubscribed sets keepGoing to false.
164      * 
165      * @param subscriber
166      */
167     private void checkSubscription(Subscriber<? super T> subscriber) {
168         if (subscriber.isUnsubscribed()) {
169             keepGoing = false;
170             log.debug("unsubscribing");
171         }
172     }
173 
174 }