View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import rx.Observable;
4   import rx.Observable.Operator;
5   import rx.Subscriber;
6   import rx.functions.Func1;
7   
8   import com.github.davidmoten.rx.Transformers;
9   import com.github.davidmoten.rx.jdbc.QuerySelect.Builder;
10  
11  /**
12   * Operator corresponding to the QuerySelectOperation.
13   * 
14   * @param <T>
15   */
16  final class QuerySelectOperator<T, R> implements Operator<T, R> {
17  
18      private final Operator<T, R> operator;
19  
20      /**
21       * Constructor.
22       * 
23       * @param builder
24       * @param function
25       * @param operatorType
26       */
27      QuerySelectOperator(final QuerySelect.Builder builder,
28              final ResultSetMapper<? extends T> function, final OperatorType operatorType) {
29          operator = Transformers.toOperator(new ApplyQuerySelect<R, T>(builder, function,
30                  operatorType));
31      }
32  
33      @Override
34      public Subscriber<? super R> call(Subscriber<? super T> subscriber) {
35          return operator.call(subscriber);
36      }
37  
38      private static class ApplyQuerySelect<R, T> implements Func1<Observable<R>, Observable<T>> {
39  
40          private Builder builder;
41          private ResultSetMapper<? extends T> function;
42          private OperatorType operatorType;
43  
44          private ApplyQuerySelect(QuerySelect.Builder builder,
45                  ResultSetMapper<? extends T> function, OperatorType operatorType) {
46              this.builder = builder;
47              this.function = function;
48              this.operatorType = operatorType;
49          }
50  
51          @Override
52          public Observable<T> call(Observable<R> observable) {
53              if (operatorType == OperatorType.PARAMETER)
54                  return builder.parameters(observable).get(function);
55              else if (operatorType == OperatorType.DEPENDENCY)
56                  // dependency
57                  return builder.dependsOn(observable).get(function);
58              else // PARAMETER_LIST
59              {
60                  @SuppressWarnings("unchecked")
61                  Observable<Observable<Object>> obs = (Observable<Observable<Object>>) observable;
62                  return obs.concatMap(new Func1<Observable<Object>, Observable<T>>() {
63                      @Override
64                      public Observable<T> call(Observable<Object> parameters) {
65                          return builder.parameters(parameters).get(function);
66                      }
67                  });
68              }
69          }
70  
71      }
72  }