1 package com.github.davidmoten.rx.internal.operators; 2 3 import rx.Observable.Operator; 4 import rx.Subscriber; 5 import rx.functions.Action1; 6 7 public final class OperatorDoOnNth<T> implements Operator<T, T> { 8 9 public static <T> OperatorDoOnNth<T> create(Action1<? super T> action, int n) { 10 return new OperatorDoOnNth<T>(action, n); 11 } 12 13 private final Action1<? super T> action; 14 private final int n; 15 16 private OperatorDoOnNth(Action1<? super T> action, int n) { 17 this.action = action; 18 this.n = n; 19 } 20 21 @Override 22 public Subscriber<? super T> call(final Subscriber<? super T> child) { 23 return new Subscriber<T>(child) { 24 int count; 25 26 @Override 27 public void onCompleted() { 28 child.onCompleted(); 29 } 30 31 @Override 32 public void onError(Throwable e) { 33 child.onError(e); 34 } 35 36 @Override 37 public void onNext(T t) { 38 count++; 39 if (count == n) { 40 action.call(t); 41 } 42 child.onNext(t); 43 } 44 45 }; 46 } 47 48 }