1 package com.github.davidmoten.rx;
2
3 import java.io.File;
4 import java.nio.charset.Charset;
5 import java.nio.file.Path;
6 import java.nio.file.Paths;
7 import java.nio.file.StandardWatchEventKinds;
8 import java.nio.file.WatchEvent;
9 import java.nio.file.WatchEvent.Kind;
10 import java.nio.file.WatchService;
11 import java.util.concurrent.TimeUnit;
12
13 import com.github.davidmoten.rx.operators.OperatorFileTailer;
14 import com.github.davidmoten.rx.operators.OperatorWatchServiceEvents;
15
16 import rx.Observable;
17 import rx.functions.Action0;
18 import rx.functions.Action1;
19 import rx.functions.Func0;
20 import rx.functions.Func1;
21 import rx.observables.GroupedObservable;
22
23
24
25
26 public final class FileObservable {
27
28 public static final int DEFAULT_MAX_BYTES_PER_EMISSION = 8192;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public final static Observable<byte[]> tailFile(File file, long startPosition,
53 long sampleTimeMs, int chunkSize) {
54 Observable<Object> events = from(file, StandardWatchEventKinds.ENTRY_CREATE,
55 StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW)
56
57
58 .cast(Object.class)
59
60
61 .startWith(new Object());
62 return tailFile(file, startPosition, sampleTimeMs, chunkSize, events);
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public final static Observable<byte[]> tailFile(File file, long startPosition,
89 long sampleTimeMs, int chunkSize, Observable<?> events) {
90 return sampleModifyOrOverflowEventsOnly(events, sampleTimeMs)
91
92 .lift(new OperatorFileTailer(file, startPosition, chunkSize));
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public final static Observable<String> tailTextFile(File file, long startPosition,
116 long sampleTimeMs, Charset charset) {
117 return toLines(tailFile(file, startPosition, sampleTimeMs, DEFAULT_MAX_BYTES_PER_EMISSION),
118 charset);
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 public final static Observable<String> tailTextFile(File file, long startPosition,
138 int chunkSize, Charset charset, Observable<?> events) {
139 return toLines(events.lift(new OperatorFileTailer(file, startPosition, chunkSize))
140 .onBackpressureBuffer(), charset);
141 }
142
143
144
145
146
147
148
149
150
151 public final static Observable<WatchEvent<?>> from(WatchService watchService) {
152 return Observable.just(watchService).lift(new OperatorWatchServiceEvents())
153 .onBackpressureBuffer();
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 @SafeVarargs
172 public final static Observable<WatchEvent<?>> from(final File file, Kind<?>... kinds) {
173 return from(file, null, kinds);
174 }
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 public final static Observable<WatchEvent<?>> from(final File file,
192 final Action0 onWatchStarted, Kind<?>... kinds) {
193 return watchService(file, kinds)
194
195 .doOnNext(new Action1<WatchService>() {
196 @Override
197 public void call(WatchService w) {
198 if (onWatchStarted != null)
199 onWatchStarted.call();
200 }
201 })
202
203 .flatMap(TO_WATCH_EVENTS)
204
205 .filter(onlyRelatedTo(file));
206 }
207
208
209
210
211
212
213
214
215
216
217
218 @SafeVarargs
219 public final static Observable<WatchService> watchService(final File file,
220 final Kind<?>... kinds) {
221 return Observable.defer(new Func0<Observable<WatchService>>() {
222
223 @Override
224 public Observable<WatchService> call() {
225 try {
226 final Path path = getBasePath(file);
227 WatchService watchService = path.getFileSystem().newWatchService();
228 path.register(watchService, kinds);
229 return Observable.just(watchService);
230 } catch (Exception e) {
231 return Observable.error(e);
232 }
233 }
234 });
235
236 }
237
238 private final static Path getBasePath(final File file) {
239 final Path path;
240 if (file.exists() && file.isDirectory())
241 path = Paths.get(file.toURI());
242 else
243 path = Paths.get(file.getParentFile().toURI());
244 return path;
245 }
246
247
248
249
250
251
252
253
254
255
256 private final static Func1<WatchEvent<?>, Boolean> onlyRelatedTo(final File file) {
257 return new Func1<WatchEvent<?>, Boolean>() {
258
259 @Override
260 public Boolean call(WatchEvent<?> event) {
261
262 final boolean ok;
263 if (file.isDirectory())
264 ok = true;
265 else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind()))
266 ok = true;
267 else {
268 Object context = event.context();
269 if (context != null && context instanceof Path) {
270 Path p = (Path) context;
271 Path basePath = getBasePath(file);
272 File pFile = new File(basePath.toFile(), p.toString());
273 ok = pFile.getAbsolutePath().equals(file.getAbsolutePath());
274 } else
275 ok = false;
276 }
277 return ok;
278 }
279 };
280 }
281
282 private static Observable<String> toLines(Observable<byte[]> bytes, Charset charset) {
283 return Strings.split(Strings.decode(bytes, charset), "\n");
284 }
285
286 private final static Func1<WatchService, Observable<WatchEvent<?>>> TO_WATCH_EVENTS = new Func1<WatchService, Observable<WatchEvent<?>>>() {
287
288 @Override
289 public Observable<WatchEvent<?>> call(WatchService watchService) {
290 return from(watchService);
291 }
292 };
293
294 private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events,
295 final long sampleTimeMs) {
296 return events
297
298 .groupBy(IS_MODIFY_OR_OVERFLOW)
299
300 .flatMap(sampleIfTrue(sampleTimeMs));
301 }
302
303 private static Func1<GroupedObservable<Boolean, ?>, Observable<?>> sampleIfTrue(
304 final long sampleTimeMs) {
305 return new Func1<GroupedObservable<Boolean, ?>, Observable<?>>() {
306
307 @Override
308 public Observable<?> call(GroupedObservable<Boolean, ?> group) {
309
310 if (group.getKey())
311 return group.sample(sampleTimeMs, TimeUnit.MILLISECONDS);
312 else
313 return group;
314 }
315 };
316 }
317
318 private static Func1<Object, Boolean> IS_MODIFY_OR_OVERFLOW = new Func1<Object, Boolean>() {
319
320 @Override
321 public Boolean call(Object event) {
322 if (event instanceof WatchEvent) {
323 WatchEvent<?> w = (WatchEvent<?>) event;
324 String kind = w.kind().name();
325 if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY.name())
326 || kind.equals(StandardWatchEventKinds.OVERFLOW.name())) {
327 return true;
328 } else
329 return false;
330 } else
331 return false;
332 }
333 };
334
335 public static Builder tailer() {
336 return new Builder();
337 }
338
339 public static class Builder {
340
341 private File file = null;
342 private long startPosition = 0;
343 private long sampleTimeMs = 500;
344 private int chunkSize = 8192;
345 private Charset charset = Charset.defaultCharset();
346 private Observable<?> source = null;
347 private Action0 onWatchStarted = new Action0() {
348 @Override
349 public void call() {
350
351 }
352 };
353
354 private Builder() {
355 }
356
357
358
359
360
361
362
363 public Builder file(File file) {
364 this.file = file;
365 return this;
366 }
367
368 public Builder file(String filename) {
369 return file(new File(filename));
370 }
371
372 public Builder onWatchStarted(Action0 onWatchStarted) {
373 this.onWatchStarted = onWatchStarted;
374 return this;
375 }
376
377
378
379
380
381
382
383
384 public Builder startPosition(long startPosition) {
385 this.startPosition = startPosition;
386 return this;
387 }
388
389
390
391
392
393
394
395
396
397
398
399 public Builder sampleTimeMs(long sampleTimeMs) {
400 this.sampleTimeMs = sampleTimeMs;
401 return this;
402 }
403
404
405
406
407
408
409
410 public Builder chunkSize(int chunkSize) {
411 this.chunkSize = chunkSize;
412 return this;
413 }
414
415
416
417
418
419
420
421 public Builder charset(Charset charset) {
422 this.charset = charset;
423 return this;
424 }
425
426
427
428
429
430
431
432 public Builder charset(String charset) {
433 return charset(Charset.forName(charset));
434 }
435
436 public Builder utf8() {
437 return charset("UTF-8");
438 }
439
440 public Builder source(Observable<?> source) {
441 this.source = source;
442 return this;
443 }
444
445 public Observable<byte[]> tail() {
446
447 return tailFile(file, startPosition, sampleTimeMs, chunkSize, getSource());
448 }
449
450 public Observable<String> tailText() {
451 return tailTextFile(file, startPosition, chunkSize, charset, getSource());
452 }
453
454 private Observable<?> getSource() {
455 if (source == null)
456 return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE,
457 StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW);
458 else
459 return source;
460
461 }
462
463 }
464
465 }