View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import rx.Observable.Operator;
4   import rx.Subscriber;
5   
6   public final class OperatorUnsubscribeEagerly<T> implements Operator<T, T> {
7   
8       private OperatorUnsubscribeEagerly() {
9           // no instantiation outside of this class
10      }
11  
12      private static final class Singleton {
13          private static final OperatorUnsubscribeEagerly<?> INSTANCE = new OperatorUnsubscribeEagerly<Object>();
14      }
15  
16      @SuppressWarnings("unchecked")
17      public static final <T> OperatorUnsubscribeEagerly<T> instance() {
18          return (OperatorUnsubscribeEagerly<T>) Singleton.INSTANCE;
19      }
20  
21      @Override
22      public Subscriber<? super T> call(final Subscriber<? super T> child) {
23          Subscriber<T> parent = new Subscriber<T>() {
24  
25              @Override
26              public void onCompleted() {
27                  unsubscribe();
28                  child.onCompleted();
29              }
30  
31              @Override
32              public void onError(Throwable e) {
33                  unsubscribe();
34                  child.onError(e);
35              }
36  
37              @Override
38              public void onNext(T t) {
39                  child.onNext(t);
40              }
41  
42          };
43          child.add(parent);
44          return parent;
45      }
46  
47  }