1 package com.github.davidmoten.rx.jdbc;
2
3 import rx.Observable;
4 import rx.Observable.Operator;
5 import rx.Subscriber;
6 import rx.functions.Func1;
7
8 import com.github.davidmoten.rx.Transformers;
9 import com.github.davidmoten.rx.jdbc.QuerySelect.Builder;
10
11
12
13
14
15
16 final class QuerySelectOperator<T, R> implements Operator<T, R> {
17
18 private final Operator<T, R> operator;
19
20
21
22
23
24
25
26
27 QuerySelectOperator(final QuerySelect.Builder builder,
28 final ResultSetMapper<? extends T> function, final OperatorType operatorType) {
29 operator = Transformers.toOperator(new ApplyQuerySelect<R, T>(builder, function,
30 operatorType));
31 }
32
33 @Override
34 public Subscriber<? super R> call(Subscriber<? super T> subscriber) {
35 return operator.call(subscriber);
36 }
37
38 private static class ApplyQuerySelect<R, T> implements Func1<Observable<R>, Observable<T>> {
39
40 private Builder builder;
41 private ResultSetMapper<? extends T> function;
42 private OperatorType operatorType;
43
44 private ApplyQuerySelect(QuerySelect.Builder builder,
45 ResultSetMapper<? extends T> function, OperatorType operatorType) {
46 this.builder = builder;
47 this.function = function;
48 this.operatorType = operatorType;
49 }
50
51 @Override
52 public Observable<T> call(Observable<R> observable) {
53 if (operatorType == OperatorType.PARAMETER)
54 return builder.parameters(observable).get(function);
55 else if (operatorType == OperatorType.DEPENDENCY)
56
57 return builder.dependsOn(observable).get(function);
58 else
59 {
60 @SuppressWarnings("unchecked")
61 Observable<Observable<Object>> obs = (Observable<Observable<Object>>) observable;
62 return obs.concatMap(new Func1<Observable<Object>, Observable<T>>() {
63 @Override
64 public Observable<T> call(Observable<Object> parameters) {
65 return builder.parameters(parameters).get(function);
66 }
67 });
68 }
69 }
70
71 }
72 }