1 package com.github.davidmoten.rx2;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.io.InputStreamReader;
9 import java.io.Reader;
10 import java.nio.charset.Charset;
11 import java.nio.charset.CharsetDecoder;
12 import java.nio.charset.CodingErrorAction;
13 import java.util.Arrays;
14 import java.util.List;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicBoolean;
17 import java.util.regex.Pattern;
18
19 import org.reactivestreams.Publisher;
20
21 import com.github.davidmoten.guavamini.Preconditions;
22 import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
23 import com.github.davidmoten.rx2.internal.flowable.FlowableStringInputStream;
24 import com.github.davidmoten.rx2.internal.flowable.FlowableStringSplitSimple;
25 import com.github.davidmoten.rx2.internal.flowable.TransformerDecode;
26 import com.github.davidmoten.rx2.internal.flowable.TransformerStringSplit;
27
28 import io.reactivex.BackpressureStrategy;
29 import io.reactivex.Emitter;
30 import io.reactivex.Flowable;
31 import io.reactivex.FlowableTransformer;
32 import io.reactivex.Maybe;
33 import io.reactivex.annotations.Beta;
34 import io.reactivex.annotations.Experimental;
35 import io.reactivex.functions.BiConsumer;
36 import io.reactivex.functions.Consumer;
37 import io.reactivex.functions.Function;
38 import io.reactivex.functions.Predicate;
39
40 public final class Strings {
41
42 private Strings() {
43
44 }
45
46 public static final Charset UTF_8 = Charset.forName("UTF-8");
47
48 private static final int DEFAULT_REQUEST_SIZE = 1;
49 private static final int DEFAULT_BUFFER_SIZE = 8192;
50
51
52
53
54 @VisibleForTesting
55 static Function<Object, String> TRIM = new Function<Object, String>() {
56
57 @Override
58 public String apply(Object input) throws Exception {
59 if (input == null)
60 return null;
61 else
62 return input.toString().trim();
63 }
64 };
65
66 @SuppressWarnings("unchecked")
67 public static <T> Function<T, String> trim() {
68 return (Function<T, String>) TRIM;
69 }
70
71 public static Flowable<String> from(final Reader reader, final int bufferSize) {
72 return Flowable.generate(new Consumer<Emitter<String>>() {
73 final char[] buffer = new char[bufferSize];
74
75 @Override
76 public void accept(Emitter<String> emitter) throws Exception {
77 int count = reader.read(buffer);
78 if (count == -1) {
79 emitter.onComplete();
80 } else {
81 emitter.onNext(String.valueOf(buffer, 0, count));
82 }
83 }
84 });
85 }
86
87 public static Flowable<String> from(Reader reader) {
88 return from(reader, DEFAULT_BUFFER_SIZE);
89 }
90
91 public static Flowable<String> from(InputStream is) {
92 return from(is, UTF_8);
93 }
94
95 public static Flowable<String> from(InputStream is, Charset charset) {
96 return from(is, charset, DEFAULT_BUFFER_SIZE);
97 }
98
99 public static Flowable<String> from(InputStream is, Charset charset, int bufferSize) {
100 return from(new InputStreamReader(is, charset), bufferSize);
101 }
102
103 public static Flowable<String> split(Flowable<String> source, String pattern) {
104 return source.compose(Strings.split(pattern, BackpressureStrategy.BUFFER, 1));
105 }
106
107 public static Maybe<String> concat(Flowable<String> source) {
108 return concat(source, "");
109 }
110
111 public static Maybe<String> concat(Flowable<String> source, final String delimiter) {
112 return join(source, delimiter);
113 }
114
115 public static Flowable<String> strings(Flowable<?> source) {
116 return source.map(new Function<Object, String>() {
117 @Override
118 public String apply(Object t) throws Exception {
119 return String.valueOf(t);
120 }
121 });
122 }
123
124 public static Flowable<String> from(File file) {
125 return from(file, UTF_8);
126 }
127
128 public static Flowable<String> from(final File file, final Charset charset) {
129 Preconditions.checkNotNull(file);
130 Preconditions.checkNotNull(charset);
131 Callable<Reader> resourceFactory = new Callable<Reader>() {
132 @Override
133 public Reader call() throws FileNotFoundException {
134 return new InputStreamReader(new FileInputStream(file), charset);
135 }
136 };
137 return from(resourceFactory);
138 }
139
140 public static Flowable<String> fromClasspath(final Class<?> cls, final String resource, final Charset charset) {
141 Preconditions.checkNotNull(resource);
142 Preconditions.checkNotNull(charset);
143 Callable<Reader> resourceFactory = new Callable<Reader>() {
144 @Override
145 public Reader call() {
146 return new InputStreamReader(cls.getResourceAsStream(resource), charset);
147 }
148 };
149 return from(resourceFactory);
150 }
151
152 public static Flowable<String> fromClasspath(final String resource, final Charset charset) {
153 return fromClasspath(Strings.class, resource, charset);
154 }
155
156 public static Flowable<String> fromClasspath(final String resource) {
157 return fromClasspath(resource, Utf8Holder.INSTANCE);
158 }
159
160 public static Flowable<String> from(final Callable<Reader> readerFactory) {
161 Function<Reader, Flowable<String>> flowableFactory = new Function<Reader, Flowable<String>>() {
162 @Override
163 public Flowable<String> apply(Reader reader) {
164 return from(reader);
165 }
166 };
167 return Flowable.using(readerFactory, flowableFactory, DisposeActionHolder.INSTANCE, true);
168 }
169
170 public static Maybe<String> join(Flowable<String> source) {
171 return join(source, "");
172 }
173
174 public static FlowableTransformer<byte[], String> decode(CharsetDecoder decoder) {
175 return decode(decoder, BackpressureStrategy.BUFFER, DEFAULT_REQUEST_SIZE);
176 }
177
178 public static FlowableTransformer<byte[], String> decode(CharsetDecoder decoder,
179 BackpressureStrategy backpressureStrategy, int requestBatchSize) {
180 return TransformerDecode.decode(decoder, BackpressureStrategy.BUFFER, requestBatchSize);
181 }
182
183 public static Flowable<String> decode(Flowable<byte[]> source, CharsetDecoder decoder) {
184 return source.compose(Strings.decode(decoder));
185 }
186
187 public static Flowable<String> decode(Flowable<byte[]> source, Charset charset) {
188 return decode(source, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
189 .onUnmappableCharacter(CodingErrorAction.REPLACE));
190 }
191
192 public static Flowable<String> decode(Flowable<byte[]> source, String charset) {
193 return decode(source, Charset.forName(charset));
194 }
195
196 public static Maybe<String> join(final Flowable<String> source, final String delimiter) {
197
198 return Maybe.defer(new Callable<Maybe<String>>() {
199 final AtomicBoolean afterFirst = new AtomicBoolean(false);
200 final AtomicBoolean isEmpty = new AtomicBoolean(true);
201
202 @Override
203 public Maybe<String> call() {
204 return source.collect(new Callable<StringBuilder>() {
205 @Override
206 public StringBuilder call() {
207 return new StringBuilder();
208 }
209 }, new BiConsumer<StringBuilder, String>() {
210
211 @Override
212 public void accept(StringBuilder b, String s) throws Exception {
213 if (!afterFirst.compareAndSet(false, true)) {
214 b.append(delimiter);
215 }
216 b.append(s);
217 isEmpty.set(false);
218
219 }
220 }).flatMapMaybe(new Function<StringBuilder, Maybe<String>>() {
221
222 @Override
223 public Maybe<String> apply(StringBuilder b) {
224 if (isEmpty.get())
225 return Maybe.empty();
226 else
227 return Maybe.just(b.toString());
228 }
229 });
230
231 }
232 });
233 }
234
235 public static Flowable<List<String>> splitLinesSkipComments(InputStream is, Charset charset, final String delimiter,
236 final String commentPrefix) {
237 return from(is, charset)
238 .compose(Strings.split("\n", BackpressureStrategy.BUFFER, 1))
239 .filter(new Predicate<String>() {
240 @Override
241 public boolean test(String line) {
242 return !line.startsWith(commentPrefix);
243 }
244 })
245 .map(SplitLinesHolder.trim)
246 .filter(SplitLinesHolder.notEmpty)
247 .map(new Function<String, List<String>>() {
248 @Override
249 public List<String> apply(String line) {
250 return Arrays.asList(line.split(delimiter));
251 }
252 });
253 }
254
255 private static class Utf8Holder {
256 static final Charset INSTANCE = Charset.forName("UTF-8");
257 }
258
259 private static class DisposeActionHolder {
260 static final Consumer<Reader> INSTANCE = new Consumer<Reader>() {
261 @Override
262 public void accept(Reader reader) throws IOException {
263 reader.close();
264 }
265 };
266 }
267
268 private static class SplitLinesHolder {
269 static final Function<String, String> trim = new Function<String, String>() {
270 @Override
271 public String apply(String line) {
272 return line.trim();
273 }
274 };
275 static final Predicate<String> notEmpty = new Predicate<String>() {
276 @Override
277 public boolean test(String line) {
278 return !line.isEmpty();
279 }
280 };
281 }
282
283 public static FlowableTransformer<String, String> split(String pattern) {
284 return split(pattern, BackpressureStrategy.BUFFER, 128);
285 }
286
287 public static FlowableTransformer<String, String> split(Pattern pattern) {
288 return split(pattern, BackpressureStrategy.BUFFER, 128);
289 }
290
291 public static FlowableTransformer<String, String> split(String pattern, BackpressureStrategy backpressureStrategy,
292 int requestBatchSize) {
293 return TransformerStringSplit.split(pattern, null, backpressureStrategy, requestBatchSize);
294 }
295
296 public static FlowableTransformer<String, String> split(Pattern pattern, BackpressureStrategy backpressureStrategy,
297 int batchSize) {
298 return TransformerStringSplit.split(null, pattern, backpressureStrategy, batchSize);
299 }
300
301 public static Function<Flowable<String>, Maybe<String>> join(final String delimiter) {
302 return new Function<Flowable<String>, Maybe<String>>() {
303
304 @Override
305 public Maybe<String> apply(Flowable<String> source) throws Exception {
306 return Strings.join(source, delimiter);
307 }
308
309 };
310 }
311
312 public static Function<Flowable<String>, Maybe<String>> join() {
313 return join("");
314 }
315
316 public static Function<Flowable<String>, Maybe<String>> concat(final String delimiter) {
317 return join(delimiter);
318 }
319
320 public static Function<Flowable<String>, Maybe<String>> concat() {
321 return concat("");
322 }
323
324 public static <T> FlowableTransformer<T, String> strings() {
325 return new FlowableTransformer<T, String>() {
326
327 @Override
328 public Publisher<String> apply(Flowable<T> source) {
329 return Strings.strings(source);
330 }
331
332 };
333 }
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359 @Experimental
360 @Beta
361 public static <T> FlowableTransformer<String, String> splitSimple(final String delimiter) {
362 return new FlowableTransformer<String, String>() {
363
364 @Override
365 public Publisher<String> apply(Flowable<String> source) {
366 return new FlowableStringSplitSimple(source, delimiter);
367 }
368 };
369 }
370
371
372
373
374
375
376
377
378
379
380 public static InputStream toInputStream(Publisher<String> publisher, Charset charset) {
381 return FlowableStringInputStream.createInputStream(publisher, charset);
382 }
383
384
385
386
387
388
389
390
391
392
393 public static InputStream toInputStream(Publisher<String> publisher) {
394 return toInputStream(publisher, Charset.forName("UTF-8"));
395 }
396
397 }