1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import rx.Observable.OnSubscribe;
6 import rx.Producer;
7 import rx.Subscriber;
8 import rx.internal.operators.BackpressureUtils;
9
10 public final class OnSubscribeRepeating<T> implements OnSubscribe<T> {
11
12 private final T value;
13
14 public OnSubscribeRepeating(T value) {
15 this.value = value;
16 }
17
18 @Override
19 public void call(Subscriber<? super T> subscriber) {
20 RepeatingProducer<T> producer = new RepeatingProducer<T>(subscriber, value);
21 subscriber.setProducer(producer);
22 }
23
24 @SuppressWarnings("serial")
25 private static final class RepeatingProducer<T> extends AtomicLong implements Producer {
26
27 private final Subscriber<? super T> subscriber;
28 private final T v;
29
30 public RepeatingProducer(Subscriber<? super T> subscriber, T v) {
31 this.subscriber = subscriber;
32 this.v = v;
33 }
34
35 @Override
36 public void request(long n) {
37 if (n < 0) {
38 throw new IllegalArgumentException("reuest must be >=0");
39 } else if (n == 0) {
40 return;
41 } else if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
42 long requested = n;
43 long emitted = 0;
44 do {
45 emitted = requested;
46 while (requested-- > 0 && !subscriber.isUnsubscribed()) {
47 subscriber.onNext(v);
48 }
49 } while ((requested = this.addAndGet(-emitted)) > 0);
50 }
51 }
52
53 }
54
55 }