View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import org.reactivestreams.Subscriber;
6   import org.reactivestreams.Subscription;
7   
8   import io.reactivex.Flowable;
9   import io.reactivex.internal.subscriptions.SubscriptionHelper;
10  import io.reactivex.internal.util.BackpressureHelper;
11  
12  public final class FlowableRepeat<T> extends Flowable<T> {
13  
14      private final T value;
15      private final long count;
16  
17      public FlowableRepeat(T value, long count) {
18          this.value = value;
19          this.count = count;
20      }
21  
22      @Override
23      protected void subscribeActual(org.reactivestreams.Subscriber<? super T> child) {
24          RepeatSubscription<T> sub = new RepeatSubscription<T>(child, value, count);
25          child.onSubscribe(sub);
26      }
27  
28      @SuppressWarnings("serial")
29      private static class RepeatSubscription<T> extends AtomicLong implements Subscription {
30  
31          private final Subscriber<? super T> child;
32          private final T value;
33          private final long count;
34  
35          private volatile boolean cancelled;
36          private long counter;
37  
38          RepeatSubscription(Subscriber<? super T> child, T value, long count) {
39              this.child = child;
40              this.value = value;
41              this.count = count;
42              this.counter = count;
43          }
44  
45          @Override
46          public void request(long n) {
47              if (SubscriptionHelper.validate(n)) {
48                  if (BackpressureHelper.add(this, n) == 0) {
49                      long requested = n;
50                      long emitted = 0;
51                      do {
52                          emitted = requested;
53                          while (requested-- > 0 && !cancelled && (count == -1 || counter-- > 0)) {
54                              child.onNext(value);
55                          }
56                      } while ((requested = this.addAndGet(-emitted)) > 0);
57                      if (count >= 0 && !cancelled) {
58                          child.onComplete();
59                      }
60                  }
61              }
62          }
63  
64          @Override
65          public void cancel() {
66              this.cancelled = true;
67          }
68  
69      }
70  
71  }