1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4 import java.util.concurrent.atomic.AtomicInteger;
5 import java.util.concurrent.atomic.AtomicLong;
6
7 import org.reactivestreams.Subscriber;
8 import org.reactivestreams.Subscription;
9
10 import com.github.davidmoten.guavamini.Preconditions;
11
12 import io.reactivex.Flowable;
13 import io.reactivex.FlowableSubscriber;
14 import io.reactivex.internal.fuseable.SimplePlainQueue;
15 import io.reactivex.internal.queue.SpscLinkedArrayQueue;
16 import io.reactivex.internal.subscriptions.SubscriptionHelper;
17 import io.reactivex.internal.util.BackpressureHelper;
18
19 public final class FlowableStringSplitSimple extends Flowable<String> {
20
21 private final Flowable<String> source;
22 private final String delimiter;
23
24 public FlowableStringSplitSimple(Flowable<String> source, String delimiter) {
25 Preconditions.checkNotNull(source);
26 Preconditions.checkNotNull(delimiter);
27 Preconditions.checkArgument(delimiter.length() > 0);
28 this.source = source;
29 this.delimiter = delimiter;
30 }
31
32 @Override
33 protected void subscribeActual(Subscriber<? super String> s) {
34 source.subscribe(new StringSplitSubscriber(s, delimiter));
35 }
36
37 @SuppressWarnings("serial")
38 private static final class StringSplitSubscriber extends AtomicLong
39 implements FlowableSubscriber<String>, Subscription {
40
41 private final Subscriber<? super String> actual;
42
43 private final transient SimplePlainQueue<String> queue = new SpscLinkedArrayQueue<String>(
44 16);
45 private final AtomicInteger wip = new AtomicInteger();
46 private final AtomicBoolean once = new AtomicBoolean();
47
48 private final DelimitedStringLinkedList ss;
49 private volatile boolean cancelled;
50 private Subscription parent;
51 private boolean unbounded;
52
53 private Throwable error;
54 private volatile boolean done;
55
56 StringSplitSubscriber(Subscriber<? super String> actual, String delimiter) {
57 this.actual = actual;
58 this.ss = new DelimitedStringLinkedList(delimiter);
59 }
60
61 @Override
62 public void onSubscribe(Subscription subscription) {
63 this.parent = subscription;
64 actual.onSubscribe(this);
65 }
66
67 @Override
68 public void cancel() {
69 cancelled = true;
70 parent.cancel();
71 }
72
73 @Override
74 public void request(long n) {
75 if (SubscriptionHelper.validate(n)) {
76 BackpressureHelper.add(this, n);
77 if (once.compareAndSet(false, true)) {
78 if (n == Long.MAX_VALUE) {
79 parent.request(Long.MAX_VALUE);
80 unbounded = true;
81 } else {
82 parent.request(1);
83 }
84 }
85 drain();
86 }
87 }
88
89 @Override
90 public void onNext(String t) {
91 queue.offer(t);
92 drain();
93 }
94
95 @Override
96 public void onComplete() {
97 done = true;
98 drain();
99 }
100
101 @Override
102 public void onError(Throwable e) {
103 error = e;
104 done = true;
105 drain();
106 }
107
108 private void drain() {
109 if (wip.getAndIncrement() != 0) {
110 return;
111 }
112 int missed = 1;
113 while (true) {
114 long r = get();
115 long e = 0;
116 while (e != r) {
117 if (cancelled) {
118 return;
119 }
120 if (find()) {
121 e++;
122 } else {
123
124 boolean d = done;
125 String t = queue.poll();
126 if (t == null) {
127 if (d) {
128 Throwable err = error;
129 if (err != null) {
130 ss.clear();
131 actual.onError(err);
132 return;
133 } else {
134 String remaining = ss.remaining();
135 final boolean checkCancelled;
136 if (remaining != null) {
137 ss.clear();
138 queue.clear();
139 actual.onNext(remaining);
140 e++;
141 checkCancelled = true;
142 } else if (ss.addCalled()) {
143 ss.clear();
144 queue.clear();
145 actual.onNext("");
146 e++;
147 checkCancelled = true;
148 } else {
149 checkCancelled = false;
150 }
151 if (!checkCancelled || !cancelled) {
152 actual.onComplete();
153 }
154 return;
155 }
156 } else if (!unbounded) {
157 parent.request(1);
158 }
159 break;
160 } else {
161 ss.add(t);
162 }
163 }
164 }
165 if (e > 0 && r != Long.MAX_VALUE) {
166 this.addAndGet(-e);
167 }
168 missed = wip.addAndGet(-missed);
169 if (missed == 0) {
170 return;
171 }
172 }
173 }
174
175
176
177
178
179
180 private boolean find() {
181 if (ss == null) {
182 return false;
183 }
184 String s = ss.next();
185 if (s != null) {
186 actual.onNext(s);
187 return true;
188 } else {
189 return false;
190 }
191 }
192 }
193
194 }