1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.TimeUnit;
4
5 import rx.Observable.Operator;
6 import rx.Scheduler;
7 import rx.Subscriber;
8
9
10
11
12
13 public final class OperatorSampleFirst<T> implements Operator<T, T> {
14
15 private final long windowDurationMs;
16 private final Scheduler scheduler;
17
18 private static long UNSET = Long.MIN_VALUE;
19
20 public OperatorSampleFirst(long windowDurationMs, TimeUnit unit, Scheduler scheduler) {
21 this.windowDurationMs = unit.toMillis(windowDurationMs);
22 this.scheduler = scheduler;
23 }
24
25 @Override
26 public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
27 return new Subscriber<T>(subscriber) {
28
29 private long nextWindowStartTime = UNSET;
30
31 @Override
32 public void onStart() {
33 request(Long.MAX_VALUE);
34 }
35
36 @Override
37 public void onNext(T t) {
38 long now = scheduler.now();
39 if (nextWindowStartTime == UNSET) {
40 nextWindowStartTime = now + windowDurationMs;
41 subscriber.onNext(t);
42 } else if (now >= nextWindowStartTime) {
43
44
45 long n = (now - nextWindowStartTime) / windowDurationMs + 1;
46 nextWindowStartTime += n * windowDurationMs;
47 subscriber.onNext(t);
48 }
49 }
50
51 @Override
52 public void onCompleted() {
53 subscriber.onCompleted();
54 }
55
56 @Override
57 public void onError(Throwable e) {
58 subscriber.onError(e);
59 }
60
61 };
62 }
63 }