package com.github.davidmoten.rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import com.github.davidmoten.rx.exceptions.TooManySubscribersException;

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action0;

/**
 * Transformer that tracks subscriptions to a source in a caller-supplied
 * counter and rejects a subscription once the counter exceeds a limit.
 *
 * <p>
 * The counter is incremented by an action attached with
 * {@code doOnSubscribe} and decremented by an action attached with
 * {@code doOnUnsubscribe}. The counter instance is passed in by the caller
 * rather than created here.
 *
 * @param <T>
 *            element type of the transformed source
 */
public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {

    private final AtomicInteger subscriberCount;
    private final int maxSubscribers;

    /**
     * Creates the transformer.
     *
     * @param subscriberCount
     *            counter incremented on subscribe and decremented on
     *            unsubscribe
     * @param maxSubscribers
     *            largest counter value (after incrementing) for which a
     *            subscription is still accepted
     */
    public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
        this.subscriberCount = subscriberCount;
        this.maxSubscribers = maxSubscribers;
    }

    /**
     * Attaches the counting actions to the given source.
     *
     * @param o
     *            source to decorate
     * @return the source with the subscribe and unsubscribe actions attached
     */
    @Override
    public Observable<T> call(Observable<T> o) {
        Observable<T> countedOnSubscribe = o.doOnSubscribe(onSubscribe());
        return countedOnSubscribe.doOnUnsubscribe(onUnsubscribe());
    }

    /**
     * Builds the action run on subscribe: it increments the counter and
     * throws {@link TooManySubscribersException} when the incremented value
     * is greater than {@code maxSubscribers}.
     *
     * <p>
     * NOTE(review): the counter is not decremented here when the exception is
     * thrown; presumably the unsubscribe action rebalances it afterwards --
     * confirm against the subscribing code path.
     */
    private Action0 onSubscribe() {
        return new Action0() {

            @Override
            public void call() {
                final int countAfterIncrement = subscriberCount.incrementAndGet();
                if (countAfterIncrement > maxSubscribers) {
                    throw new TooManySubscribersException();
                }
            }
        };
    }

    /**
     * Builds the action run on unsubscribe: it decrements the counter.
     */
    private Action0 onUnsubscribe() {
        return new Action0() {

            @Override
            public void call() {
                subscriberCount.decrementAndGet();
            }
        };
    }

}