View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import rx.Observable.Operator;
4   import rx.Subscriber;
5   import rx.functions.Action1;
6   
7   public final class OperatorDoOnNth<T> implements Operator<T, T> {
8   
9       public static <T> OperatorDoOnNth<T> create(Action1<? super T> action, int n) {
10          return new OperatorDoOnNth<T>(action, n);
11      }
12  
13      private final Action1<? super T> action;
14      private final int n;
15  
16      private OperatorDoOnNth(Action1<? super T> action, int n) {
17          this.action = action;
18          this.n = n;
19      }
20  
21      @Override
22      public Subscriber<? super T> call(final Subscriber<? super T> child) {
23          return new Subscriber<T>(child) {
24              int count;
25  
26              @Override
27              public void onCompleted() {
28                  child.onCompleted();
29              }
30  
31              @Override
32              public void onError(Throwable e) {
33                  child.onError(e);
34              }
35  
36              @Override
37              public void onNext(T t) {
38                  count++;
39                  if (count == n) {
40                      action.call(t);
41                  }
42                  child.onNext(t);
43              }
44  
45          };
46      }
47  
48  }