View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.concurrent.atomic.AtomicInteger;
4   
5   import com.github.davidmoten.rx.exceptions.TooManySubscribersException;
6   
7   import rx.Observable;
8   import rx.Observable.Transformer;
9   import rx.functions.Action0;
10  
11  public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {
12  
13      private final AtomicInteger subscriberCount;
14      private final int maxSubscribers;
15  
16      public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
17          this.subscriberCount = subscriberCount;
18          this.maxSubscribers = maxSubscribers;
19      }
20  
21      @Override
22      public Observable<T> call(Observable<T> o) {
23          return o.doOnSubscribe(onSubscribe()).doOnUnsubscribe(onUnsubscribe());
24      }
25  
26      private Action0 onSubscribe() {
27          return new Action0() {
28  
29              @Override
30              public void call() {
31                  if (subscriberCount.incrementAndGet() > maxSubscribers)
32                      throw new TooManySubscribersException();
33              }
34          };
35      }
36  
37      private Action0 onUnsubscribe() {
38          return new Action0() {
39  
40              @Override
41              public void call() {
42                  subscriberCount.decrementAndGet();
43              }
44          };
45      }
46  
47  }