1 package com.github.davidmoten.rx.operators;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.nio.file.StandardWatchEventKinds;
8 import java.nio.file.WatchEvent;
9 import java.util.concurrent.atomic.AtomicLong;
10
11 import com.github.davidmoten.rx.Bytes;
12 import com.github.davidmoten.rx.subjects.PublishSubjectSingleSubscriber;
13
14 import rx.Observable;
15 import rx.Observable.Operator;
16 import rx.Subscriber;
17 import rx.functions.Action1;
18 import rx.functions.Func0;
19 import rx.functions.Func1;
20 import rx.observers.Subscribers;
21
22
23
24
25
26 public class OperatorFileTailer implements Operator<byte[], Object> {
27
28 private final File file;
29 private final AtomicLong currentPosition = new AtomicLong();
30 private final int maxBytesPerEmission;
31
32
33
34
35
36
37
38
39
40 public OperatorFileTailer(File file, long startPosition, int maxBytesPerEmission) {
41 if (file == null)
42 throw new NullPointerException("file cannot be null");
43 this.file = file;
44 this.currentPosition.set(startPosition);
45 this.maxBytesPerEmission = maxBytesPerEmission;
46 }
47
48
49
50
51
52
53
54 public OperatorFileTailer(File file, long startPosition) {
55 this(file, startPosition, 8192);
56 }
57
58 @Override
59 public Subscriber<? super Object> call(Subscriber<? super byte[]> child) {
60 final PublishSubjectSingleSubscriber<? super Object> subject = PublishSubjectSingleSubscriber
61 .create();
62 Subscriber<? super Object> parent = Subscribers.from(subject);
63 child.add(parent);
64 subject
65
66 .concatMap(reportNewLines(file, currentPosition, maxBytesPerEmission))
67
68 .unsafeSubscribe(child);
69 return parent;
70 }
71
72 private static Func1<Object, Observable<byte[]>> reportNewLines(final File file,
73 final AtomicLong currentPosition, final int maxBytesPerEmission) {
74 return new Func1<Object, Observable<byte[]>>() {
75 @Override
76 public Observable<byte[]> call(Object event) {
77
78 if (event instanceof WatchEvent) {
79 WatchEvent<?> w = (WatchEvent<?>) event;
80 String kind = w.kind().name();
81 if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
82 currentPosition.set(0);
83 }
84 }
85 long length = file.length();
86 if (length > currentPosition.get()) {
87 try {
88 final FileInputStream fis = new FileInputStream(file);
89 fis.skip(currentPosition.get());
90
91
92 return Observable.using(new Func0<InputStream>() {
93
94 @Override
95 public InputStream call() {
96 return fis;
97 }
98 }, new Func1<InputStream, Observable<byte[]>>() {
99
100 @Override
101 public Observable<byte[]> call(InputStream t1) {
102 return Bytes.from(fis, maxBytesPerEmission)
103
104 .doOnNext(new Action1<byte[]>() {
105 @Override
106 public void call(byte[] bytes) {
107 currentPosition.addAndGet(bytes.length);
108 }
109 });
110 }
111 }, new Action1<InputStream>() {
112 @Override
113 public void call(InputStream is) {
114 try {
115 is.close();
116 } catch (IOException e) {
117
118 }
119 }
120 });
121 } catch (IOException e) {
122 return Observable.error(e);
123 }
124 } else
125 return Observable.empty();
126 }
127
128 };
129 }
130
131 }