1
2
3
4
5
6
7
8
9
10
11
12
13 package com.github.davidmoten.rx2.flowable;
14
15 import java.util.Arrays;
16 import java.util.List;
17 import java.util.Queue;
18 import java.util.concurrent.ConcurrentLinkedQueue;
19 import java.util.concurrent.atomic.AtomicLong;
20
21 import org.reactivestreams.Subscriber;
22 import org.reactivestreams.Subscription;
23
24 import io.reactivex.Flowable;
25 import io.reactivex.internal.subscriptions.SubscriptionHelper;
26 import io.reactivex.internal.util.BackpressureHelper;
27
28
29
30
31
32
33
34
35
36
37
38 public final class Burst<T> extends Flowable<T> {
39
40 private final List<T> items;
41 private final Throwable error;
42
43 private Burst(Throwable error, List<T> items) {
44 if (items.isEmpty()) {
45 throw new IllegalArgumentException("items cannot be empty");
46 }
47 for (T item : items) {
48 if (item == null) {
49 throw new IllegalArgumentException("items cannot include null");
50 }
51 }
52 this.error = error;
53 this.items = items;
54 }
55
56 @Override
57 protected void subscribeActual(final Subscriber<? super T> subscriber) {
58 subscriber.onSubscribe(new Subscription() {
59
60 final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
61 final AtomicLong requested = new AtomicLong();
62 volatile boolean cancelled;
63
64 @Override
65 public void request(long n) {
66 if (cancelled) {
67
68 return;
69 }
70 if (SubscriptionHelper.validate(n)) {
71
72
73 if (BackpressureHelper.add(requested, n) == 0) {
74 if (q.isEmpty()) {
75 return;
76 }
77 while (!q.isEmpty() && requested.get() > 0) {
78 T item = q.poll();
79 requested.decrementAndGet();
80 subscriber.onNext(item);
81 }
82 if (q.isEmpty()) {
83 if (error != null) {
84 subscriber.onError(error);
85 } else {
86 subscriber.onComplete();
87 }
88 }
89 }
90 }
91 }
92
93 @Override
94 public void cancel() {
95 cancelled = true;
96 }
97 });
98
99 }
100
101 @SuppressWarnings("unchecked")
102 public static <T> Builder<T> item(T item) {
103 return items(item);
104 }
105
106 public static <T> Builder<T> items(T... items) {
107 return new Builder<T>(Arrays.asList(items));
108 }
109
110 public static final class Builder<T> {
111
112 private final List<T> items;
113 private Throwable error;
114
115 private Builder(List<T> items) {
116 this.items = items;
117 }
118
119 public Flowable<T> error(Throwable e) {
120 this.error = e;
121 return create();
122 }
123
124 public Flowable<T> create() {
125 return new Burst<T>(error, items);
126 }
127
128 }
129
130 }