1 package com.github.davidmoten.rx.internal.operators;
2
3 import rx.Observable.Operator;
4 import rx.Subscriber;
5 import rx.functions.Action1;
6
7 public final class OperatorDoOnNth<T> implements Operator<T, T> {
8
9 public static <T> OperatorDoOnNth<T> create(Action1<? super T> action, int n) {
10 return new OperatorDoOnNth<T>(action, n);
11 }
12
13 private final Action1<? super T> action;
14 private final int n;
15
16 private OperatorDoOnNth(Action1<? super T> action, int n) {
17 this.action = action;
18 this.n = n;
19 }
20
21 @Override
22 public Subscriber<? super T> call(final Subscriber<? super T> child) {
23 return new Subscriber<T>(child) {
24 int count;
25
26 @Override
27 public void onCompleted() {
28 child.onCompleted();
29 }
30
31 @Override
32 public void onError(Throwable e) {
33 child.onError(e);
34 }
35
36 @Override
37 public void onNext(T t) {
38 count++;
39 if (count == n) {
40 action.call(t);
41 }
42 child.onNext(t);
43 }
44
45 };
46 }
47
48 }