1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.concurrent.atomic.AtomicInteger;
4
5 import com.github.davidmoten.rx.exceptions.TooManySubscribersException;
6
7 import rx.Observable;
8 import rx.Observable.Transformer;
9 import rx.functions.Action0;
10
11 public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {
12
13 private final AtomicInteger subscriberCount;
14 private final int maxSubscribers;
15
16 public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
17 this.subscriberCount = subscriberCount;
18 this.maxSubscribers = maxSubscribers;
19 }
20
21 @Override
22 public Observable<T> call(Observable<T> o) {
23 return o.doOnSubscribe(onSubscribe()).doOnUnsubscribe(onUnsubscribe());
24 }
25
26 private Action0 onSubscribe() {
27 return new Action0() {
28
29 @Override
30 public void call() {
31 if (subscriberCount.incrementAndGet() > maxSubscribers)
32 throw new TooManySubscribersException();
33 }
34 };
35 }
36
37 private Action0 onUnsubscribe() {
38 return new Action0() {
39
40 @Override
41 public void call() {
42 subscriberCount.decrementAndGet();
43 }
44 };
45 }
46
47 }