1 package com.github.davidmoten.rx.subjects;
2
3 import java.util.concurrent.atomic.AtomicReference;
4
5 import rx.Subscriber;
6 import rx.subjects.Subject;
7
8
9
10
11
12
13
14
15
16 public final class PublishSubjectSingleSubscriber<T> extends Subject<T, T> {
17
18
19 static final String ONLY_ONE_SUBSCRIPTION_IS_ALLOWED = "only one subscription is allowed";
20
21 private final SingleSubscribeOnSubscribe<T> onSubscribe;
22
23 private PublishSubjectSingleSubscriber(SingleSubscribeOnSubscribe<T> onSubscribe) {
24 super(onSubscribe);
25 this.onSubscribe = onSubscribe;
26 }
27
28 private PublishSubjectSingleSubscriber() {
29 this(new SingleSubscribeOnSubscribe<T>());
30 }
31
32
33
34
35
36
37
38
39 public static <T> PublishSubjectSingleSubscriber<T> create() {
40 return new PublishSubjectSingleSubscriber<T>();
41 }
42
43 @Override
44 public void onCompleted() {
45 Subscriber<? super T> sub = onSubscribe.subscriber.get();
46 if (sub != null) {
47 sub.onCompleted();
48 }
49 }
50
51 @Override
52 public void onError(Throwable e) {
53 Subscriber<? super T> sub = onSubscribe.subscriber.get();
54 if (sub != null) {
55 sub.onError(e);
56 }
57 }
58
59 @Override
60 public void onNext(T t) {
61 Subscriber<? super T> sub = onSubscribe.subscriber.get();
62 if (sub != null) {
63 sub.onNext(t);
64 }
65 }
66
67 private static class SingleSubscribeOnSubscribe<T> implements OnSubscribe<T> {
68
69 final AtomicReference<Subscriber<? super T>> subscriber = new AtomicReference<Subscriber<? super T>>();
70
71 @Override
72 public void call(Subscriber<? super T> sub) {
73 if (!subscriber.compareAndSet(null, sub))
74 throw new RuntimeException(ONLY_ONE_SUBSCRIPTION_IS_ALLOWED);
75 }
76
77 }
78
79 @Override
80 public boolean hasObservers() {
81 return onSubscribe.subscriber.get() != null;
82 }
83
84 }