1 package com.github.davidmoten.rx.internal.operators;
2
3 import rx.Observable.Operator;
4 import rx.Subscriber;
5
6 public final class OperatorUnsubscribeEagerly<T> implements Operator<T, T> {
7
8 private OperatorUnsubscribeEagerly() {
9
10 }
11
12 private static final class Singleton {
13 private static final OperatorUnsubscribeEagerly<?> INSTANCE = new OperatorUnsubscribeEagerly<Object>();
14 }
15
16 @SuppressWarnings("unchecked")
17 public static final <T> OperatorUnsubscribeEagerly<T> instance() {
18 return (OperatorUnsubscribeEagerly<T>) Singleton.INSTANCE;
19 }
20
21 @Override
22 public Subscriber<? super T> call(final Subscriber<? super T> child) {
23 Subscriber<T> parent = new Subscriber<T>() {
24
25 @Override
26 public void onCompleted() {
27 unsubscribe();
28 child.onCompleted();
29 }
30
31 @Override
32 public void onError(Throwable e) {
33 unsubscribe();
34 child.onError(e);
35 }
36
37 @Override
38 public void onNext(T t) {
39 child.onNext(t);
40 }
41
42 };
43 child.add(parent);
44 return parent;
45 }
46
47 }