View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.File;
4   import java.nio.charset.Charset;
5   import java.nio.file.Path;
6   import java.nio.file.Paths;
7   import java.nio.file.StandardWatchEventKinds;
8   import java.nio.file.WatchEvent;
9   import java.nio.file.WatchEvent.Kind;
10  import java.nio.file.WatchService;
11  import java.util.concurrent.TimeUnit;
12  
13  import com.github.davidmoten.rx.operators.OperatorFileTailer;
14  import com.github.davidmoten.rx.operators.OperatorWatchServiceEvents;
15  
16  import rx.Observable;
17  import rx.functions.Action0;
18  import rx.functions.Action1;
19  import rx.functions.Func0;
20  import rx.functions.Func1;
21  import rx.observables.GroupedObservable;
22  
23  /**
24   * Observable utility methods related to {@link File}.
25   */
26  public final class FileObservable {
27  
    /**
     * Suggested value for the <code>chunkSize</code> parameter of the
     * <code>tailFile</code> methods, in bytes. It is also the chunk size used
     * by {@link #tailTextFile(File, long, long, Charset)}.
     */
    public static final int DEFAULT_MAX_BYTES_PER_EMISSION = 8192;
29  
30      /**
31       * Returns an {@link Observable} that uses NIO {@link WatchService} (and a
32       * dedicated thread) to push modified events to an observable that reads and
33       * reports new sequences of bytes to a subscriber. The NIO
34       * {@link WatchService} events are sampled according to
35       * <code>sampleTimeMs</code> so that lots of discrete activity on a file
36       * (for example a log file with very frequent entries) does not prompt an
37       * inordinate number of file reads to pick up changes.
38       * 
39       * @param file
40       *            the file to tail
41       * @param startPosition
42       *            start tailing file at position in bytes
43       * @param sampleTimeMs
44       *            sample time in millis
45       * @param chunkSize
46       *            max array size of each element emitted by the Observable. Is
47       *            also used as the buffer size for reading from the file. Try
48       *            {@link FileObservable#DEFAULT_MAX_BYTES_PER_EMISSION} if you
49       *            don't know what to put here.
50       * @return
51       */
52      public final static Observable<byte[]> tailFile(File file, long startPosition,
53              long sampleTimeMs, int chunkSize) {
54          Observable<Object> events = from(file, StandardWatchEventKinds.ENTRY_CREATE,
55                  StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW)
56                          // don't care about the event details, just that there
57                          // is one
58                          .cast(Object.class)
59                          // get lines once on subscription so we tail the lines
60                          // in the file at startup
61                          .startWith(new Object());
62          return tailFile(file, startPosition, sampleTimeMs, chunkSize, events);
63      }
64  
65      /**
66       * Returns an {@link Observable} that uses given given observable to push
67       * modified events to an observable that reads and reports new sequences of
68       * bytes to a subscriber. The NIO {@link WatchService} MODIFY and OVERFLOW
69       * events are sampled according to <code>sampleTimeMs</code> so that lots of
70       * discrete activity on a file (for example a log file with very frequent
71       * entries) does not prompt an inordinate number of file reads to pick up
72       * changes. File create events are not sampled and are always passed
73       * through.
74       * 
75       * @param file
76       *            the file to tail
77       * @param startPosition
78       *            start tailing file at position in bytes
79       * @param sampleTimeMs
80       *            sample time in millis for MODIFY and OVERFLOW events
81       * @param chunkSize
82       *            max array size of each element emitted by the Observable. Is
83       *            also used as the buffer size for reading from the file. Try
84       *            {@link FileObservable#DEFAULT_MAX_BYTES_PER_EMISSION} if you
85       *            don't know what to put here.
86       * @return
87       */
88      public final static Observable<byte[]> tailFile(File file, long startPosition,
89              long sampleTimeMs, int chunkSize, Observable<?> events) {
90          return sampleModifyOrOverflowEventsOnly(events, sampleTimeMs)
91                  // tail file triggered by events
92                  .lift(new OperatorFileTailer(file, startPosition, chunkSize));
93      }
94  
95      /**
96       * Returns an {@link Observable} that uses NIO {@link WatchService} (and a
97       * dedicated thread) to push modified events to an observable that reads and
98       * reports new lines to a subscriber. The NIO WatchService MODIFY and
99       * OVERFLOW events are sampled according to <code>sampleTimeMs</code> so
100      * that lots of discrete activity on a file (for example a log file with
101      * very frequent entries) does not prompt an inordinate number of file reads
102      * to pick up changes. File create events are not sampled and are always
103      * passed through.
104      * 
105      * @param file
106      *            the file to tail
107      * @param startPosition
108      *            start tailing file at position in bytes
109      * @param sampleTimeMs
110      *            sample time in millis for MODIFY and OVERFLOW events
111      * @param charset
112      *            the character set to use to decode the bytes to a string
113      * @return
114      */
115     public final static Observable<String> tailTextFile(File file, long startPosition,
116             long sampleTimeMs, Charset charset) {
117         return toLines(tailFile(file, startPosition, sampleTimeMs, DEFAULT_MAX_BYTES_PER_EMISSION),
118                 charset);
119     }
120 
121     /**
122      * Returns an {@link Observable} of String that uses the given events stream
123      * to trigger checks on file change so that new lines can be read and
124      * emitted.
125      * 
126      * @param file
127      *            the file to tail, cannot be null
128      * @param startPosition
129      *            start tailing file at position in bytes
130      * @param charset
131      *            the character set to use to decode the bytes to a string
132      * @param events
133      *            trigger a check for file changes. Use
134      *            {@link Observable#interval(long, TimeUnit)} for example.
135      * @return
136      */
137     public final static Observable<String> tailTextFile(File file, long startPosition,
138             int chunkSize, Charset charset, Observable<?> events) {
139         return toLines(events.lift(new OperatorFileTailer(file, startPosition, chunkSize))
140                 .onBackpressureBuffer(), charset);
141     }
142 
143     /**
144      * Returns an {@link Observable} of {@link WatchEvent}s from a
145      * {@link WatchService}.
146      * 
147      * @param watchService
148      *            {@link WatchService} to generate events for
149      * @return
150      */
151     public final static Observable<WatchEvent<?>> from(WatchService watchService) {
152         return Observable.just(watchService).lift(new OperatorWatchServiceEvents())
153                 .onBackpressureBuffer();
154     }
155 
156     /**
157      * If file does not exist at subscribe time then is assumed to not be a
158      * directory. If the file is not a directory (bearing in mind the aforesaid
159      * assumption) then a {@link WatchService} is set up on its parent and
160      * {@link WatchEvent}s of the given kinds are filtered to concern the file
161      * in question. If the file is a directory then a {@link WatchService} is
162      * set up on the directory and all events are passed through of the given
163      * kinds.
164      * 
165      * @param file
166      *            file to watch
167      * @param kinds
168      *            event kinds to watch for and emit
169      * @return
170      */
171     @SafeVarargs
172     public final static Observable<WatchEvent<?>> from(final File file, Kind<?>... kinds) {
173         return from(file, null, kinds);
174     }
175 
176     /**
177      * If file does not exist at subscribe time then is assumed to not be a
178      * directory. If the file is not a directory (bearing in mind the aforesaid
179      * assumption) then a {@link WatchService} is set up on its parent and
180      * {@link WatchEvent}s of the given kinds are filtered to concern the file
181      * in question. If the file is a directory then a {@link WatchService} is
182      * set up on the directory and all events are passed through of the given
183      * kinds.
184      * 
185      * @param file
186      * @param onWatchStarted
187      *            called when WatchService is created
188      * @param kinds
189      * @return
190      */
191     public final static Observable<WatchEvent<?>> from(final File file,
192             final Action0 onWatchStarted, Kind<?>... kinds) {
193         return watchService(file, kinds)
194                 // when watch service created call onWatchStarted
195                 .doOnNext(new Action1<WatchService>() {
196                     @Override
197                     public void call(WatchService w) {
198                         if (onWatchStarted != null)
199                             onWatchStarted.call();
200                     }
201                 })
202                 // emit events from the WatchService
203                 .flatMap(TO_WATCH_EVENTS)
204                 // restrict to events related to the file
205                 .filter(onlyRelatedTo(file));
206     }
207 
208     /**
209      * Creates a {@link WatchService} on subscribe for the given file and event
210      * kinds.
211      * 
212      * @param file
213      *            the file to watch
214      * @param kinds
215      *            event kinds to watch for
216      * @return
217      */
218     @SafeVarargs
219     public final static Observable<WatchService> watchService(final File file,
220             final Kind<?>... kinds) {
221         return Observable.defer(new Func0<Observable<WatchService>>() {
222 
223             @Override
224             public Observable<WatchService> call() {
225                 try {
226                     final Path path = getBasePath(file);
227                     WatchService watchService = path.getFileSystem().newWatchService();
228                     path.register(watchService, kinds);
229                     return Observable.just(watchService);
230                 } catch (Exception e) {
231                     return Observable.error(e);
232                 }
233             }
234         });
235 
236     }
237 
238     private final static Path getBasePath(final File file) {
239         final Path path;
240         if (file.exists() && file.isDirectory())
241             path = Paths.get(file.toURI());
242         else
243             path = Paths.get(file.getParentFile().toURI());
244         return path;
245     }
246 
247     /**
248      * Returns true if and only if the path corresponding to a WatchEvent
249      * represents the given file. This will be the case for Create, Modify,
250      * Delete events.
251      * 
252      * @param file
253      *            the file to restrict events to
254      * @return
255      */
256     private final static Func1<WatchEvent<?>, Boolean> onlyRelatedTo(final File file) {
257         return new Func1<WatchEvent<?>, Boolean>() {
258 
259             @Override
260             public Boolean call(WatchEvent<?> event) {
261 
262                 final boolean ok;
263                 if (file.isDirectory())
264                     ok = true;
265                 else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind()))
266                     ok = true;
267                 else {
268                     Object context = event.context();
269                     if (context != null && context instanceof Path) {
270                         Path p = (Path) context;
271                         Path basePath = getBasePath(file);
272                         File pFile = new File(basePath.toFile(), p.toString());
273                         ok = pFile.getAbsolutePath().equals(file.getAbsolutePath());
274                     } else
275                         ok = false;
276                 }
277                 return ok;
278             }
279         };
280     }
281 
282     private static Observable<String> toLines(Observable<byte[]> bytes, Charset charset) {
283         return Strings.split(Strings.decode(bytes, charset), "\n");
284     }
285 
286     private final static Func1<WatchService, Observable<WatchEvent<?>>> TO_WATCH_EVENTS = new Func1<WatchService, Observable<WatchEvent<?>>>() {
287 
288         @Override
289         public Observable<WatchEvent<?>> call(WatchService watchService) {
290             return from(watchService);
291         }
292     };
293 
    /**
     * Applies time-based sampling to MODIFY and OVERFLOW watch events only.
     * The events are split into two groups by IS_MODIFY_OR_OVERFLOW; the
     * group keyed true is sampled every <code>sampleTimeMs</code> milliseconds
     * and the group keyed false is passed through untouched, then both groups
     * are merged back into one stream.
     * 
     * @param events
     *            the triggering events
     * @param sampleTimeMs
     *            sample time in millis for MODIFY and OVERFLOW events
     * @return the events with MODIFY and OVERFLOW events sampled
     */
    private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events,
            final long sampleTimeMs) {
        return events
                // group by true if is modify or overflow, false otherwise
                .groupBy(IS_MODIFY_OR_OVERFLOW)
                // only sample if is modify or overflow
                .flatMap(sampleIfTrue(sampleTimeMs));
    }
302 
303     private static Func1<GroupedObservable<Boolean, ?>, Observable<?>> sampleIfTrue(
304             final long sampleTimeMs) {
305         return new Func1<GroupedObservable<Boolean, ?>, Observable<?>>() {
306 
307             @Override
308             public Observable<?> call(GroupedObservable<Boolean, ?> group) {
309                 // if is modify or overflow WatchEvent
310                 if (group.getKey())
311                     return group.sample(sampleTimeMs, TimeUnit.MILLISECONDS);
312                 else
313                     return group;
314             }
315         };
316     }
317 
318     private static Func1<Object, Boolean> IS_MODIFY_OR_OVERFLOW = new Func1<Object, Boolean>() {
319 
320         @Override
321         public Boolean call(Object event) {
322             if (event instanceof WatchEvent) {
323                 WatchEvent<?> w = (WatchEvent<?>) event;
324                 String kind = w.kind().name();
325                 if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY.name())
326                         || kind.equals(StandardWatchEventKinds.OVERFLOW.name())) {
327                     return true;
328                 } else
329                     return false;
330             } else
331                 return false;
332         }
333     };
334 
    /**
     * Returns a new {@link Builder} for configuring and creating an
     * {@link Observable} that tails a file.
     * 
     * @return a new builder with default settings
     */
    public static Builder tailer() {
        return new Builder();
    }
338 
339     public static class Builder {
340 
341         private File file = null;
342         private long startPosition = 0;
343         private long sampleTimeMs = 500;
344         private int chunkSize = 8192;
345         private Charset charset = Charset.defaultCharset();
346         private Observable<?> source = null;
347         private Action0 onWatchStarted = new Action0() {
348             @Override
349             public void call() {
350                 // do nothing
351             }
352         };
353 
354         private Builder() {
355         }
356 
357         /**
358          * The file to tail.
359          * 
360          * @param file
361          * @return this
362          */
363         public Builder file(File file) {
364             this.file = file;
365             return this;
366         }
367 
368         public Builder file(String filename) {
369             return file(new File(filename));
370         }
371 
372         public Builder onWatchStarted(Action0 onWatchStarted) {
373             this.onWatchStarted = onWatchStarted;
374             return this;
375         }
376 
377         /**
378          * The startPosition in bytes in the file to commence the tail from. 0 =
379          * start of file. Defaults to 0.
380          * 
381          * @param startPosition
382          * @return this
383          */
384         public Builder startPosition(long startPosition) {
385             this.startPosition = startPosition;
386             return this;
387         }
388 
389         /**
390          * Specifies sampling to apply to the source observable (which could be
391          * very busy if a lot of writes are occurring for example). Sampling is
392          * only applied to file updates (MODIFY and OVERFLOW), file creation
393          * events are always passed through. File deletion events are ignored
394          * (in fact are not requested of NIO).
395          * 
396          * @param sampleTimeMs
397          * @return this
398          */
399         public Builder sampleTimeMs(long sampleTimeMs) {
400             this.sampleTimeMs = sampleTimeMs;
401             return this;
402         }
403 
404         /**
405          * Emissions from the tailed file will be no bigger than this.
406          * 
407          * @param chunkSize
408          * @return this
409          */
410         public Builder chunkSize(int chunkSize) {
411             this.chunkSize = chunkSize;
412             return this;
413         }
414 
415         /**
416          * The charset of the file. Only used for tailing a text file.
417          * 
418          * @param charset
419          * @return this
420          */
421         public Builder charset(Charset charset) {
422             this.charset = charset;
423             return this;
424         }
425 
426         /**
427          * The charset of the file. Only used for tailing a text file.
428          * 
429          * @param charset
430          * @return this
431          */
432         public Builder charset(String charset) {
433             return charset(Charset.forName(charset));
434         }
435 
436         public Builder utf8() {
437             return charset("UTF-8");
438         }
439 
440         public Builder source(Observable<?> source) {
441             this.source = source;
442             return this;
443         }
444 
445         public Observable<byte[]> tail() {
446 
447             return tailFile(file, startPosition, sampleTimeMs, chunkSize, getSource());
448         }
449 
450         public Observable<String> tailText() {
451             return tailTextFile(file, startPosition, chunkSize, charset, getSource());
452         }
453 
454         private Observable<?> getSource() {
455             if (source == null)
456                 return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE,
457                         StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW);
458             else
459                 return source;
460 
461         }
462 
463     }
464 
465 }