View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.FileNotFoundException;
6   import java.io.IOException;
7   import java.io.InputStream;
8   import java.io.InputStreamReader;
9   import java.io.Reader;
10  import java.nio.charset.Charset;
11  import java.nio.charset.CharsetDecoder;
12  import java.nio.charset.CodingErrorAction;
13  import java.util.Arrays;
14  import java.util.List;
15  import java.util.concurrent.Callable;
16  import java.util.concurrent.atomic.AtomicBoolean;
17  import java.util.regex.Pattern;
18  
19  import org.reactivestreams.Publisher;
20  
21  import com.github.davidmoten.guavamini.Preconditions;
22  import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
23  import com.github.davidmoten.rx2.internal.flowable.FlowableStringInputStream;
24  import com.github.davidmoten.rx2.internal.flowable.FlowableStringSplitSimple;
25  import com.github.davidmoten.rx2.internal.flowable.TransformerDecode;
26  import com.github.davidmoten.rx2.internal.flowable.TransformerStringSplit;
27  
28  import io.reactivex.BackpressureStrategy;
29  import io.reactivex.Emitter;
30  import io.reactivex.Flowable;
31  import io.reactivex.FlowableTransformer;
32  import io.reactivex.Maybe;
33  import io.reactivex.annotations.Beta;
34  import io.reactivex.annotations.Experimental;
35  import io.reactivex.functions.BiConsumer;
36  import io.reactivex.functions.Consumer;
37  import io.reactivex.functions.Function;
38  import io.reactivex.functions.Predicate;
39  
40  public final class Strings {
41  
42      private Strings() {
43          // prevent instantiation
44      }
45  
46      public static final Charset UTF_8 = Charset.forName("UTF-8");
47  
48      private static final int DEFAULT_REQUEST_SIZE = 1;
49      private static final int DEFAULT_BUFFER_SIZE = 8192;
50  
51      /**
52       * Returns null if input is null otherwise returns input.toString().trim().
53       */
54      @VisibleForTesting
55      static Function<Object, String> TRIM = new Function<Object, String>() {
56  
57          @Override
58          public String apply(Object input) throws Exception {
59              if (input == null)
60                  return null;
61              else
62                  return input.toString().trim();
63          }
64      };
65  
66      @SuppressWarnings("unchecked")
67      public static <T> Function<T, String> trim() {
68          return (Function<T, String>) TRIM;
69      }
70  
71      public static Flowable<String> from(final Reader reader, final int bufferSize) {
72          return Flowable.generate(new Consumer<Emitter<String>>() {
73              final char[] buffer = new char[bufferSize];
74  
75              @Override
76              public void accept(Emitter<String> emitter) throws Exception {
77                  int count = reader.read(buffer);
78                  if (count == -1) {
79                      emitter.onComplete();
80                  } else {
81                      emitter.onNext(String.valueOf(buffer, 0, count));
82                  }
83              }
84          });
85      }
86  
87      public static Flowable<String> from(Reader reader) {
88          return from(reader, DEFAULT_BUFFER_SIZE);
89      }
90  
91      public static Flowable<String> from(InputStream is) {
92          return from(is, UTF_8);
93      }
94  
95      public static Flowable<String> from(InputStream is, Charset charset) {
96          return from(is, charset, DEFAULT_BUFFER_SIZE);
97      }
98  
99      public static Flowable<String> from(InputStream is, Charset charset, int bufferSize) {
100         return from(new InputStreamReader(is, charset), bufferSize);
101     }
102 
103     public static Flowable<String> split(Flowable<String> source, String pattern) {
104         return source.compose(Strings.split(pattern, BackpressureStrategy.BUFFER, 1));
105     }
106 
107     public static Maybe<String> concat(Flowable<String> source) {
108         return concat(source, "");
109     }
110 
111     public static Maybe<String> concat(Flowable<String> source, final String delimiter) {
112         return join(source, delimiter);
113     }
114 
115     public static Flowable<String> strings(Flowable<?> source) {
116         return source.map(new Function<Object, String>() {
117             @Override
118             public String apply(Object t) throws Exception {
119                 return String.valueOf(t);
120             }
121         });
122     }
123 
124     public static Flowable<String> from(File file) {
125         return from(file, UTF_8);
126     }
127 
128     public static Flowable<String> from(final File file, final Charset charset) {
129         Preconditions.checkNotNull(file);
130         Preconditions.checkNotNull(charset);
131         Callable<Reader> resourceFactory = new Callable<Reader>() {
132             @Override
133             public Reader call() throws FileNotFoundException {
134                 return new InputStreamReader(new FileInputStream(file), charset);
135             }
136         };
137         return from(resourceFactory);
138     }
139 
140     public static Flowable<String> fromClasspath(final Class<?> cls, final String resource, final Charset charset) {
141         Preconditions.checkNotNull(resource);
142         Preconditions.checkNotNull(charset);
143         Callable<Reader> resourceFactory = new Callable<Reader>() {
144             @Override
145             public Reader call() {
146                 return new InputStreamReader(cls.getResourceAsStream(resource), charset);
147             }
148         };
149         return from(resourceFactory);
150     }
151 
152     public static Flowable<String> fromClasspath(final String resource, final Charset charset) {
153         return fromClasspath(Strings.class, resource, charset);
154     }
155 
156     public static Flowable<String> fromClasspath(final String resource) {
157         return fromClasspath(resource, Utf8Holder.INSTANCE);
158     }
159 
160     public static Flowable<String> from(final Callable<Reader> readerFactory) {
161         Function<Reader, Flowable<String>> flowableFactory = new Function<Reader, Flowable<String>>() {
162             @Override
163             public Flowable<String> apply(Reader reader) {
164                 return from(reader);
165             }
166         };
167         return Flowable.using(readerFactory, flowableFactory, DisposeActionHolder.INSTANCE, true);
168     }
169 
170     public static Maybe<String> join(Flowable<String> source) {
171         return join(source, "");
172     }
173 
174     public static FlowableTransformer<byte[], String> decode(CharsetDecoder decoder) {
175         return decode(decoder, BackpressureStrategy.BUFFER, DEFAULT_REQUEST_SIZE);
176     }
177 
178     public static FlowableTransformer<byte[], String> decode(CharsetDecoder decoder,
179             BackpressureStrategy backpressureStrategy, int requestBatchSize) {
180         return TransformerDecode.decode(decoder, BackpressureStrategy.BUFFER, requestBatchSize);
181     }
182 
183     public static Flowable<String> decode(Flowable<byte[]> source, CharsetDecoder decoder) {
184         return source.compose(Strings.decode(decoder));
185     }
186 
187     public static Flowable<String> decode(Flowable<byte[]> source, Charset charset) {
188         return decode(source, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
189                 .onUnmappableCharacter(CodingErrorAction.REPLACE));
190     }
191 
192     public static Flowable<String> decode(Flowable<byte[]> source, String charset) {
193         return decode(source, Charset.forName(charset));
194     }
195 
196     public static Maybe<String> join(final Flowable<String> source, final String delimiter) {
197 
198         return Maybe.defer(new Callable<Maybe<String>>() {
199             final AtomicBoolean afterFirst = new AtomicBoolean(false);
200             final AtomicBoolean isEmpty = new AtomicBoolean(true);
201 
202             @Override
203             public Maybe<String> call() {
204                 return source.collect(new Callable<StringBuilder>() {
205                     @Override
206                     public StringBuilder call() {
207                         return new StringBuilder();
208                     }
209                 }, new BiConsumer<StringBuilder, String>() {
210 
211                     @Override
212                     public void accept(StringBuilder b, String s) throws Exception {
213                         if (!afterFirst.compareAndSet(false, true)) {
214                             b.append(delimiter);
215                         }
216                         b.append(s);
217                         isEmpty.set(false);
218 
219                     }
220                 }).flatMapMaybe(new Function<StringBuilder, Maybe<String>>() {
221 
222                     @Override
223                     public Maybe<String> apply(StringBuilder b) {
224                         if (isEmpty.get())
225                             return Maybe.empty();
226                         else
227                             return Maybe.just(b.toString());
228                     }
229                 });
230 
231             }
232         });
233     }
234 
235     public static Flowable<List<String>> splitLinesSkipComments(InputStream is, Charset charset, final String delimiter,
236             final String commentPrefix) {
237         return from(is, charset) //
238                 .compose(Strings.split("\n", BackpressureStrategy.BUFFER, 1)) //
239                 .filter(new Predicate<String>() {
240                     @Override
241                     public boolean test(String line) {
242                         return !line.startsWith(commentPrefix);
243                     }
244                 }) //
245                 .map(SplitLinesHolder.trim) //
246                 .filter(SplitLinesHolder.notEmpty) //
247                 .map(new Function<String, List<String>>() {
248                     @Override
249                     public List<String> apply(String line) {
250                         return Arrays.asList(line.split(delimiter));
251                     }
252                 });
253     }
254 
255     private static class Utf8Holder {
256         static final Charset INSTANCE = Charset.forName("UTF-8");
257     }
258 
259     private static class DisposeActionHolder {
260         static final Consumer<Reader> INSTANCE = new Consumer<Reader>() {
261             @Override
262             public void accept(Reader reader) throws IOException {
263                 reader.close();
264             }
265         };
266     }
267 
268     private static class SplitLinesHolder {
269         static final Function<String, String> trim = new Function<String, String>() {
270             @Override
271             public String apply(String line) {
272                 return line.trim();
273             }
274         };
275         static final Predicate<String> notEmpty = new Predicate<String>() {
276             @Override
277             public boolean test(String line) {
278                 return !line.isEmpty();
279             }
280         };
281     }
282 
283     public static FlowableTransformer<String, String> split(String pattern) {
284         return split(pattern, BackpressureStrategy.BUFFER, 128);
285     }
286 
287     public static FlowableTransformer<String, String> split(Pattern pattern) {
288         return split(pattern, BackpressureStrategy.BUFFER, 128);
289     }
290 
291     public static FlowableTransformer<String, String> split(String pattern, BackpressureStrategy backpressureStrategy,
292             int requestBatchSize) {
293         return TransformerStringSplit.split(pattern, null, backpressureStrategy, requestBatchSize);
294     }
295 
296     public static FlowableTransformer<String, String> split(Pattern pattern, BackpressureStrategy backpressureStrategy,
297             int batchSize) {
298         return TransformerStringSplit.split(null, pattern, backpressureStrategy, batchSize);
299     }
300 
301     public static Function<Flowable<String>, Maybe<String>> join(final String delimiter) {
302         return new Function<Flowable<String>, Maybe<String>>() {
303 
304             @Override
305             public Maybe<String> apply(Flowable<String> source) throws Exception {
306                 return Strings.join(source, delimiter);
307             }
308 
309         };
310     }
311 
312     public static Function<Flowable<String>, Maybe<String>> join() {
313         return join("");
314     }
315 
316     public static Function<Flowable<String>, Maybe<String>> concat(final String delimiter) {
317         return join(delimiter);
318     }
319 
320     public static Function<Flowable<String>, Maybe<String>> concat() {
321         return concat("");
322     }
323 
324     public static <T> FlowableTransformer<T, String> strings() {
325         return new FlowableTransformer<T, String>() {
326 
327             @Override
328             public Publisher<String> apply(Flowable<T> source) {
329                 return Strings.strings(source);
330             }
331 
332         };
333     }
334 
335     /**
336      * Splits on a string delimiter, not a pattern. Is slower than RxJavaString
337      * 1.1.1 implementation on benchmarks below but requests minimally from
338      * upstream and is potentially much faster when the stream is significantly truncated
339      * (for example by downstream {@code .take(), .takeUntil(), elementAt()}.
340      * 
341      * <pre>
342      * Benchmark                                  Mode  Cnt       Score       Error  Units
343      * Benchmarks.splitRxJavaString              thrpt   10     983.128 ±    23.833  ops/s
344      * Benchmarks.splitRxJavaStringTake5         thrpt   10    1033.090 ±    33.083  ops/s
345      * Benchmarks.splitSimple                    thrpt   10     113.727 ±     2.122  ops/s
346      * Benchmarks.splitSimpleTake5               thrpt   10  867527.265 ± 27168.498  ops/s
347      * Benchmarks.splitStandard                  thrpt   10     108.880 ±     4.428  ops/s
348      * Benchmarks.splitStandardTake5             thrpt   10    1217.798 ±    44.237  ops/s
349      * Benchmarks.splitStandardWithPattern       thrpt   10     102.882 ±     5.083  ops/s
350      * Benchmarks.splitStandardWithPatternTake5  thrpt   10    1054.024 ±    27.906  ops/s
351      * </pre>
352      * 
353      * @param delimiter
354      *            string delimiter
355      * @param <T>
356      *            type being streamed
357      * @return stream split by delimiter
358      */
359     @Experimental
360     @Beta
361     public static <T> FlowableTransformer<String, String> splitSimple(final String delimiter) {
362         return new FlowableTransformer<String, String>() {
363 
364             @Override
365             public Publisher<String> apply(Flowable<String> source) {
366                 return new FlowableStringSplitSimple(source, delimiter);
367             }
368         };
369     }
370 
371     /**
372      * Returns an {@link InputStream} that offers the concatenated String data 
373      * emitted by a subscription to the given publisher using the given character set.
374      *  
375      * @param publisher the source of the String data
376      * @param charset the character set of the bytes to be read in the InputStream 
377      * @return offers the concatenated String data emitted by a subscription to 
378      *     the given publisher using the given character set
379      */
380     public static InputStream toInputStream(Publisher<String> publisher, Charset charset) {
381         return FlowableStringInputStream.createInputStream(publisher, charset);
382     }
383 
384     /**
385      * Returns an {@link InputStream} that offers the concatenated String data 
386      * emitted by a subscription to the given publisher using the  character set
387      * UTF-8 for the bytes read through the InputStream.
388      *  
389      * @param publisher the source of the String data
390      * @return offers the concatenated String data emitted by a subscription to 
391      *     the given publisher using the UTF-8 character set
392      */
393     public static InputStream toInputStream(Publisher<String> publisher) {
394         return toInputStream(publisher, Charset.forName("UTF-8"));
395     }
396 
397 }