View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import rx.Observable;
4   import rx.Observable.Operator;
5   import rx.Subscriber;
6   import rx.functions.Func1;
7   
8   import com.github.davidmoten.rx.Transformers;
9   
10  /**
11   * {@link Operator} corresonding to {@link QueryUpdateOperation}.
12   */
13  final class QueryUpdateOperator<R> implements Operator<Integer, R> {
14  
15      private final Operator<Integer, R> operator;
16  
17      /**
18       * Constructor.
19       * 
20       * @param builder
21       * @param operatorType
22       */
23      QueryUpdateOperator(final QueryUpdate.Builder builder, final OperatorType operatorType) {
24          operator = Transformers.toOperator(new Func1<Observable<R>, Observable<Integer>>() {
25              @Override
26              public Observable<Integer> call(Observable<R> observable) {
27                  if (operatorType == OperatorType.PARAMETER)
28                      return builder.parameters(observable).count();
29                  else if (operatorType == OperatorType.DEPENDENCY)
30                      // dependency
31                      return builder.dependsOn(observable).count();
32                  else
33                      throw new RuntimeException("does not handle " + operatorType);
34              }
35          });
36      }
37  
38      @Override
39      public Subscriber<? super R> call(Subscriber<? super Integer> subscriber) {
40          return operator.call(subscriber);
41      }
42  }