1 package com.github.davidmoten.rx;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.io.InputStreamReader;
9 import java.io.Reader;
10 import java.nio.charset.Charset;
11 import java.nio.charset.CharsetDecoder;
12 import java.nio.charset.CodingErrorAction;
13 import java.util.Arrays;
14 import java.util.List;
15 import java.util.concurrent.atomic.AtomicBoolean;
16
17 import com.github.davidmoten.rx.internal.operators.OnSubscribeReader;
18 import com.github.davidmoten.util.Preconditions;
19
20 import rx.Observable;
21 import rx.functions.Action1;
22 import rx.functions.Action2;
23 import rx.functions.Func0;
24 import rx.functions.Func1;
25
26 public final class Strings {
27
28 private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
29
30
31
32
33 private static Func1<Object, String> TRIM = new Func1<Object, String>() {
34
35 @Override
36 public String call(Object input) {
37 if (input == null)
38 return null;
39 else
40 return input.toString().trim();
41 }
42 };
43
44 @SuppressWarnings("unchecked")
45 public static <T> Func1<T, String> trim() {
46 return (Func1<T, String>) TRIM;
47 }
48
49 public static Observable<String> from(Reader reader, int bufferSize) {
50 return Observable.create(new OnSubscribeReader(reader, bufferSize));
51 }
52
53 public static Observable<String> from(Reader reader) {
54 return from(reader, 8192);
55 }
56
57 public static Observable<String> from(InputStream is) {
58 return from(new InputStreamReader(is));
59 }
60
61 public static Observable<String> from(InputStream is, Charset charset) {
62 return from(new InputStreamReader(is, charset));
63 }
64
65 public static Observable<String> from(InputStream is, Charset charset, int bufferSize) {
66 return from(new InputStreamReader(is, charset), bufferSize);
67 }
68
69 public static Observable<String> split(Observable<String> source, String pattern) {
70 return source.compose(Transformers.split(pattern));
71 }
72
73 public static Observable<String> concat(Observable<String> source) {
74 return join(source, "");
75 }
76
77 public static Observable<String> concat(Observable<String> source, final String delimiter) {
78 return join(source, delimiter);
79 }
80
81 public static Observable<String> strings(Observable<?> source) {
82 return source.map(new Func1<Object, String>() {
83 @Override
84 public String call(Object t) {
85 return String.valueOf(t);
86 }
87 });
88 }
89
90 public static Observable<String> from(File file) {
91 return from(file, DEFAULT_CHARSET);
92 }
93
94 public static Observable<String> from(final File file, final Charset charset) {
95 Preconditions.checkNotNull(file);
96 Preconditions.checkNotNull(charset);
97 Func0<Reader> resourceFactory = new Func0<Reader>() {
98 @Override
99 public Reader call() {
100 try {
101 return new InputStreamReader(new FileInputStream(file), charset);
102 } catch (FileNotFoundException e) {
103 throw new RuntimeException(e);
104 }
105 }
106 };
107 return from(resourceFactory);
108 }
109
110 public static Observable<String> fromClasspath(final String resource, final Charset charset) {
111 Preconditions.checkNotNull(resource);
112 Preconditions.checkNotNull(charset);
113 Func0<Reader> resourceFactory = new Func0<Reader>() {
114 @Override
115 public Reader call() {
116 return new InputStreamReader(Strings.class.getResourceAsStream(resource), charset);
117 }
118 };
119 return from(resourceFactory);
120 }
121
122 public static Observable<String> fromClasspath(final String resource) {
123 return fromClasspath(resource, Utf8Holder.INSTANCE);
124 }
125
126 private static class Utf8Holder {
127 static final Charset INSTANCE = Charset.forName("UTF-8");
128 }
129
130 public static Observable<String> from(final Func0<Reader> readerFactory) {
131 Func1<Reader, Observable<String>> observableFactory = new Func1<Reader, Observable<String>>() {
132 @Override
133 public Observable<String> call(Reader reader) {
134 return from(reader);
135 }
136 };
137 return Observable.using(readerFactory, observableFactory, DisposeActionHolder.INSTANCE,
138 true);
139 }
140
141 private static class DisposeActionHolder {
142 static final Action1<Reader> INSTANCE = new Action1<Reader>() {
143 @Override
144 public void call(Reader reader) {
145 try {
146 reader.close();
147 } catch (IOException e) {
148 e.printStackTrace();
149 }
150 }
151 };
152 }
153
154 public static Observable<String> join(Observable<String> source) {
155 return join(source, "");
156 }
157
158 public static Observable<String> decode(Observable<byte[]> source, CharsetDecoder decoder) {
159 return source.compose(Transformers.decode(decoder));
160 }
161
162 public static Observable<String> decode(Observable<byte[]> source, Charset charset) {
163 return decode(source, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
164 .onUnmappableCharacter(CodingErrorAction.REPLACE));
165 }
166
167 public static Observable<String> decode(Observable<byte[]> source, String charset) {
168 return decode(source, Charset.forName(charset));
169 }
170
171 public static Observable<String> join(final Observable<String> source, final String delimiter) {
172
173 return Observable.defer(new Func0<Observable<String>>() {
174 final AtomicBoolean afterFirst = new AtomicBoolean(false);
175 final AtomicBoolean isEmpty = new AtomicBoolean(true);
176
177 @Override
178 public Observable<String> call() {
179 return source.collect(new Func0<StringBuilder>() {
180 @Override
181 public StringBuilder call() {
182 return new StringBuilder();
183 }
184 }, new Action2<StringBuilder, String>() {
185
186 @Override
187 public void call(StringBuilder b, String s) {
188 if (!afterFirst.compareAndSet(false, true)) {
189 b.append(delimiter);
190 }
191 b.append(s);
192 isEmpty.set(false);
193 }
194 }).flatMap(new Func1<StringBuilder, Observable<String>>() {
195
196 @Override
197 public Observable<String> call(StringBuilder b) {
198 if (isEmpty.get())
199 return Observable.empty();
200 else
201 return Observable.just(b.toString());
202 }
203 });
204
205 }
206 });
207 }
208
209 public static Observable<List<String>> splitLines(InputStream is, Charset charset,
210 final String delimiter, final String commentPrefix) {
211 return from(is, charset).compose(Transformers.split("\n"))
212 .filter(new Func1<String, Boolean>() {
213 @Override
214 public Boolean call(String line) {
215 return !line.startsWith(commentPrefix);
216 }
217 })
218 .map(SplitLinesHolder.trim)
219 .filter(SplitLinesHolder.notEmpty)
220 .map(new Func1<String, List<String>>() {
221 @Override
222 public List<String> call(String line) {
223 return Arrays.asList(line.split(delimiter));
224 }
225 });
226 }
227
228 public static Observable<List<String>> splitLines(InputStream is, String delimiter) {
229 return splitLines(is, DEFAULT_CHARSET, delimiter, "#");
230 }
231
232 private static class SplitLinesHolder {
233 static final Func1<String, String> trim = new Func1<String, String>() {
234 @Override
235 public String call(String line) {
236 return line.trim();
237 }
238 };
239 static final Func1<String, Boolean> notEmpty = new Func1<String, Boolean>() {
240 @Override
241 public Boolean call(String line) {
242 return !line.isEmpty();
243 }
244 };
245 }
246
247 }