1 package com.github.davidmoten.rx.jdbc;
2
3 import rx.Observable;
4 import rx.Observable.Operator;
5 import rx.Subscriber;
6 import rx.functions.Func1;
7
8 import com.github.davidmoten.rx.Transformers;
9
10
11
12
13 final class QueryUpdateOperator<R> implements Operator<Integer, R> {
14
15 private final Operator<Integer, R> operator;
16
17
18
19
20
21
22
23 QueryUpdateOperator(final QueryUpdate.Builder builder, final OperatorType operatorType) {
24 operator = Transformers.toOperator(new Func1<Observable<R>, Observable<Integer>>() {
25 @Override
26 public Observable<Integer> call(Observable<R> observable) {
27 if (operatorType == OperatorType.PARAMETER)
28 return builder.parameters(observable).count();
29 else if (operatorType == OperatorType.DEPENDENCY)
30
31 return builder.dependsOn(observable).count();
32 else
33 throw new RuntimeException("does not handle " + operatorType);
34 }
35 });
36 }
37
38 @Override
39 public Subscriber<? super R> call(Subscriber<? super Integer> subscriber) {
40 return operator.call(subscriber);
41 }
42 }