View Javadoc
1   
2   package com.github.davidmoten.rx2.file;
3   
4   import java.io.File;
5   import java.io.FileInputStream;
6   import java.io.IOException;
7   import java.nio.charset.Charset;
8   import java.nio.charset.StandardCharsets;
9   import java.nio.file.ClosedWatchServiceException;
10  import java.nio.file.Path;
11  import java.nio.file.Paths;
12  import java.nio.file.StandardWatchEventKinds;
13  import java.nio.file.WatchEvent;
14  import java.nio.file.WatchEvent.Kind;
15  import java.nio.file.WatchEvent.Modifier;
16  import java.nio.file.WatchKey;
17  import java.nio.file.WatchService;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.Optional;
22  import java.util.Queue;
23  import java.util.concurrent.TimeUnit;
24  
25  import com.github.davidmoten.guavamini.Lists;
26  import com.github.davidmoten.guavamini.Preconditions;
27  import com.github.davidmoten.rx2.Bytes;
28  
29  import io.reactivex.BackpressureStrategy;
30  import io.reactivex.Flowable;
31  import io.reactivex.Observable;
32  import io.reactivex.Scheduler;
33  import io.reactivex.functions.Function;
34  import io.reactivex.functions.Predicate;
35  import io.reactivex.observables.GroupedObservable;
36  import io.reactivex.schedulers.Schedulers;
37  
38  /**
39   * Flowable utility methods related to {@link File}.
40   */
41  public final class Files {
42  
43      private static final long DEFAULT_POLLING_INTERVAL_MS = 1000;
44      private static final long DEFAULT_SAMPLE_TIME_MS = 2 * DEFAULT_POLLING_INTERVAL_MS;
45      public static final int DEFAULT_MAX_BYTES_PER_EMISSION = 8192;
46      public static final List<Kind<?>> ALL_KINDS = Lists.newArrayList(StandardWatchEventKinds.ENTRY_CREATE,
47              StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
48              StandardWatchEventKinds.OVERFLOW);
49  
50      private Files() {
51          // prevent instantiation
52      }
53  
54      /**
55       * Returns an {@link Flowable} that uses given given Flowable to push modified
56       * events to an Flowable that reads and reports new sequences of bytes to a
57       * subscriber. The NIO {@link WatchService} MODIFY and OVERFLOW events are
58       * sampled according to <code>sampleTimeMs</code> so that lots of discrete
59       * activity on a file (for example a log file with very frequent entries) does
60       * not prompt an inordinate number of file reads to pick up changes. File create
61       * events are not sampled and are always passed through.
62       * 
63       * @param file
64       *            the file to tail
65       * @param startPosition
66       *            start tailing file at position in bytes
67       * @param pollingIntervalMs
68       *            polling time in millis for MODIFY and OVERFLOW events and half of
69       *            sample time for overflow
70       * @param chunkSize
71       *            max array size of each element emitted by the Flowable. Is also
72       *            used as the buffer size for reading from the file. Try
73       *            {@link FileFlowable#DEFAULT_MAX_BYTES_PER_EMISSION} if you don't
74       *            know what to put here.
75       * @param events
76       *            trigger a check for file changes. Use
77       *            {@link Flowable#interval(long, TimeUnit)} for example.
78       * @return Flowable of byte arrays
79       */
80      private static Flowable<byte[]> tailBytes(File file, long startPosition, long sampleTimeMs, int chunkSize,
81              Observable<?> events, BackpressureStrategy backpressureStrategy) {
82          Preconditions.checkNotNull(file);
83          return eventsToBytes(sampleModifyOrOverflowEventsOnly(events, sampleTimeMs), //
84                  backpressureStrategy,
85                  file, startPosition, chunkSize);
86      }
87  
88      /**
89       * Returns an {@link Flowable} of String that uses the given events stream to
90       * trigger checks on file change so that new lines can be read and emitted.
91       * 
92       * @param file
93       *            the file to tail, cannot be null
94       * @param startPosition
95       *            start tailing file at position in bytes
96       * @param chunkSize
97       *            max array size of each element emitted by the Flowable. Is also
98       *            used as the buffer size for reading from the file. Try
99       *            {@link FileFlowable#DEFAULT_MAX_BYTES_PER_EMISSION} if you don't
100      *            know what to put here.
101      * @param charset
102      *            the character set to use to decode the bytes to a string
103      * @param events
104      *            trigger a check for file changes. Use
105      *            {@link Flowable#interval(long, TimeUnit)} for example.
106      * @return Flowable of strings
107      */
108     private static Flowable<String> tailLines(File file, long startPosition, int chunkSize, Charset charset,
109             Observable<?> events, BackpressureStrategy backpressureStrategy) {
110         Preconditions.checkNotNull(file);
111         Preconditions.checkNotNull(charset);
112         Preconditions.checkNotNull(events);
113         return toLines(eventsToBytes(events, backpressureStrategy, file, startPosition, chunkSize), charset);
114     }
115 
116     private static Observable<WatchEvent<?>> events(WatchService watchService, Scheduler scheduler, long intervalMs) {
117         Preconditions.checkNotNull(watchService, "watchService cannot be null");
118         Preconditions.checkNotNull(scheduler, "scheduler cannot be null");
119         Preconditions.checkArgument(intervalMs > 0, "intervalMs must be positive");
120         return Observable.interval(intervalMs, TimeUnit.MILLISECONDS, scheduler) //
121                 .flatMap(x -> {
122                     try {
123                         WatchKey key = watchService.poll();
124                         if (key != null && key.isValid()) {
125                             Observable<WatchEvent<?>> r = Observable.fromIterable(key.pollEvents());
126                             key.reset();
127                             return r;
128                         } else {
129                             return Observable.empty();
130                         }
131                     } catch (ClosedWatchServiceException e) {
132                         // ignore
133                         return Observable.empty();
134                     }
135                 });
136     }
137 
138     private static Observable<WatchEvent<?>> eventsBlocking(WatchService watchService) {
139         Preconditions.checkNotNull(watchService, "watchService cannot be null");
140         return Observable.<WatchEvent<?>, Queue<WatchEvent<?>>>generate(() -> new LinkedList<WatchEvent<?>>(), //
141                 (q, emitter) -> {
142                     try {
143                         while (q.isEmpty()) {
144                             // blocking call
145                             WatchKey key = watchService.take();
146                             if (key.isValid()) {
147                                 q.addAll(key.pollEvents());
148                             }
149                             key.reset();
150                         }
151                         emitter.onNext(q.poll());
152                     } catch (ClosedWatchServiceException e) {
153                         // ignore
154                         emitter.onComplete();
155                     } catch (Throwable e) {
156                         emitter.onError(e);
157                     }
158                 }, q -> q.clear());
159 
160     }
161 
162     /**
163      * If file does not exist at subscribe time then is assumed to not be a
164      * directory. If the file is not a directory (bearing in mind the aforesaid
165      * assumption) then a {@link WatchService} is set up on its parent and
166      * {@link WatchEvent}s of the given kinds are filtered to concern the file in
167      * question. If the file is a directory then a {@link WatchService} is set up on
168      * the directory and all events are passed through of the given kinds.
169      * 
170      * @param file
171      *            file to generate watch events from
172      * @param onWatchStarted
173      *            called when WatchService is created
174      * @param kinds
175      *            kinds of watch events to register for
176      * @return Flowable of watch events
177      */
178     private static Observable<WatchEvent<?>> eventsNonBlocking(File file, Scheduler scheduler, long pollingIntervalMs,
179             List<Kind<?>> kinds, List<Modifier> modifiers) {
180         return Observable.using(() -> watchService(file, kinds, modifiers), //
181                 ws -> events(ws, scheduler, pollingIntervalMs)
182                         // restrict to events related to the file
183                         .filter(onlyRelatedTo(file)), //
184                 ws -> ws.close(), true);
185     }
186 
187     private static Observable<WatchEvent<?>> eventsBlocking(File file, List<Kind<?>> kinds, List<Modifier> modifiers) {
188         return Observable.using(() -> watchService(file, kinds, modifiers), //
189                 ws -> eventsBlocking(ws)
190                         // restrict to events related to the file
191                         .filter(onlyRelatedTo(file)), //
192                 ws -> ws.close(), true);
193     }
194 
195     /**
196      * Creates a {@link WatchService} on subscribe for the given file and event
197      * kinds.
198      * 
199      * @param file
200      *            the file to watch
201      * @param kinds
202      *            event kinds to watch for
203      * @return Flowable of watch events
204      * @throws IOException
205      */
206     private static WatchService watchService(File file, List<Kind<?>> kinds, List<Modifier> modifiers)
207             throws IOException {
208         final Path path = getBasePath(file);
209         WatchService watchService = path.getFileSystem().newWatchService();
210         path.register(watchService, kinds.toArray(new Kind<?>[] {}), modifiers.toArray(new Modifier[] {}));
211         return watchService;
212     }
213 
214     /**
215      * Returns true if and only if the path corresponding to a WatchEvent represents
216      * the given file. This will be the case for Create, Modify, Delete events.
217      * 
218      * @param file
219      *            the file to restrict events to
220      * @return predicate
221      */
222     private final static Predicate<WatchEvent<?>> onlyRelatedTo(final File file) {
223         return new Predicate<WatchEvent<?>>() {
224 
225             @Override
226             public boolean test(WatchEvent<?> event) {
227 
228                 final boolean ok;
229                 if (file.isDirectory())
230                     ok = true;
231                 else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind()))
232                     ok = true;
233                 else {
234                     Object context = event.context();
235                     if (context != null && context instanceof Path) {
236                         Path p = (Path) context;
237                         Path basePath = getBasePath(file);
238                         File pFile = new File(basePath.toFile(), p.toString());
239                         ok = pFile.getAbsolutePath().equals(file.getAbsolutePath());
240                     } else
241                         ok = false;
242                 }
243                 return ok;
244             }
245         };
246     }
247 
248     private static Flowable<String> toLines(Flowable<byte[]> bytes, Charset charset) {
249         return com.github.davidmoten.rx2.Strings.split(com.github.davidmoten.rx2.Strings.decode(bytes, charset), "\n");
250     }
251 
252     private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events, final long sampleTimeMs) {
253         return events
254                 // group by true if is modify or overflow, false otherwise
255                 .groupBy(IS_MODIFY_OR_OVERFLOW)
256                 // only sample if is modify or overflow
257                 .flatMap(sampleIfTrue(sampleTimeMs));
258     }
259 
260     private static Function<GroupedObservable<Boolean, ?>, Observable<?>> sampleIfTrue(final long sampleTimeMs) {
261         return group -> { // if is modify or overflow WatchEvent
262             if (group.getKey())
263                 return group.sample(sampleTimeMs, TimeUnit.MILLISECONDS);
264             else
265                 return group;
266         };
267     }
268 
269     private static Function<Object, Boolean> IS_MODIFY_OR_OVERFLOW = event -> {
270         if (event instanceof WatchEvent) {
271             WatchEvent<?> w = (WatchEvent<?>) event;
272             String kind = w.kind().name();
273             if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY.name())
274                     || kind.equals(StandardWatchEventKinds.OVERFLOW.name())) {
275                 return true;
276             } else
277                 return false;
278         } else
279             return false;
280     };
281 
282     public static WatchEventsBuilder watch(File file) {
283         return new WatchEventsBuilder(file);
284     }
285 
286     public static final class WatchEventsBuilder {
287 
288         private final File file;
289 
290         WatchEventsBuilder(File file) {
291             this.file = file;
292         }
293 
294         /**
295          * Uses {@link WatchService#poll} under the covers.
296          * @return builder
297          */
298         public WatchEventsNonBlockingBuilder nonBlocking() {
299             return new WatchEventsNonBlockingBuilder(file);
300         }
301 
302         /**
303          * Uses blocking {@link WatchService#take} under the covers.
304          * @return builder
305          */
306         public WatchEventsBlockingBuilder blocking() {
307             return new WatchEventsBlockingBuilder(file);
308         }
309 
310     }
311 
312     public static final class WatchEventsNonBlockingBuilder {
313         private final File file;
314         private Optional<Scheduler> scheduler = Optional.empty();
315         private long pollInterval = DEFAULT_POLLING_INTERVAL_MS;
316         private TimeUnit pollIntervalUnit = TimeUnit.MILLISECONDS;
317         private final List<Kind<?>> kinds = new ArrayList<>();
318         private final List<Modifier> modifiers = new ArrayList<>();
319 
320         private WatchEventsNonBlockingBuilder(File file) {
321             Preconditions.checkNotNull(file, "file cannot be null");
322             this.file = file;
323         }
324 
325         public WatchEventsNonBlockingBuilder pollInterval(long interval, TimeUnit unit, Scheduler scheduler) {
326             Preconditions.checkNotNull(unit);
327             Preconditions.checkNotNull(scheduler);
328             this.pollInterval = interval;
329             this.pollIntervalUnit = unit;
330             this.scheduler = Optional.ofNullable(scheduler);
331             return this;
332         }
333 
334         public WatchEventsNonBlockingBuilder pollInterval(long interval, TimeUnit unit) {
335             return pollInterval(interval, unit, Schedulers.io());
336         }
337 
338         /**
339          * If no kind is specified then all {@link StandardWatchEventKinds} are used.
340          * 
341          * @param kind
342          *            kind to add
343          * @return this
344          */
345         public WatchEventsNonBlockingBuilder kind(Kind<?> kind) {
346             Preconditions.checkNotNull(kind);
347             this.kinds.add(kind);
348             return this;
349         }
350 
351         public WatchEventsNonBlockingBuilder modifier(Modifier modifier) {
352             Preconditions.checkNotNull(modifier);
353             this.modifiers.add(modifier);
354             return this;
355         }
356 
357         /**
358          * If no kind is specified then all {@link StandardWatchEventKinds} are used.
359          * 
360          * @param kinds
361          *            kinds to add
362          * @return this
363          */
364         public WatchEventsNonBlockingBuilder kinds(Kind<?>... kinds) {
365             Preconditions.checkNotNull(kinds);
366             for (Kind<?> kind : kinds) {
367                 this.kinds.add(kind);
368             }
369             return this;
370         }
371 
372         public Observable<WatchEvent<?>> build() {
373             List<Kind<?>> kindsCopy = new ArrayList<>(kinds);
374             if (kindsCopy.isEmpty()) {
375                 kindsCopy.add(StandardWatchEventKinds.ENTRY_CREATE);
376                 kindsCopy.add(StandardWatchEventKinds.ENTRY_DELETE);
377                 kindsCopy.add(StandardWatchEventKinds.ENTRY_MODIFY);
378                 kindsCopy.add(StandardWatchEventKinds.OVERFLOW);
379             }
380             return Observable.using( //
381                     () -> watchService(file, kindsCopy, modifiers), //
382                     ws -> Files.events(ws, scheduler.orElse(Schedulers.io()), pollIntervalUnit.toMillis(pollInterval)), //
383                     ws -> ws.close(), //
384                     true);
385         }
386 
387     }
388 
389     public static final class WatchEventsBlockingBuilder {
390         private final File file;
391         private final List<Kind<?>> kinds = new ArrayList<>();
392         private final List<Modifier> modifiers = new ArrayList<>();
393 
394         private WatchEventsBlockingBuilder(File file) {
395             Preconditions.checkNotNull(file);
396             this.file = file;
397         }
398 
399         /**
400          * If no kind is specified then all {@link StandardWatchEventKinds} are used.
401          * 
402          * @param kind
403          *            kind to add
404          * @return this
405          */
406         public WatchEventsBlockingBuilder kind(Kind<?> kind) {
407             Preconditions.checkNotNull(kind);
408             this.kinds.add(kind);
409             return this;
410         }
411 
412         public WatchEventsBlockingBuilder modifier(Modifier modifier) {
413             Preconditions.checkNotNull(modifier);
414             this.modifiers.add(modifier);
415             return this;
416         }
417 
418         /**
419          * If no kind is specified then all {@link StandardWatchEventKinds} are used.
420          * 
421          * @param kinds
422          *            kinds to add
423          * @return this
424          */
425         public WatchEventsBlockingBuilder kinds(Kind<?>... kinds) {
426             Preconditions.checkNotNull(kinds);
427             for (Kind<?> kind : kinds) {
428                 this.kinds.add(kind);
429             }
430             return this;
431         }
432 
433         public Observable<WatchEvent<?>> build() {
434             List<Kind<?>> kindsCopy = new ArrayList<>(kinds);
435             if (kindsCopy.isEmpty()) {
436                 kindsCopy.add(StandardWatchEventKinds.ENTRY_CREATE);
437                 kindsCopy.add(StandardWatchEventKinds.ENTRY_DELETE);
438                 kindsCopy.add(StandardWatchEventKinds.ENTRY_MODIFY);
439                 kindsCopy.add(StandardWatchEventKinds.OVERFLOW);
440             }
441             return Observable.using( //
442                     () -> watchService(file, kindsCopy, modifiers), //
443                     ws -> Files.eventsBlocking(ws), //
444                     ws -> ws.close(), //
445                     true);
446         }
447 
448     }
449 
450     public static TailBytesBuilder tailBytes(File file) {
451         return new TailBytesBuilder(file);
452     }
453 
454     public static TailBytesBuilder tailBytes(String filename) {
455         return tailBytes(new File(filename));
456     }
457 
458     public static TailLinesBuilder tailLines(File file) {
459         return new TailLinesBuilder(file);
460     }
461 
462     public static TailLinesBuilder tailLines(String filename) {
463         return tailLines(new File(filename));
464     }
465 
466     public static final class TailBytesBuilder {
467         private final File file;
468 
469         TailBytesBuilder(File file) {
470             this.file = file;
471         }
472         
473         /**
474          * Uses {@link WatchService#poll} under the covers.
475          * @return builder
476          */
477         public TailBytesNonBlockingBuilder nonBlocking() {
478             return new TailBytesNonBlockingBuilder(file);
479         }
480         
481         /**
482          * Uses blocking {@link WatchService#take} under the covers.
483          * @return builder
484          */
485         public TailBytesBlockingBuilder blocking() {
486             return new TailBytesBlockingBuilder(file);
487         }
488         
489         /**
490          * Specifies a custom source of {@link WatchEvent}s.
491          * @param events custom source of WatchEvents.
492          * @return this
493          */
494         public TailBytesUsingCustomEventsBuilder events(Observable<WatchEvent<?>> events) {
495             return new TailBytesUsingCustomEventsBuilder(file, events);
496         }
497         
498     }
499     
500     public static final class TailBytesUsingCustomEventsBuilder {
501 
502         private long startPosition = 0;
503         private int chunkSize = 8192;
504         private long sampleIntervalMs = DEFAULT_SAMPLE_TIME_MS;
505         private final List<Modifier> modifiers = new ArrayList<>();
506         private final File file;
507         private final Observable<WatchEvent<?>> events;
508         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
509 
510         public TailBytesUsingCustomEventsBuilder(File file, Observable<WatchEvent<?>> events) {
511             this.file = file;
512             this.events = events;
513         }
514         
515         /**
516          * The startPosition in bytes in the file to commence the tail from. 0 = start
517          * of file. Defaults to 0.
518          * 
519          * @param startPosition
520          *            start position
521          * @return this
522          */
523         public TailBytesUsingCustomEventsBuilder startPosition(long startPosition) {
524             this.startPosition = startPosition;
525             return this;
526         }
527 
528         /**
529          * Emissions from the tailed file will be no bigger than this.
530          * 
531          * @param chunkSize
532          *            chunk size in bytes
533          * @return this
534          */
535         public TailBytesUsingCustomEventsBuilder chunkSize(int chunkSize) {
536             this.chunkSize = chunkSize;
537             return this;
538         }
539 
540         public TailBytesUsingCustomEventsBuilder modifier(Modifier modifier) {
541             Preconditions.checkNotNull(modifier);
542             this.modifiers.add(modifier);
543             return this;
544         }
545         
546         public TailBytesUsingCustomEventsBuilder sampleInterval(long duration, TimeUnit unit) {
547             this.sampleIntervalMs = unit.toMillis(duration);
548             return this;
549         }
550         
551         public TailBytesUsingCustomEventsBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
552             this.backpressureStrategy  = backpressureStrategy;
553             return this;
554         }
555         
556         public Flowable<byte[]> build() {
557             return Files.tailBytes(file, startPosition, sampleIntervalMs, chunkSize, events, backpressureStrategy);
558         }
559         
560     }
561 
562     public static final class TailBytesNonBlockingBuilder {
563 
564         private final File file;
565         private long startPosition = 0;
566         private int chunkSize = 8192;
567         private long pollingIntervalMs = DEFAULT_POLLING_INTERVAL_MS;
568         private Scheduler scheduler = Schedulers.io();
569         private final List<Modifier> modifiers = new ArrayList<>();
570         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
571 
572         TailBytesNonBlockingBuilder(File file) {
573             Preconditions.checkNotNull(file);
574             this.file = file;
575         }
576         
577         public TailBytesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit, Scheduler scheduler) {
578             Preconditions.checkNotNull(unit);
579             Preconditions.checkNotNull(scheduler);
580             this.pollingIntervalMs = unit.toMillis(pollingInterval);
581             this.scheduler = scheduler;
582             return this;
583         }
584 
585         public TailBytesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit) {
586             Preconditions.checkNotNull(unit);
587             return pollingInterval(pollingInterval, unit, Schedulers.io());
588         }
589 
590         /**
591          * The startPosition in bytes in the file to commence the tail from. 0 = start
592          * of file. Defaults to 0.
593          * 
594          * @param startPosition
595          *            start position
596          * @return this
597          */
598         public TailBytesNonBlockingBuilder startPosition(long startPosition) {
599             this.startPosition = startPosition;
600             return this;
601         }
602 
603         /**
604          * Emissions from the tailed file will be no bigger than this.
605          * 
606          * @param chunkSize
607          *            chunk size in bytes
608          * @return this
609          */
610         public TailBytesNonBlockingBuilder chunkSize(int chunkSize) {
611             this.chunkSize = chunkSize;
612             return this;
613         }
614 
615         public TailBytesNonBlockingBuilder modifier(Modifier modifier) {
616             Preconditions.checkNotNull(modifier);
617             this.modifiers.add(modifier);
618             return this;
619         }
620         
621         public TailBytesNonBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
622             this.backpressureStrategy = backpressureStrategy;
623             return this;
624         }
625         
626         
627         public Flowable<byte[]> build() {
628             Observable<WatchEvent<?>> events = Files.eventsNonBlocking(file, scheduler, pollingIntervalMs, ALL_KINDS, modifiers);
629             return Files.tailBytes(file, startPosition, pollingIntervalMs * 2, chunkSize, events, backpressureStrategy);
630         }
631 
632     }
633 
634     public static final class TailBytesBlockingBuilder {
635 
636         private final File file;
637         private long startPosition = 0;
638         private int chunkSize = 8192;
639         private final List<Modifier> modifiers = new ArrayList<>();
640         private long sampleTimeMs = DEFAULT_SAMPLE_TIME_MS;
641         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
642 
643         TailBytesBlockingBuilder(File file) {
644             Preconditions.checkNotNull(file);
645             this.file = file;
646         }
647 
648         /**
649          * The startPosition in bytes in the file to commence the tail from. 0 = start
650          * of file. Defaults to 0.
651          * 
652          * @param startPosition
653          *            start position
654          * @return this
655          */
656         public TailBytesBlockingBuilder startPosition(long startPosition) {
657             this.startPosition = startPosition;
658             return this;
659         }
660 
661         public TailBytesBlockingBuilder sampleTime(long time, TimeUnit unit) {
662             Preconditions.checkNotNull(unit);
663             this.sampleTimeMs = unit.toMillis(time);
664             return this;
665         }
666 
667         /**
668          * Emissions from the tailed file will be no bigger than this.
669          * 
670          * @param chunkSize
671          *            chunk size in bytes
672          * @return this
673          */
674         public TailBytesBlockingBuilder chunkSize(int chunkSize) {
675             this.chunkSize = chunkSize;
676             return this;
677         }
678 
679         public TailBytesBlockingBuilder modifier(Modifier modifier) {
680             Preconditions.checkNotNull(modifier);
681             this.modifiers.add(modifier);
682             return this;
683         }
684         
685         public TailBytesBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
686             this.backpressureStrategy = backpressureStrategy;
687             return this;
688         }
689 
690         public Flowable<byte[]> build() {
691             Observable<WatchEvent<?>> events = Files.eventsBlocking(file, ALL_KINDS, modifiers);
692             return Files.tailBytes(file, startPosition, sampleTimeMs, chunkSize, events, backpressureStrategy);
693         }
694 
695     }
696     
697     public static final class TailLinesBuilder {
698         
699         private final File file;
700 
701         TailLinesBuilder(File file) {
702             this.file  = file;
703         }
704         
705         /**
706          * Uses {@link WatchService#poll} under the covers.
707          * @return builder
708          */
709         public TailLinesNonBlockingBuilder nonBlocking() {
710             return new TailLinesNonBlockingBuilder(file);
711         }
712         
713         /**
714          * Uses blocking {@link WatchService#take} under the covers.
715          * @return builder
716          */
717         public TailLinesBlockingBuilder blocking() {
718             return new TailLinesBlockingBuilder(file);
719         }
720         
721         /**
722          * Specifies a custom source of {@link WatchEvent}s.
723          * @param events custom source of WatchEvents.
724          * @return this
725          */
726         public TailLinesUsingCustomEventsBuilder events(Observable<WatchEvent<?>> events) {
727             Preconditions.checkNotNull(events);
728             return new TailLinesUsingCustomEventsBuilder(file, events);
729         }
730     }
731     
732     public static final class TailLinesUsingCustomEventsBuilder {
733 
734         private final File file;
735         private final Observable<WatchEvent<?>> events;
736         
737         private long startPosition = 0;
738         private int chunkSize = 8192;
739         private Charset charset = StandardCharsets.UTF_8;
740         private final List<Modifier> modifiers = new ArrayList<>();
741         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
742 
743         TailLinesUsingCustomEventsBuilder(File file, Observable<WatchEvent<?>> events) {
744             this.file = file;
745             this.events = events;
746         }
747         
748         /**
749          * The startPosition in bytes in the file to commence the tail from. 0 = start
750          * of file. Defaults to 0.
751          * 
752          * @param startPosition
753          *            start position
754          * @return this
755          */
756         public TailLinesUsingCustomEventsBuilder startPosition(long startPosition) {
757             this.startPosition = startPosition;
758             return this;
759         }
760 
761         /**
762          * Emissions from the tailed file will be no bigger than this.
763          * 
764          * @param chunkSize
765          *            chunk size in bytes
766          * @return this
767          */
768         public TailLinesUsingCustomEventsBuilder chunkSize(int chunkSize) {
769             this.chunkSize = chunkSize;
770             return this;
771         }
772 
773         /**
774          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
775          * 
776          * @param charset
777          *            charset to decode with
778          * @return this
779          */
780         public TailLinesUsingCustomEventsBuilder charset(Charset charset) {
781             Preconditions.checkNotNull(charset);
782             this.charset = charset;
783             return this;
784         }
785 
786         /**
787          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
788          * 
789          * @param charset
790          *            charset to decode the file with
791          * @return this
792          */
793         public TailLinesUsingCustomEventsBuilder charset(String charset) {
794             Preconditions.checkNotNull(charset);
795             return charset(Charset.forName(charset));
796         }
797 
798         public TailLinesUsingCustomEventsBuilder utf8() {
799             return charset("UTF-8");
800         }
801 
802         public TailLinesUsingCustomEventsBuilder modifier(Modifier modifier) {
803             Preconditions.checkNotNull(modifier);
804             this.modifiers.add(modifier);
805             return this;
806         }
807         
808         public TailLinesUsingCustomEventsBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
809             this.backpressureStrategy  = backpressureStrategy;
810             return this;
811         }
812         
813         public Flowable<String> build() {
814             return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
815         }
816         
817     }
818 
819     public static final class TailLinesNonBlockingBuilder {
820 
821         private final File file;
822         private long startPosition = 0;
823         private int chunkSize = 8192;
824         private Charset charset = StandardCharsets.UTF_8;
825         private long pollingIntervalMs = DEFAULT_POLLING_INTERVAL_MS;
826         private Scheduler scheduler = Schedulers.io();
827         private final List<Modifier> modifiers = new ArrayList<>();
828         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
829 
830         TailLinesNonBlockingBuilder(File file) {
831             Preconditions.checkNotNull(file);
832             this.file = file;
833         }
834         
835         public TailLinesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit, Scheduler scheduler) {
836             Preconditions.checkNotNull(unit);
837             Preconditions.checkNotNull(scheduler);
838             this.pollingIntervalMs = unit.toMillis(pollingInterval);
839             this.scheduler = scheduler;
840             return this;
841         }
842 
843         public TailLinesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit) {
844             Preconditions.checkNotNull(unit);
845             return pollingInterval(pollingInterval, unit, Schedulers.io());
846         }
847 
848         /**
849          * The startPosition in bytes in the file to commence the tail from. 0 = start
850          * of file. Defaults to 0.
851          * 
852          * @param startPosition
853          *            start position
854          * @return this
855          */
856         public TailLinesNonBlockingBuilder startPosition(long startPosition) {
857             this.startPosition = startPosition;
858             return this;
859         }
860 
861         /**
862          * Emissions from the tailed file will be no bigger than this.
863          * 
864          * @param chunkSize
865          *            chunk size in bytes
866          * @return this
867          */
868         public TailLinesNonBlockingBuilder chunkSize(int chunkSize) {
869             this.chunkSize = chunkSize;
870             return this;
871         }
872 
873         /**
874          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
875          * 
876          * @param charset
877          *            charset to decode with
878          * @return this
879          */
880         public TailLinesNonBlockingBuilder charset(Charset charset) {
881             Preconditions.checkNotNull(charset);
882             this.charset = charset;
883             return this;
884         }
885 
886         /**
887          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
888          * 
889          * @param charset
890          *            charset to decode the file with
891          * @return this
892          */
893         public TailLinesNonBlockingBuilder charset(String charset) {
894             Preconditions.checkNotNull(charset);
895             return charset(Charset.forName(charset));
896         }
897 
898         public TailLinesNonBlockingBuilder utf8() {
899             return charset("UTF-8");
900         }
901 
902         public TailLinesNonBlockingBuilder modifier(Modifier modifier) {
903             Preconditions.checkNotNull(modifier);
904             this.modifiers.add(modifier);
905             return this;
906         }
907         
908         public TailLinesNonBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
909             this.backpressureStrategy   = backpressureStrategy;
910             return this;
911         }
912         
913         public Flowable<String> build() {
914             Observable<WatchEvent<?>> events = Files.eventsNonBlocking(file, scheduler, pollingIntervalMs, ALL_KINDS, modifiers);
915             return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
916         }
917     }
918 
919     public static final class TailLinesBlockingBuilder {
920 
921         private final File file;
922         private long startPosition = 0;
923         private int chunkSize = 8192;
924         private Charset charset = StandardCharsets.UTF_8;
925         private final List<Modifier> modifiers = new ArrayList<>();
926         private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
927 
928         TailLinesBlockingBuilder(File file) {
929             Preconditions.checkNotNull(file);
930             this.file = file;
931         }
932 
933         /**
934          * The startPosition in bytes in the file to commence the tail from. 0 = start
935          * of file. Defaults to 0.
936          * 
937          * @param startPosition
938          *            start position
939          * @return this
940          */
941         public TailLinesBlockingBuilder startPosition(long startPosition) {
942             this.startPosition = startPosition;
943             return this;
944         }
945 
946         /**
947          * Emissions from the tailed file will be no bigger than this.
948          * 
949          * @param chunkSize
950          *            chunk size in bytes
951          * @return this
952          */
953         public TailLinesBlockingBuilder chunkSize(int chunkSize) {
954             this.chunkSize = chunkSize;
955             return this;
956         }
957 
958         /**
959          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
960          * 
961          * @param charset
962          *            charset to decode with
963          * @return this
964          */
965         public TailLinesBlockingBuilder charset(Charset charset) {
966             Preconditions.checkNotNull(charset);
967             this.charset = charset;
968             return this;
969         }
970 
971         /**
972          * The charset of the file. Only used for tailing a text file. Default is UTF-8.
973          * 
974          * @param charset
975          *            charset to decode the file with
976          * @return this
977          */
978         public TailLinesBlockingBuilder charset(String charset) {
979             Preconditions.checkNotNull(charset);
980             return charset(Charset.forName(charset));
981         }
982 
983         public TailLinesBlockingBuilder utf8() {
984             return charset("UTF-8");
985         }
986 
987         public TailLinesBlockingBuilder modifier(Modifier modifier) {
988             Preconditions.checkNotNull(modifier);
989             this.modifiers.add(modifier);
990             return this;
991         }
992         
993         public TailLinesBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
994             this.backpressureStrategy  = backpressureStrategy;
995             return this;
996         }
997 
998         public Flowable<String> build() {
999             Observable<WatchEvent<?>> events = Files.eventsBlocking(file, ALL_KINDS, modifiers);
1000             return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
1001         }
1002     }
1003 
1004     private static final class State {
1005         long position;
1006     }
1007 
1008     private static Flowable<byte[]> eventsToBytes(Observable<?> events, BackpressureStrategy backpressureStrategy, File file, long startPosition, int chunkSize) {
1009         return Flowable.defer(() -> {
1010             State state = new State();
1011             state.position = startPosition;
1012             // TODO allow user to specify BackpressureStrategy
1013             return events.toFlowable(backpressureStrategy) //
1014                     .flatMap(event -> eventToBytes(event, file, state, chunkSize));
1015         });
1016     }
1017 
1018     private static Flowable<byte[]> eventToBytes(Object event, File file, State state, int chunkSize) {
1019         if (event instanceof WatchEvent) {
1020             WatchEvent<?> w = (WatchEvent<?>) event;
1021             String kind = w.kind().name();
1022             if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
1023                 // if file has just been created then start from the start of the new file
1024                 state.position = 0;
1025             } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE.name())) {
1026                 return Flowable.error(new IOException("file has been deleted"));
1027             }
1028             // we hope that ENTRY_CREATE and ENTRY_DELETE events never get wrapped up into
1029             // ENTRY_OVERFLOW!
1030         }
1031         long length = file.length();
1032         if (length > state.position) {
1033             // apply using method to ensure fis is closed on
1034             // termination or unsubscription
1035             return Flowable.using( //
1036                     () -> new FileInputStream(file), //
1037                     fis -> {
1038                         fis.skip(state.position);
1039                         return Bytes.from(fis, chunkSize) //
1040                                 .doOnNext(x -> state.position += x.length);
1041                     }, //
1042                     fis -> fis.close(), //
1043                     true);
1044         } else {
1045             return Flowable.empty();
1046         }
1047     }
1048 
1049     private final static Path getBasePath(final File file) {
1050         final Path path;
1051         if (file.exists() && file.isDirectory())
1052             path = Paths.get(file.toURI());
1053         else
1054             path = Paths.get(file.getParentFile().toURI());
1055         return path;
1056     }
1057 
1058 }