1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import org.reactivestreams.Subscriber;
6 import org.reactivestreams.Subscription;
7
8 import io.reactivex.Flowable;
9 import io.reactivex.internal.subscriptions.SubscriptionHelper;
10 import io.reactivex.internal.util.BackpressureHelper;
11
12 public final class FlowableRepeat<T> extends Flowable<T> {
13
14 private final T value;
15 private final long count;
16
17 public FlowableRepeat(T value, long count) {
18 this.value = value;
19 this.count = count;
20 }
21
22 @Override
23 protected void subscribeActual(org.reactivestreams.Subscriber<? super T> child) {
24 RepeatSubscription<T> sub = new RepeatSubscription<T>(child, value, count);
25 child.onSubscribe(sub);
26 }
27
28 @SuppressWarnings("serial")
29 private static class RepeatSubscription<T> extends AtomicLong implements Subscription {
30
31 private final Subscriber<? super T> child;
32 private final T value;
33 private final long count;
34
35 private volatile boolean cancelled;
36 private long counter;
37
38 RepeatSubscription(Subscriber<? super T> child, T value, long count) {
39 this.child = child;
40 this.value = value;
41 this.count = count;
42 this.counter = count;
43 }
44
45 @Override
46 public void request(long n) {
47 if (SubscriptionHelper.validate(n)) {
48 if (BackpressureHelper.add(this, n) == 0) {
49 long requested = n;
50 long emitted = 0;
51 do {
52 emitted = requested;
53 while (requested-- > 0 && !cancelled && (count == -1 || counter-- > 0)) {
54 child.onNext(value);
55 }
56 } while ((requested = this.addAndGet(-emitted)) > 0);
57 if (count >= 0 && !cancelled) {
58 child.onComplete();
59 }
60 }
61 }
62 }
63
64 @Override
65 public void cancel() {
66 this.cancelled = true;
67 }
68
69 }
70
71 }