View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import rx.Observable.OnSubscribe;
6   import rx.Producer;
7   import rx.Subscriber;
8   import rx.internal.operators.BackpressureUtils;
9   
10  public final class OnSubscribeRepeating<T> implements OnSubscribe<T> {
11  
12      private final T value;
13  
14      public OnSubscribeRepeating(T value) {
15          this.value = value;
16      }
17  
18      @Override
19      public void call(Subscriber<? super T> subscriber) {
20          RepeatingProducer<T> producer = new RepeatingProducer<T>(subscriber, value);
21          subscriber.setProducer(producer);
22      }
23  
24      @SuppressWarnings("serial")
25      private static final class RepeatingProducer<T> extends AtomicLong implements Producer {
26  
27          private final Subscriber<? super T> subscriber;
28          private final T v;
29  
30          public RepeatingProducer(Subscriber<? super T> subscriber, T v) {
31              this.subscriber = subscriber;
32              this.v = v;
33          }
34  
35          @Override
36          public void request(long n) {
37              if (n < 0) {
38                  throw new IllegalArgumentException("reuest must be >=0");
39              } else if (n == 0) {
40                  return;
41              } else if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
42                  long requested = n;
43                  long emitted = 0;
44                  do {
45                      emitted = requested;
46                      while (requested-- > 0 && !subscriber.isUnsubscribed()) {
47                          subscriber.onNext(v);
48                      }
49                  } while ((requested = this.addAndGet(-emitted)) > 0);
50              }
51          }
52  
53      }
54  
55  }