1
2 package com.github.davidmoten.rx2.file;
3
4 import java.io.File;
5 import java.io.FileInputStream;
6 import java.io.IOException;
7 import java.nio.charset.Charset;
8 import java.nio.charset.StandardCharsets;
9 import java.nio.file.ClosedWatchServiceException;
10 import java.nio.file.Path;
11 import java.nio.file.Paths;
12 import java.nio.file.StandardWatchEventKinds;
13 import java.nio.file.WatchEvent;
14 import java.nio.file.WatchEvent.Kind;
15 import java.nio.file.WatchEvent.Modifier;
16 import java.nio.file.WatchKey;
17 import java.nio.file.WatchService;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.Queue;
23 import java.util.concurrent.TimeUnit;
24
25 import com.github.davidmoten.guavamini.Lists;
26 import com.github.davidmoten.guavamini.Preconditions;
27 import com.github.davidmoten.rx2.Bytes;
28
29 import io.reactivex.BackpressureStrategy;
30 import io.reactivex.Flowable;
31 import io.reactivex.Observable;
32 import io.reactivex.Scheduler;
33 import io.reactivex.functions.Function;
34 import io.reactivex.functions.Predicate;
35 import io.reactivex.observables.GroupedObservable;
36 import io.reactivex.schedulers.Schedulers;
37
38
39
40
41 public final class Files {
42
43 private static final long DEFAULT_POLLING_INTERVAL_MS = 1000;
44 private static final long DEFAULT_SAMPLE_TIME_MS = 2 * DEFAULT_POLLING_INTERVAL_MS;
45 public static final int DEFAULT_MAX_BYTES_PER_EMISSION = 8192;
46 public static final List<Kind<?>> ALL_KINDS = Lists.newArrayList(StandardWatchEventKinds.ENTRY_CREATE,
47 StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
48 StandardWatchEventKinds.OVERFLOW);
49
/** Private so the class cannot be instantiated; it only exposes static members. */
private Files() {
    // static utility holder
}
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 private static Flowable<byte[]> tailBytes(File file, long startPosition, long sampleTimeMs, int chunkSize,
81 Observable<?> events, BackpressureStrategy backpressureStrategy) {
82 Preconditions.checkNotNull(file);
83 return eventsToBytes(sampleModifyOrOverflowEventsOnly(events, sampleTimeMs),
84 backpressureStrategy,
85 file, startPosition, chunkSize);
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 private static Flowable<String> tailLines(File file, long startPosition, int chunkSize, Charset charset,
109 Observable<?> events, BackpressureStrategy backpressureStrategy) {
110 Preconditions.checkNotNull(file);
111 Preconditions.checkNotNull(charset);
112 Preconditions.checkNotNull(events);
113 return toLines(eventsToBytes(events, backpressureStrategy, file, startPosition, chunkSize), charset);
114 }
115
116 private static Observable<WatchEvent<?>> events(WatchService watchService, Scheduler scheduler, long intervalMs) {
117 Preconditions.checkNotNull(watchService, "watchService cannot be null");
118 Preconditions.checkNotNull(scheduler, "scheduler cannot be null");
119 Preconditions.checkArgument(intervalMs > 0, "intervalMs must be positive");
120 return Observable.interval(intervalMs, TimeUnit.MILLISECONDS, scheduler)
121 .flatMap(x -> {
122 try {
123 WatchKey key = watchService.poll();
124 if (key != null && key.isValid()) {
125 Observable<WatchEvent<?>> r = Observable.fromIterable(key.pollEvents());
126 key.reset();
127 return r;
128 } else {
129 return Observable.empty();
130 }
131 } catch (ClosedWatchServiceException e) {
132
133 return Observable.empty();
134 }
135 });
136 }
137
138 private static Observable<WatchEvent<?>> eventsBlocking(WatchService watchService) {
139 Preconditions.checkNotNull(watchService, "watchService cannot be null");
140 return Observable.<WatchEvent<?>, Queue<WatchEvent<?>>>generate(() -> new LinkedList<WatchEvent<?>>(),
141 (q, emitter) -> {
142 try {
143 while (q.isEmpty()) {
144
145 WatchKey key = watchService.take();
146 if (key.isValid()) {
147 q.addAll(key.pollEvents());
148 }
149 key.reset();
150 }
151 emitter.onNext(q.poll());
152 } catch (ClosedWatchServiceException e) {
153
154 emitter.onComplete();
155 } catch (Throwable e) {
156 emitter.onError(e);
157 }
158 }, q -> q.clear());
159
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 private static Observable<WatchEvent<?>> eventsNonBlocking(File file, Scheduler scheduler, long pollingIntervalMs,
179 List<Kind<?>> kinds, List<Modifier> modifiers) {
180 return Observable.using(() -> watchService(file, kinds, modifiers),
181 ws -> events(ws, scheduler, pollingIntervalMs)
182
183 .filter(onlyRelatedTo(file)),
184 ws -> ws.close(), true);
185 }
186
187 private static Observable<WatchEvent<?>> eventsBlocking(File file, List<Kind<?>> kinds, List<Modifier> modifiers) {
188 return Observable.using(() -> watchService(file, kinds, modifiers),
189 ws -> eventsBlocking(ws)
190
191 .filter(onlyRelatedTo(file)),
192 ws -> ws.close(), true);
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206 private static WatchService watchService(File file, List<Kind<?>> kinds, List<Modifier> modifiers)
207 throws IOException {
208 final Path path = getBasePath(file);
209 WatchService watchService = path.getFileSystem().newWatchService();
210 path.register(watchService, kinds.toArray(new Kind<?>[] {}), modifiers.toArray(new Modifier[] {}));
211 return watchService;
212 }
213
214
215
216
217
218
219
220
221
222 private final static Predicate<WatchEvent<?>> onlyRelatedTo(final File file) {
223 return new Predicate<WatchEvent<?>>() {
224
225 @Override
226 public boolean test(WatchEvent<?> event) {
227
228 final boolean ok;
229 if (file.isDirectory())
230 ok = true;
231 else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind()))
232 ok = true;
233 else {
234 Object context = event.context();
235 if (context != null && context instanceof Path) {
236 Path p = (Path) context;
237 Path basePath = getBasePath(file);
238 File pFile = new File(basePath.toFile(), p.toString());
239 ok = pFile.getAbsolutePath().equals(file.getAbsolutePath());
240 } else
241 ok = false;
242 }
243 return ok;
244 }
245 };
246 }
247
248 private static Flowable<String> toLines(Flowable<byte[]> bytes, Charset charset) {
249 return com.github.davidmoten.rx2.Strings.split(com.github.davidmoten.rx2.Strings.decode(bytes, charset), "\n");
250 }
251
252 private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events, final long sampleTimeMs) {
253 return events
254
255 .groupBy(IS_MODIFY_OR_OVERFLOW)
256
257 .flatMap(sampleIfTrue(sampleTimeMs));
258 }
259
260 private static Function<GroupedObservable<Boolean, ?>, Observable<?>> sampleIfTrue(final long sampleTimeMs) {
261 return group -> {
262 if (group.getKey())
263 return group.sample(sampleTimeMs, TimeUnit.MILLISECONDS);
264 else
265 return group;
266 };
267 }
268
269 private static Function<Object, Boolean> IS_MODIFY_OR_OVERFLOW = event -> {
270 if (event instanceof WatchEvent) {
271 WatchEvent<?> w = (WatchEvent<?>) event;
272 String kind = w.kind().name();
273 if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY.name())
274 || kind.equals(StandardWatchEventKinds.OVERFLOW.name())) {
275 return true;
276 } else
277 return false;
278 } else
279 return false;
280 };
281
282 public static WatchEventsBuilder watch(File file) {
283 return new WatchEventsBuilder(file);
284 }
285
286 public static final class WatchEventsBuilder {
287
288 private final File file;
289
290 WatchEventsBuilder(File file) {
291 this.file = file;
292 }
293
294
295
296
297
298 public WatchEventsNonBlockingBuilder nonBlocking() {
299 return new WatchEventsNonBlockingBuilder(file);
300 }
301
302
303
304
305
306 public WatchEventsBlockingBuilder blocking() {
307 return new WatchEventsBlockingBuilder(file);
308 }
309
310 }
311
312 public static final class WatchEventsNonBlockingBuilder {
313 private final File file;
314 private Optional<Scheduler> scheduler = Optional.empty();
315 private long pollInterval = DEFAULT_POLLING_INTERVAL_MS;
316 private TimeUnit pollIntervalUnit = TimeUnit.MILLISECONDS;
317 private final List<Kind<?>> kinds = new ArrayList<>();
318 private final List<Modifier> modifiers = new ArrayList<>();
319
320 private WatchEventsNonBlockingBuilder(File file) {
321 Preconditions.checkNotNull(file, "file cannot be null");
322 this.file = file;
323 }
324
325 public WatchEventsNonBlockingBuilder pollInterval(long interval, TimeUnit unit, Scheduler scheduler) {
326 Preconditions.checkNotNull(unit);
327 Preconditions.checkNotNull(scheduler);
328 this.pollInterval = interval;
329 this.pollIntervalUnit = unit;
330 this.scheduler = Optional.ofNullable(scheduler);
331 return this;
332 }
333
334 public WatchEventsNonBlockingBuilder pollInterval(long interval, TimeUnit unit) {
335 return pollInterval(interval, unit, Schedulers.io());
336 }
337
338
339
340
341
342
343
344
345 public WatchEventsNonBlockingBuilder kind(Kind<?> kind) {
346 Preconditions.checkNotNull(kind);
347 this.kinds.add(kind);
348 return this;
349 }
350
351 public WatchEventsNonBlockingBuilder modifier(Modifier modifier) {
352 Preconditions.checkNotNull(modifier);
353 this.modifiers.add(modifier);
354 return this;
355 }
356
357
358
359
360
361
362
363
364 public WatchEventsNonBlockingBuilder kinds(Kind<?>... kinds) {
365 Preconditions.checkNotNull(kinds);
366 for (Kind<?> kind : kinds) {
367 this.kinds.add(kind);
368 }
369 return this;
370 }
371
372 public Observable<WatchEvent<?>> build() {
373 List<Kind<?>> kindsCopy = new ArrayList<>(kinds);
374 if (kindsCopy.isEmpty()) {
375 kindsCopy.add(StandardWatchEventKinds.ENTRY_CREATE);
376 kindsCopy.add(StandardWatchEventKinds.ENTRY_DELETE);
377 kindsCopy.add(StandardWatchEventKinds.ENTRY_MODIFY);
378 kindsCopy.add(StandardWatchEventKinds.OVERFLOW);
379 }
380 return Observable.using(
381 () -> watchService(file, kindsCopy, modifiers),
382 ws -> Files.events(ws, scheduler.orElse(Schedulers.io()), pollIntervalUnit.toMillis(pollInterval)),
383 ws -> ws.close(),
384 true);
385 }
386
387 }
388
389 public static final class WatchEventsBlockingBuilder {
390 private final File file;
391 private final List<Kind<?>> kinds = new ArrayList<>();
392 private final List<Modifier> modifiers = new ArrayList<>();
393
394 private WatchEventsBlockingBuilder(File file) {
395 Preconditions.checkNotNull(file);
396 this.file = file;
397 }
398
399
400
401
402
403
404
405
406 public WatchEventsBlockingBuilder kind(Kind<?> kind) {
407 Preconditions.checkNotNull(kind);
408 this.kinds.add(kind);
409 return this;
410 }
411
412 public WatchEventsBlockingBuilder modifier(Modifier modifier) {
413 Preconditions.checkNotNull(modifier);
414 this.modifiers.add(modifier);
415 return this;
416 }
417
418
419
420
421
422
423
424
425 public WatchEventsBlockingBuilder kinds(Kind<?>... kinds) {
426 Preconditions.checkNotNull(kinds);
427 for (Kind<?> kind : kinds) {
428 this.kinds.add(kind);
429 }
430 return this;
431 }
432
433 public Observable<WatchEvent<?>> build() {
434 List<Kind<?>> kindsCopy = new ArrayList<>(kinds);
435 if (kindsCopy.isEmpty()) {
436 kindsCopy.add(StandardWatchEventKinds.ENTRY_CREATE);
437 kindsCopy.add(StandardWatchEventKinds.ENTRY_DELETE);
438 kindsCopy.add(StandardWatchEventKinds.ENTRY_MODIFY);
439 kindsCopy.add(StandardWatchEventKinds.OVERFLOW);
440 }
441 return Observable.using(
442 () -> watchService(file, kindsCopy, modifiers),
443 ws -> Files.eventsBlocking(ws),
444 ws -> ws.close(),
445 true);
446 }
447
448 }
449
450 public static TailBytesBuilder tailBytes(File file) {
451 return new TailBytesBuilder(file);
452 }
453
454 public static TailBytesBuilder tailBytes(String filename) {
455 return tailBytes(new File(filename));
456 }
457
458 public static TailLinesBuilder tailLines(File file) {
459 return new TailLinesBuilder(file);
460 }
461
462 public static TailLinesBuilder tailLines(String filename) {
463 return tailLines(new File(filename));
464 }
465
466 public static final class TailBytesBuilder {
467 private final File file;
468
469 TailBytesBuilder(File file) {
470 this.file = file;
471 }
472
473
474
475
476
477 public TailBytesNonBlockingBuilder nonBlocking() {
478 return new TailBytesNonBlockingBuilder(file);
479 }
480
481
482
483
484
485 public TailBytesBlockingBuilder blocking() {
486 return new TailBytesBlockingBuilder(file);
487 }
488
489
490
491
492
493
494 public TailBytesUsingCustomEventsBuilder events(Observable<WatchEvent<?>> events) {
495 return new TailBytesUsingCustomEventsBuilder(file, events);
496 }
497
498 }
499
500 public static final class TailBytesUsingCustomEventsBuilder {
501
502 private long startPosition = 0;
503 private int chunkSize = 8192;
504 private long sampleIntervalMs = DEFAULT_SAMPLE_TIME_MS;
505 private final List<Modifier> modifiers = new ArrayList<>();
506 private final File file;
507 private final Observable<WatchEvent<?>> events;
508 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
509
510 public TailBytesUsingCustomEventsBuilder(File file, Observable<WatchEvent<?>> events) {
511 this.file = file;
512 this.events = events;
513 }
514
515
516
517
518
519
520
521
522
523 public TailBytesUsingCustomEventsBuilder startPosition(long startPosition) {
524 this.startPosition = startPosition;
525 return this;
526 }
527
528
529
530
531
532
533
534
535 public TailBytesUsingCustomEventsBuilder chunkSize(int chunkSize) {
536 this.chunkSize = chunkSize;
537 return this;
538 }
539
540 public TailBytesUsingCustomEventsBuilder modifier(Modifier modifier) {
541 Preconditions.checkNotNull(modifier);
542 this.modifiers.add(modifier);
543 return this;
544 }
545
546 public TailBytesUsingCustomEventsBuilder sampleInterval(long duration, TimeUnit unit) {
547 this.sampleIntervalMs = unit.toMillis(duration);
548 return this;
549 }
550
551 public TailBytesUsingCustomEventsBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
552 this.backpressureStrategy = backpressureStrategy;
553 return this;
554 }
555
556 public Flowable<byte[]> build() {
557 return Files.tailBytes(file, startPosition, sampleIntervalMs, chunkSize, events, backpressureStrategy);
558 }
559
560 }
561
562 public static final class TailBytesNonBlockingBuilder {
563
564 private final File file;
565 private long startPosition = 0;
566 private int chunkSize = 8192;
567 private long pollingIntervalMs = DEFAULT_POLLING_INTERVAL_MS;
568 private Scheduler scheduler = Schedulers.io();
569 private final List<Modifier> modifiers = new ArrayList<>();
570 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
571
572 TailBytesNonBlockingBuilder(File file) {
573 Preconditions.checkNotNull(file);
574 this.file = file;
575 }
576
577 public TailBytesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit, Scheduler scheduler) {
578 Preconditions.checkNotNull(unit);
579 Preconditions.checkNotNull(scheduler);
580 this.pollingIntervalMs = unit.toMillis(pollingInterval);
581 this.scheduler = scheduler;
582 return this;
583 }
584
585 public TailBytesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit) {
586 Preconditions.checkNotNull(unit);
587 return pollingInterval(pollingInterval, unit, Schedulers.io());
588 }
589
590
591
592
593
594
595
596
597
598 public TailBytesNonBlockingBuilder startPosition(long startPosition) {
599 this.startPosition = startPosition;
600 return this;
601 }
602
603
604
605
606
607
608
609
610 public TailBytesNonBlockingBuilder chunkSize(int chunkSize) {
611 this.chunkSize = chunkSize;
612 return this;
613 }
614
615 public TailBytesNonBlockingBuilder modifier(Modifier modifier) {
616 Preconditions.checkNotNull(modifier);
617 this.modifiers.add(modifier);
618 return this;
619 }
620
621 public TailBytesNonBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
622 this.backpressureStrategy = backpressureStrategy;
623 return this;
624 }
625
626
627 public Flowable<byte[]> build() {
628 Observable<WatchEvent<?>> events = Files.eventsNonBlocking(file, scheduler, pollingIntervalMs, ALL_KINDS, modifiers);
629 return Files.tailBytes(file, startPosition, pollingIntervalMs * 2, chunkSize, events, backpressureStrategy);
630 }
631
632 }
633
634 public static final class TailBytesBlockingBuilder {
635
636 private final File file;
637 private long startPosition = 0;
638 private int chunkSize = 8192;
639 private final List<Modifier> modifiers = new ArrayList<>();
640 private long sampleTimeMs = DEFAULT_SAMPLE_TIME_MS;
641 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
642
643 TailBytesBlockingBuilder(File file) {
644 Preconditions.checkNotNull(file);
645 this.file = file;
646 }
647
648
649
650
651
652
653
654
655
656 public TailBytesBlockingBuilder startPosition(long startPosition) {
657 this.startPosition = startPosition;
658 return this;
659 }
660
661 public TailBytesBlockingBuilder sampleTime(long time, TimeUnit unit) {
662 Preconditions.checkNotNull(unit);
663 this.sampleTimeMs = unit.toMillis(time);
664 return this;
665 }
666
667
668
669
670
671
672
673
674 public TailBytesBlockingBuilder chunkSize(int chunkSize) {
675 this.chunkSize = chunkSize;
676 return this;
677 }
678
679 public TailBytesBlockingBuilder modifier(Modifier modifier) {
680 Preconditions.checkNotNull(modifier);
681 this.modifiers.add(modifier);
682 return this;
683 }
684
685 public TailBytesBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
686 this.backpressureStrategy = backpressureStrategy;
687 return this;
688 }
689
690 public Flowable<byte[]> build() {
691 Observable<WatchEvent<?>> events = Files.eventsBlocking(file, ALL_KINDS, modifiers);
692 return Files.tailBytes(file, startPosition, sampleTimeMs, chunkSize, events, backpressureStrategy);
693 }
694
695 }
696
697 public static final class TailLinesBuilder {
698
699 private final File file;
700
701 TailLinesBuilder(File file) {
702 this.file = file;
703 }
704
705
706
707
708
709 public TailLinesNonBlockingBuilder nonBlocking() {
710 return new TailLinesNonBlockingBuilder(file);
711 }
712
713
714
715
716
717 public TailLinesBlockingBuilder blocking() {
718 return new TailLinesBlockingBuilder(file);
719 }
720
721
722
723
724
725
726 public TailLinesUsingCustomEventsBuilder events(Observable<WatchEvent<?>> events) {
727 Preconditions.checkNotNull(events);
728 return new TailLinesUsingCustomEventsBuilder(file, events);
729 }
730 }
731
732 public static final class TailLinesUsingCustomEventsBuilder {
733
734 private final File file;
735 private final Observable<WatchEvent<?>> events;
736
737 private long startPosition = 0;
738 private int chunkSize = 8192;
739 private Charset charset = StandardCharsets.UTF_8;
740 private final List<Modifier> modifiers = new ArrayList<>();
741 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
742
743 TailLinesUsingCustomEventsBuilder(File file, Observable<WatchEvent<?>> events) {
744 this.file = file;
745 this.events = events;
746 }
747
748
749
750
751
752
753
754
755
756 public TailLinesUsingCustomEventsBuilder startPosition(long startPosition) {
757 this.startPosition = startPosition;
758 return this;
759 }
760
761
762
763
764
765
766
767
768 public TailLinesUsingCustomEventsBuilder chunkSize(int chunkSize) {
769 this.chunkSize = chunkSize;
770 return this;
771 }
772
773
774
775
776
777
778
779
780 public TailLinesUsingCustomEventsBuilder charset(Charset charset) {
781 Preconditions.checkNotNull(charset);
782 this.charset = charset;
783 return this;
784 }
785
786
787
788
789
790
791
792
793 public TailLinesUsingCustomEventsBuilder charset(String charset) {
794 Preconditions.checkNotNull(charset);
795 return charset(Charset.forName(charset));
796 }
797
798 public TailLinesUsingCustomEventsBuilder utf8() {
799 return charset("UTF-8");
800 }
801
802 public TailLinesUsingCustomEventsBuilder modifier(Modifier modifier) {
803 Preconditions.checkNotNull(modifier);
804 this.modifiers.add(modifier);
805 return this;
806 }
807
808 public TailLinesUsingCustomEventsBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
809 this.backpressureStrategy = backpressureStrategy;
810 return this;
811 }
812
813 public Flowable<String> build() {
814 return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
815 }
816
817 }
818
819 public static final class TailLinesNonBlockingBuilder {
820
821 private final File file;
822 private long startPosition = 0;
823 private int chunkSize = 8192;
824 private Charset charset = StandardCharsets.UTF_8;
825 private long pollingIntervalMs = DEFAULT_POLLING_INTERVAL_MS;
826 private Scheduler scheduler = Schedulers.io();
827 private final List<Modifier> modifiers = new ArrayList<>();
828 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
829
830 TailLinesNonBlockingBuilder(File file) {
831 Preconditions.checkNotNull(file);
832 this.file = file;
833 }
834
835 public TailLinesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit, Scheduler scheduler) {
836 Preconditions.checkNotNull(unit);
837 Preconditions.checkNotNull(scheduler);
838 this.pollingIntervalMs = unit.toMillis(pollingInterval);
839 this.scheduler = scheduler;
840 return this;
841 }
842
843 public TailLinesNonBlockingBuilder pollingInterval(long pollingInterval, TimeUnit unit) {
844 Preconditions.checkNotNull(unit);
845 return pollingInterval(pollingInterval, unit, Schedulers.io());
846 }
847
848
849
850
851
852
853
854
855
856 public TailLinesNonBlockingBuilder startPosition(long startPosition) {
857 this.startPosition = startPosition;
858 return this;
859 }
860
861
862
863
864
865
866
867
868 public TailLinesNonBlockingBuilder chunkSize(int chunkSize) {
869 this.chunkSize = chunkSize;
870 return this;
871 }
872
873
874
875
876
877
878
879
880 public TailLinesNonBlockingBuilder charset(Charset charset) {
881 Preconditions.checkNotNull(charset);
882 this.charset = charset;
883 return this;
884 }
885
886
887
888
889
890
891
892
893 public TailLinesNonBlockingBuilder charset(String charset) {
894 Preconditions.checkNotNull(charset);
895 return charset(Charset.forName(charset));
896 }
897
898 public TailLinesNonBlockingBuilder utf8() {
899 return charset("UTF-8");
900 }
901
902 public TailLinesNonBlockingBuilder modifier(Modifier modifier) {
903 Preconditions.checkNotNull(modifier);
904 this.modifiers.add(modifier);
905 return this;
906 }
907
908 public TailLinesNonBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
909 this.backpressureStrategy = backpressureStrategy;
910 return this;
911 }
912
913 public Flowable<String> build() {
914 Observable<WatchEvent<?>> events = Files.eventsNonBlocking(file, scheduler, pollingIntervalMs, ALL_KINDS, modifiers);
915 return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
916 }
917 }
918
919 public static final class TailLinesBlockingBuilder {
920
921 private final File file;
922 private long startPosition = 0;
923 private int chunkSize = 8192;
924 private Charset charset = StandardCharsets.UTF_8;
925 private final List<Modifier> modifiers = new ArrayList<>();
926 private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER;
927
928 TailLinesBlockingBuilder(File file) {
929 Preconditions.checkNotNull(file);
930 this.file = file;
931 }
932
933
934
935
936
937
938
939
940
941 public TailLinesBlockingBuilder startPosition(long startPosition) {
942 this.startPosition = startPosition;
943 return this;
944 }
945
946
947
948
949
950
951
952
953 public TailLinesBlockingBuilder chunkSize(int chunkSize) {
954 this.chunkSize = chunkSize;
955 return this;
956 }
957
958
959
960
961
962
963
964
965 public TailLinesBlockingBuilder charset(Charset charset) {
966 Preconditions.checkNotNull(charset);
967 this.charset = charset;
968 return this;
969 }
970
971
972
973
974
975
976
977
978 public TailLinesBlockingBuilder charset(String charset) {
979 Preconditions.checkNotNull(charset);
980 return charset(Charset.forName(charset));
981 }
982
983 public TailLinesBlockingBuilder utf8() {
984 return charset("UTF-8");
985 }
986
987 public TailLinesBlockingBuilder modifier(Modifier modifier) {
988 Preconditions.checkNotNull(modifier);
989 this.modifiers.add(modifier);
990 return this;
991 }
992
993 public TailLinesBlockingBuilder backpressureStrategy(BackpressureStrategy backpressureStrategy) {
994 this.backpressureStrategy = backpressureStrategy;
995 return this;
996 }
997
998 public Flowable<String> build() {
999 Observable<WatchEvent<?>> events = Files.eventsBlocking(file, ALL_KINDS, modifiers);
1000 return Files.tailLines(file, startPosition, chunkSize, charset, events, backpressureStrategy);
1001 }
1002 }
1003
1004 private static final class State {
1005 long position;
1006 }
1007
1008 private static Flowable<byte[]> eventsToBytes(Observable<?> events, BackpressureStrategy backpressureStrategy, File file, long startPosition, int chunkSize) {
1009 return Flowable.defer(() -> {
1010 State state = new State();
1011 state.position = startPosition;
1012
1013 return events.toFlowable(backpressureStrategy)
1014 .flatMap(event -> eventToBytes(event, file, state, chunkSize));
1015 });
1016 }
1017
1018 private static Flowable<byte[]> eventToBytes(Object event, File file, State state, int chunkSize) {
1019 if (event instanceof WatchEvent) {
1020 WatchEvent<?> w = (WatchEvent<?>) event;
1021 String kind = w.kind().name();
1022 if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
1023
1024 state.position = 0;
1025 } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE.name())) {
1026 return Flowable.error(new IOException("file has been deleted"));
1027 }
1028
1029
1030 }
1031 long length = file.length();
1032 if (length > state.position) {
1033
1034
1035 return Flowable.using(
1036 () -> new FileInputStream(file),
1037 fis -> {
1038 fis.skip(state.position);
1039 return Bytes.from(fis, chunkSize)
1040 .doOnNext(x -> state.position += x.length);
1041 },
1042 fis -> fis.close(),
1043 true);
1044 } else {
1045 return Flowable.empty();
1046 }
1047 }
1048
1049 private final static Path getBasePath(final File file) {
1050 final Path path;
1051 if (file.exists() && file.isDirectory())
1052 path = Paths.get(file.toURI());
1053 else
1054 path = Paths.get(file.getParentFile().toURI());
1055 return path;
1056 }
1057
1058 }