View Javadoc
1   package com.github.davidmoten.rx.subjects;
2   
3   import java.util.concurrent.atomic.AtomicReference;
4   
5   import rx.Subscriber;
6   import rx.subjects.Subject;
7   
8   /**
9    * A {@link Subject} that supports a maximum of one {@link Subscriber}. When
10   * there is no subscriber any notifications (<code>onNext</code>,
11   * <code>onError</code>, <code>onCompleted</code>) are ignored.
12   * 
13   * @param <T>
14   *            type of items being emitted/observed by this subject
15   */
16  public final class PublishSubjectSingleSubscriber<T> extends Subject<T, T> {
17  
18      // Visible for testing
19      static final String ONLY_ONE_SUBSCRIPTION_IS_ALLOWED = "only one subscription is allowed";
20  
21      private final SingleSubscribeOnSubscribe<T> onSubscribe;
22  
23      private PublishSubjectSingleSubscriber(SingleSubscribeOnSubscribe<T> onSubscribe) {
24          super(onSubscribe);
25          this.onSubscribe = onSubscribe;
26      }
27  
28      private PublishSubjectSingleSubscriber() {
29          this(new SingleSubscribeOnSubscribe<T>());
30      }
31  
32      /**
33       * Returns a new instance of a {@link PublishSubjectSingleSubscriber}.
34       * 
35       * @return the new instance
36       * @param <T>
37       *            type of items being emitted/observed by this subject
38       */
39      public static <T> PublishSubjectSingleSubscriber<T> create() {
40          return new PublishSubjectSingleSubscriber<T>();
41      }
42  
43      @Override
44      public void onCompleted() {
45      	Subscriber<? super T> sub = onSubscribe.subscriber.get();
46          if (sub != null) {
47              sub.onCompleted();
48          }
49      }
50  
51      @Override
52      public void onError(Throwable e) {
53      	Subscriber<? super T> sub = onSubscribe.subscriber.get();
54          if (sub != null) {
55              sub.onError(e);
56          }
57      }
58  
59      @Override
60      public void onNext(T t) {
61          Subscriber<? super T> sub = onSubscribe.subscriber.get();
62          if (sub != null) {
63              sub.onNext(t);
64          }
65      }
66  
67      private static class SingleSubscribeOnSubscribe<T> implements OnSubscribe<T> {
68  
69          final AtomicReference<Subscriber<? super T>> subscriber = new AtomicReference<Subscriber<? super T>>();
70  
71          @Override
72          public void call(Subscriber<? super T> sub) {
73              if (!subscriber.compareAndSet(null, sub))
74                  throw new RuntimeException(ONLY_ONE_SUBSCRIPTION_IS_ALLOWED);
75          }
76  
77      }
78  
79      @Override
80      public boolean hasObservers() {
81          return onSubscribe.subscriber.get() != null;
82      }
83  
84  }