View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.FileNotFoundException;
6   import java.io.IOException;
7   import java.io.InputStream;
8   import java.io.InputStreamReader;
9   import java.io.Reader;
10  import java.nio.charset.Charset;
11  import java.nio.charset.CharsetDecoder;
12  import java.nio.charset.CodingErrorAction;
13  import java.util.Arrays;
14  import java.util.List;
15  import java.util.concurrent.atomic.AtomicBoolean;
16  
17  import com.github.davidmoten.rx.internal.operators.OnSubscribeReader;
18  import com.github.davidmoten.util.Preconditions;
19  
20  import rx.Observable;
21  import rx.functions.Action1;
22  import rx.functions.Action2;
23  import rx.functions.Func0;
24  import rx.functions.Func1;
25  
26  public final class Strings {
27  
28      private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
29  
30      /**
31       * Returns null if input is null otherwise returns input.toString().trim().
32       */
33      private static Func1<Object, String> TRIM = new Func1<Object, String>() {
34  
35          @Override
36          public String call(Object input) {
37              if (input == null)
38                  return null;
39              else
40                  return input.toString().trim();
41          }
42      };
43  
44      @SuppressWarnings("unchecked")
45      public static <T> Func1<T, String> trim() {
46          return (Func1<T, String>) TRIM;
47      }
48  
49      public static Observable<String> from(Reader reader, int bufferSize) {
50          return Observable.create(new OnSubscribeReader(reader, bufferSize));
51      }
52  
53      public static Observable<String> from(Reader reader) {
54          return from(reader, 8192);
55      }
56  
57      public static Observable<String> from(InputStream is) {
58          return from(new InputStreamReader(is));
59      }
60  
61      public static Observable<String> from(InputStream is, Charset charset) {
62          return from(new InputStreamReader(is, charset));
63      }
64  
65      public static Observable<String> from(InputStream is, Charset charset, int bufferSize) {
66          return from(new InputStreamReader(is, charset), bufferSize);
67      }
68  
69      public static Observable<String> split(Observable<String> source, String pattern) {
70          return source.compose(Transformers.split(pattern));
71      }
72  
73      public static Observable<String> concat(Observable<String> source) {
74          return join(source, "");
75      }
76  
77      public static Observable<String> concat(Observable<String> source, final String delimiter) {
78          return join(source, delimiter);
79      }
80  
81      public static Observable<String> strings(Observable<?> source) {
82          return source.map(new Func1<Object, String>() {
83              @Override
84              public String call(Object t) {
85                  return String.valueOf(t);
86              }
87          });
88      }
89  
90      public static Observable<String> from(File file) {
91          return from(file, DEFAULT_CHARSET);
92      }
93  
94      public static Observable<String> from(final File file, final Charset charset) {
95          Preconditions.checkNotNull(file);
96          Preconditions.checkNotNull(charset);
97          Func0<Reader> resourceFactory = new Func0<Reader>() {
98              @Override
99              public Reader call() {
100                 try {
101                     return new InputStreamReader(new FileInputStream(file), charset);
102                 } catch (FileNotFoundException e) {
103                     throw new RuntimeException(e);
104                 }
105             }
106         };
107         return from(resourceFactory);
108     }
109 
110     public static Observable<String> fromClasspath(final String resource, final Charset charset) {
111         Preconditions.checkNotNull(resource);
112         Preconditions.checkNotNull(charset);
113         Func0<Reader> resourceFactory = new Func0<Reader>() {
114             @Override
115             public Reader call() {
116                 return new InputStreamReader(Strings.class.getResourceAsStream(resource), charset);
117             }
118         };
119         return from(resourceFactory);
120     }
121 
122     public static Observable<String> fromClasspath(final String resource) {
123         return fromClasspath(resource, Utf8Holder.INSTANCE);
124     }
125 
126     private static class Utf8Holder {
127         static final Charset INSTANCE = Charset.forName("UTF-8");
128     }
129 
130     public static Observable<String> from(final Func0<Reader> readerFactory) {
131         Func1<Reader, Observable<String>> observableFactory = new Func1<Reader, Observable<String>>() {
132             @Override
133             public Observable<String> call(Reader reader) {
134                 return from(reader);
135             }
136         };
137         return Observable.using(readerFactory, observableFactory, DisposeActionHolder.INSTANCE,
138                 true);
139     }
140 
141     private static class DisposeActionHolder {
142         static final Action1<Reader> INSTANCE = new Action1<Reader>() {
143             @Override
144             public void call(Reader reader) {
145                 try {
146                     reader.close();
147                 } catch (IOException e) {
148                     e.printStackTrace();
149                 }
150             }
151         };
152     }
153 
154     public static Observable<String> join(Observable<String> source) {
155         return join(source, "");
156     }
157 
158     public static Observable<String> decode(Observable<byte[]> source, CharsetDecoder decoder) {
159         return source.compose(Transformers.decode(decoder));
160     }
161 
162     public static Observable<String> decode(Observable<byte[]> source, Charset charset) {
163         return decode(source, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
164                 .onUnmappableCharacter(CodingErrorAction.REPLACE));
165     }
166 
167     public static Observable<String> decode(Observable<byte[]> source, String charset) {
168         return decode(source, Charset.forName(charset));
169     }
170 
171     public static Observable<String> join(final Observable<String> source, final String delimiter) {
172 
173         return Observable.defer(new Func0<Observable<String>>() {
174             final AtomicBoolean afterFirst = new AtomicBoolean(false);
175             final AtomicBoolean isEmpty = new AtomicBoolean(true);
176 
177             @Override
178             public Observable<String> call() {
179                 return source.collect(new Func0<StringBuilder>() {
180                     @Override
181                     public StringBuilder call() {
182                         return new StringBuilder();
183                     }
184                 }, new Action2<StringBuilder, String>() {
185 
186                     @Override
187                     public void call(StringBuilder b, String s) {
188                         if (!afterFirst.compareAndSet(false, true)) {
189                             b.append(delimiter);
190                         }
191                         b.append(s);
192                         isEmpty.set(false);
193                     }
194                 }).flatMap(new Func1<StringBuilder, Observable<String>>() {
195 
196                     @Override
197                     public Observable<String> call(StringBuilder b) {
198                         if (isEmpty.get())
199                             return Observable.empty();
200                         else
201                             return Observable.just(b.toString());
202                     }
203                 });
204 
205             }
206         });
207     }
208 
209     public static Observable<List<String>> splitLines(InputStream is, Charset charset,
210             final String delimiter, final String commentPrefix) {
211         return from(is, charset).compose(Transformers.split("\n")) //
212                 .filter(new Func1<String, Boolean>() {
213                     @Override
214                     public Boolean call(String line) {
215                         return !line.startsWith(commentPrefix);
216                     }
217                 }) //
218                 .map(SplitLinesHolder.trim) //
219                 .filter(SplitLinesHolder.notEmpty) //
220                 .map(new Func1<String, List<String>>() {
221                     @Override
222                     public List<String> call(String line) {
223                         return Arrays.asList(line.split(delimiter));
224                     }
225                 });
226     }
227     
228     public static Observable<List<String>> splitLines(InputStream is, String delimiter) {
229         return splitLines(is, DEFAULT_CHARSET, delimiter, "#");
230     }
231 
232     private static class SplitLinesHolder {
233         static final Func1<String, String> trim = new Func1<String, String>() {
234             @Override
235             public String call(String line) {
236                 return line.trim();
237             }
238         };
239         static final Func1<String, Boolean> notEmpty = new Func1<String, Boolean>() {
240             @Override
241             public Boolean call(String line) {
242                 return !line.isEmpty();
243             }
244         };
245     }
246 
247 }