package com.github.davidmoten.rx.internal.operators;

import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;
import rx.internal.operators.BackpressureUtils;

/**
 * An {@link OnSubscribe} that emits the same value repeatedly, emitting only as
 * many items as the subscriber has requested. It never calls
 * {@code onCompleted}; emission stops only when the subscriber unsubscribes or
 * has no outstanding requests.
 *
 * @param <T>
 *            type of the repeated value
 */
public final class OnSubscribeRepeating<T> implements OnSubscribe<T> {

    private final T value;

    /**
     * @param value
     *            the item to emit on every {@code onNext} call
     */
    public OnSubscribeRepeating(T value) {
        this.value = value;
    }

    /**
     * Installs a request-driven producer on the subscriber; nothing is emitted
     * until the subscriber requests items.
     *
     * @param subscriber
     *            the downstream subscriber
     */
    @Override
    public void call(Subscriber<? super T> subscriber) {
        RepeatingProducer<T> producer = new RepeatingProducer<T>(subscriber, value);
        subscriber.setProducer(producer);
    }

    /**
     * Producer whose own {@link AtomicLong} value tracks the number of items
     * requested but not yet accounted for as emitted.
     */
    @SuppressWarnings("serial")
    private static final class RepeatingProducer<T> extends AtomicLong implements Producer {

        private final Subscriber<? super T> subscriber;
        private final T v;

        public RepeatingProducer(Subscriber<? super T> subscriber, T v) {
            this.subscriber = subscriber;
            this.v = v;
        }

        /**
         * Records {@code n} additional requested items and, if no emission
         * loop is currently active (the previous outstanding count was zero),
         * emits the value until all outstanding requests are satisfied or the
         * subscriber unsubscribes.
         *
         * @param n
         *            number of additional items requested; zero is a no-op
         * @throws IllegalArgumentException
         *             if {@code n} is negative
         */
        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("request must be >=0");
            }
            if (n == 0) {
                return;
            }
            // Only the caller that moves the counter away from zero runs the
            // emission loop; other callers just add to the counter and leave.
            if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
                return;
            }
            long requested = n;
            long emitted;
            do {
                emitted = requested;
                while (requested-- > 0) {
                    if (subscriber.isUnsubscribed()) {
                        // Stop immediately. The counter is deliberately left
                        // non-zero so later request calls do not start a new
                        // emission loop for an unsubscribed subscriber.
                        return;
                    }
                    subscriber.onNext(v);
                }
                // Subtract the batch just emitted; a positive remainder means
                // more requests arrived while emitting, so loop again.
            } while ((requested = addAndGet(-emitted)) > 0);
        }

    }

}