View Javadoc
1   package com.github.davidmoten.rx.jdbc;
2   
3   import rx.Observable;
4   import rx.Observable.Operator;
5   import rx.Subscriber;
6   import rx.functions.Func1;
7   
8   import com.github.davidmoten.rx.RxUtil;
9   
10  /**
11   * {@link Operator} corresonding to {@link QueryUpdateOperation}.
12   */
13  class QueryUpdateOperatorFromObservable<R> implements Operator<Observable<Integer>, Observable<R>> {
14  
15      private final Operator<Observable<Integer>, Observable<R>> operator;
16  
17      /**
18       * Constructor.
19       * 
20       * @param builder
21       * @param operatorType
22       */
23      QueryUpdateOperatorFromObservable(final QueryUpdate.Builder builder) {
24          operator = RxUtil
25                  .toOperator(new Func1<Observable<Observable<R>>, Observable<Observable<Integer>>>() {
26  
27                      @Override
28                      public Observable<Observable<Integer>> call(Observable<Observable<R>> observable) {
29  
30                          return observable.map(new Func1<Observable<R>, Observable<Integer>>() {
31                              @Override
32                              public Observable<Integer> call(Observable<R> parameters) {
33                                  return builder.clearParameters().parameters(parameters).count();
34                              }
35                          });
36                      }
37                  });
38      }
39  
40      @Override
41      public Subscriber<? super Observable<R>> call(Subscriber<? super Observable<Integer>> subscriber) {
42          return operator.call(subscriber);
43      }
44  }