View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.TimeUnit;
4   
5   import rx.Observable.Operator;
6   import rx.Scheduler;
7   import rx.Subscriber;
8   
9   /**
10   * Throttle by windowing a stream and returning the first value in each window.
11   * @param <T> the value type
12   */
13  public final class OperatorSampleFirst<T> implements Operator<T, T> {
14  
15      private final long windowDurationMs;
16      private final Scheduler scheduler;
17  
18      private static long UNSET = Long.MIN_VALUE;
19  
20      public OperatorSampleFirst(long windowDurationMs, TimeUnit unit, Scheduler scheduler) {
21          this.windowDurationMs = unit.toMillis(windowDurationMs);
22          this.scheduler = scheduler;
23      }
24  
25      @Override
26      public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
27          return new Subscriber<T>(subscriber) {
28  
29              private long nextWindowStartTime = UNSET;
30  
31              @Override
32              public void onStart() {
33                  request(Long.MAX_VALUE);
34              }
35  
36              @Override
37              public void onNext(T t) {
38                  long now = scheduler.now();
39                  if (nextWindowStartTime == UNSET) {
40                      nextWindowStartTime = now + windowDurationMs;
41                      subscriber.onNext(t);
42                  } else if (now >= nextWindowStartTime) {
43                      // ensure that we advance the next window start time to just
44                      // beyond now
45                      long n = (now - nextWindowStartTime) / windowDurationMs + 1;
46                      nextWindowStartTime += n * windowDurationMs;
47                      subscriber.onNext(t);
48                  }
49              }
50  
51              @Override
52              public void onCompleted() {
53                  subscriber.onCompleted();
54              }
55  
56              @Override
57              public void onError(Throwable e) {
58                  subscriber.onError(e);
59              }
60  
61          };
62      }
63  }