View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   import java.util.concurrent.atomic.AtomicInteger;
5   import java.util.concurrent.atomic.AtomicLong;
6   
7   import org.reactivestreams.Subscriber;
8   import org.reactivestreams.Subscription;
9   
10  import com.github.davidmoten.guavamini.Preconditions;
11  
12  import io.reactivex.Flowable;
13  import io.reactivex.FlowableSubscriber;
14  import io.reactivex.internal.fuseable.SimplePlainQueue;
15  import io.reactivex.internal.queue.SpscLinkedArrayQueue;
16  import io.reactivex.internal.subscriptions.SubscriptionHelper;
17  import io.reactivex.internal.util.BackpressureHelper;
18  
19  public final class FlowableStringSplitSimple extends Flowable<String> {
20  
21      private final Flowable<String> source;
22      private final String delimiter;
23  
24      public FlowableStringSplitSimple(Flowable<String> source, String delimiter) {
25          Preconditions.checkNotNull(source);
26          Preconditions.checkNotNull(delimiter);
27          Preconditions.checkArgument(delimiter.length() > 0);
28          this.source = source;
29          this.delimiter = delimiter;
30      }
31  
32      @Override
33      protected void subscribeActual(Subscriber<? super String> s) {
34          source.subscribe(new StringSplitSubscriber(s, delimiter));
35      }
36  
37      @SuppressWarnings("serial")
38      private static final class StringSplitSubscriber extends AtomicLong
39              implements FlowableSubscriber<String>, Subscription {
40  
41          private final Subscriber<? super String> actual;
42          // queue of notifications
43          private final transient SimplePlainQueue<String> queue = new SpscLinkedArrayQueue<String>(
44                  16);
45          private final AtomicInteger wip = new AtomicInteger();
46          private final AtomicBoolean once = new AtomicBoolean();
47  
48          private final DelimitedStringLinkedList ss;
49          private volatile boolean cancelled;
50          private Subscription parent;
51          private boolean unbounded;
52  
53          private Throwable error;
54          private volatile boolean done;
55  
56          StringSplitSubscriber(Subscriber<? super String> actual, String delimiter) {
57              this.actual = actual;
58              this.ss = new DelimitedStringLinkedList(delimiter);
59          }
60  
61          @Override
62          public void onSubscribe(Subscription subscription) {
63              this.parent = subscription;
64              actual.onSubscribe(this);
65          }
66  
67          @Override
68          public void cancel() {
69              cancelled = true;
70              parent.cancel();
71          }
72  
73          @Override
74          public void request(long n) {
75              if (SubscriptionHelper.validate(n)) {
76                  BackpressureHelper.add(this, n);
77                  if (once.compareAndSet(false, true)) {
78                      if (n == Long.MAX_VALUE) {
79                          parent.request(Long.MAX_VALUE);
80                          unbounded = true;
81                      } else {
82                          parent.request(1);
83                      }
84                  }
85                  drain();
86              }
87          }
88  
89          @Override
90          public void onNext(String t) {
91              queue.offer(t);
92              drain();
93          }
94  
95          @Override
96          public void onComplete() {
97              done = true;
98              drain();
99          }
100 
101         @Override
102         public void onError(Throwable e) {
103             error = e;
104             done = true;
105             drain();
106         }
107 
108         private void drain() {
109             if (wip.getAndIncrement() != 0) {
110                 return;
111             }
112             int missed = 1;
113             while (true) {
114                 long r = get(); // requested
115                 long e = 0; // emitted
116                 while (e != r) {
117                     if (cancelled) {
118                         return;
119                     }
120                     if (find()) {
121                         e++;
122                     } else {
123                         // must read `done` before poll occurs
124                         boolean d = done;
125                         String t = queue.poll();
126                         if (t == null) {
127                             if (d) {
128                                 Throwable err = error;
129                                 if (err != null) {
130                                     ss.clear();
131                                     actual.onError(err);
132                                     return;
133                                 } else {
134                                     String remaining = ss.remaining();
135                                     final boolean checkCancelled;
136                                     if (remaining != null) {
137                                         ss.clear();
138                                         queue.clear();
139                                         actual.onNext(remaining);
140                                         e++;
141                                         checkCancelled = true;
142                                     } else if (ss.addCalled()) {
143                                         ss.clear();
144                                         queue.clear();
145                                         actual.onNext("");
146                                         e++;
147                                         checkCancelled = true;
148                                     } else {
149                                         checkCancelled = false;
150                                     }
151                                     if (!checkCancelled || !cancelled) {
152                                         actual.onComplete();
153                                     }
154                                     return;
155                                 }
156                             } else if (!unbounded) {
157                                 parent.request(1);
158                             }
159                             break;
160                         } else {
161                             ss.add(t);
162                         }
163                     }
164                 }
165                 if (e > 0 && r != Long.MAX_VALUE) {
166                     this.addAndGet(-e);
167                 }
168                 missed = wip.addAndGet(-missed);
169                 if (missed == 0) {
170                     return;
171                 }
172             }
173         }
174 
175         /**
176          * Returns true if and only if a value emitted.
177          * 
178          * @return true if and only if a value emitted
179          */
180         private boolean find() {
181             if (ss == null) {
182                 return false;
183             }
184             String s = ss.next();
185             if (s != null) {
186                 actual.onNext(s);
187                 return true;
188             } else {
189                 return false;
190             }
191         }
192     }
193 
194 }