1 package com.github.davidmoten.rx.jdbc;
2
3 import rx.Observable;
4 import rx.Observable.Operator;
5 import rx.Subscriber;
6 import rx.functions.Func1;
7
8 import com.github.davidmoten.rx.RxUtil;
9
10
11
12
13 class QueryUpdateOperatorFromObservable<R> implements Operator<Observable<Integer>, Observable<R>> {
14
15 private final Operator<Observable<Integer>, Observable<R>> operator;
16
17
18
19
20
21
22
23 QueryUpdateOperatorFromObservable(final QueryUpdate.Builder builder) {
24 operator = RxUtil
25 .toOperator(new Func1<Observable<Observable<R>>, Observable<Observable<Integer>>>() {
26
27 @Override
28 public Observable<Observable<Integer>> call(Observable<Observable<R>> observable) {
29
30 return observable.map(new Func1<Observable<R>, Observable<Integer>>() {
31 @Override
32 public Observable<Integer> call(Observable<R> parameters) {
33 return builder.clearParameters().parameters(parameters).count();
34 }
35 });
36 }
37 });
38 }
39
40 @Override
41 public Subscriber<? super Observable<R>> call(Subscriber<? super Observable<Integer>> subscriber) {
42 return operator.call(subscriber);
43 }
44 }