1 package com.github.davidmoten.rx;
2
3 import java.io.File;
4 import java.nio.charset.Charset;
5 import java.nio.file.Path;
6 import java.nio.file.Paths;
7 import java.nio.file.StandardWatchEventKinds;
8 import java.nio.file.WatchEvent;
9 import java.nio.file.WatchEvent.Kind;
10 import java.nio.file.WatchService;
11 import java.util.concurrent.TimeUnit;
12
13 import com.github.davidmoten.rx.operators.OperatorFileTailer;
14 import com.github.davidmoten.rx.operators.OperatorWatchServiceEvents;
15
16 import rx.Observable;
17 import rx.functions.Action0;
18 import rx.functions.Action1;
19 import rx.functions.Func0;
20 import rx.functions.Func1;
21 import rx.observables.GroupedObservable;
22
23 /**
24 * Observable utility methods related to {@link File}.
25 */
26 public final class FileObservable {
27
28 public static final int DEFAULT_MAX_BYTES_PER_EMISSION = 8192;
29
30 /**
31 * Returns an {@link Observable} that uses NIO {@link WatchService} (and a
32 * dedicated thread) to push modified events to an observable that reads and
33 * reports new sequences of bytes to a subscriber. The NIO
34 * {@link WatchService} events are sampled according to
35 * <code>sampleTimeMs</code> so that lots of discrete activity on a file
36 * (for example a log file with very frequent entries) does not prompt an
37 * inordinate number of file reads to pick up changes.
38 *
39 * @param file
40 * the file to tail
41 * @param startPosition
42 * start tailing file at position in bytes
43 * @param sampleTimeMs
44 * sample time in millis
45 * @param chunkSize
46 * max array size of each element emitted by the Observable. Is
47 * also used as the buffer size for reading from the file. Try
48 * {@link FileObservable#DEFAULT_MAX_BYTES_PER_EMISSION} if you
49 * don't know what to put here.
50 * @return
51 */
52 public final static Observable<byte[]> tailFile(File file, long startPosition,
53 long sampleTimeMs, int chunkSize) {
54 Observable<Object> events = from(file, StandardWatchEventKinds.ENTRY_CREATE,
55 StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW)
56 // don't care about the event details, just that there
57 // is one
58 .cast(Object.class)
59 // get lines once on subscription so we tail the lines
60 // in the file at startup
61 .startWith(new Object());
62 return tailFile(file, startPosition, sampleTimeMs, chunkSize, events);
63 }
64
65 /**
66 * Returns an {@link Observable} that uses given given observable to push
67 * modified events to an observable that reads and reports new sequences of
68 * bytes to a subscriber. The NIO {@link WatchService} MODIFY and OVERFLOW
69 * events are sampled according to <code>sampleTimeMs</code> so that lots of
70 * discrete activity on a file (for example a log file with very frequent
71 * entries) does not prompt an inordinate number of file reads to pick up
72 * changes. File create events are not sampled and are always passed
73 * through.
74 *
75 * @param file
76 * the file to tail
77 * @param startPosition
78 * start tailing file at position in bytes
79 * @param sampleTimeMs
80 * sample time in millis for MODIFY and OVERFLOW events
81 * @param chunkSize
82 * max array size of each element emitted by the Observable. Is
83 * also used as the buffer size for reading from the file. Try
84 * {@link FileObservable#DEFAULT_MAX_BYTES_PER_EMISSION} if you
85 * don't know what to put here.
86 * @return
87 */
88 public final static Observable<byte[]> tailFile(File file, long startPosition,
89 long sampleTimeMs, int chunkSize, Observable<?> events) {
90 return sampleModifyOrOverflowEventsOnly(events, sampleTimeMs)
91 // tail file triggered by events
92 .lift(new OperatorFileTailer(file, startPosition, chunkSize));
93 }
94
95 /**
96 * Returns an {@link Observable} that uses NIO {@link WatchService} (and a
97 * dedicated thread) to push modified events to an observable that reads and
98 * reports new lines to a subscriber. The NIO WatchService MODIFY and
99 * OVERFLOW events are sampled according to <code>sampleTimeMs</code> so
100 * that lots of discrete activity on a file (for example a log file with
101 * very frequent entries) does not prompt an inordinate number of file reads
102 * to pick up changes. File create events are not sampled and are always
103 * passed through.
104 *
105 * @param file
106 * the file to tail
107 * @param startPosition
108 * start tailing file at position in bytes
109 * @param sampleTimeMs
110 * sample time in millis for MODIFY and OVERFLOW events
111 * @param charset
112 * the character set to use to decode the bytes to a string
113 * @return
114 */
115 public final static Observable<String> tailTextFile(File file, long startPosition,
116 long sampleTimeMs, Charset charset) {
117 return toLines(tailFile(file, startPosition, sampleTimeMs, DEFAULT_MAX_BYTES_PER_EMISSION),
118 charset);
119 }
120
121 /**
122 * Returns an {@link Observable} of String that uses the given events stream
123 * to trigger checks on file change so that new lines can be read and
124 * emitted.
125 *
126 * @param file
127 * the file to tail, cannot be null
128 * @param startPosition
129 * start tailing file at position in bytes
130 * @param charset
131 * the character set to use to decode the bytes to a string
132 * @param events
133 * trigger a check for file changes. Use
134 * {@link Observable#interval(long, TimeUnit)} for example.
135 * @return
136 */
137 public final static Observable<String> tailTextFile(File file, long startPosition,
138 int chunkSize, Charset charset, Observable<?> events) {
139 return toLines(events.lift(new OperatorFileTailer(file, startPosition, chunkSize))
140 .onBackpressureBuffer(), charset);
141 }
142
143 /**
144 * Returns an {@link Observable} of {@link WatchEvent}s from a
145 * {@link WatchService}.
146 *
147 * @param watchService
148 * {@link WatchService} to generate events for
149 * @return
150 */
151 public final static Observable<WatchEvent<?>> from(WatchService watchService) {
152 return Observable.just(watchService).lift(new OperatorWatchServiceEvents())
153 .onBackpressureBuffer();
154 }
155
156 /**
157 * If file does not exist at subscribe time then is assumed to not be a
158 * directory. If the file is not a directory (bearing in mind the aforesaid
159 * assumption) then a {@link WatchService} is set up on its parent and
160 * {@link WatchEvent}s of the given kinds are filtered to concern the file
161 * in question. If the file is a directory then a {@link WatchService} is
162 * set up on the directory and all events are passed through of the given
163 * kinds.
164 *
165 * @param file
166 * file to watch
167 * @param kinds
168 * event kinds to watch for and emit
169 * @return
170 */
171 @SafeVarargs
172 public final static Observable<WatchEvent<?>> from(final File file, Kind<?>... kinds) {
173 return from(file, null, kinds);
174 }
175
176 /**
177 * If file does not exist at subscribe time then is assumed to not be a
178 * directory. If the file is not a directory (bearing in mind the aforesaid
179 * assumption) then a {@link WatchService} is set up on its parent and
180 * {@link WatchEvent}s of the given kinds are filtered to concern the file
181 * in question. If the file is a directory then a {@link WatchService} is
182 * set up on the directory and all events are passed through of the given
183 * kinds.
184 *
185 * @param file
186 * @param onWatchStarted
187 * called when WatchService is created
188 * @param kinds
189 * @return
190 */
191 public final static Observable<WatchEvent<?>> from(final File file,
192 final Action0 onWatchStarted, Kind<?>... kinds) {
193 return watchService(file, kinds)
194 // when watch service created call onWatchStarted
195 .doOnNext(new Action1<WatchService>() {
196 @Override
197 public void call(WatchService w) {
198 if (onWatchStarted != null)
199 onWatchStarted.call();
200 }
201 })
202 // emit events from the WatchService
203 .flatMap(TO_WATCH_EVENTS)
204 // restrict to events related to the file
205 .filter(onlyRelatedTo(file));
206 }
207
208 /**
209 * Creates a {@link WatchService} on subscribe for the given file and event
210 * kinds.
211 *
212 * @param file
213 * the file to watch
214 * @param kinds
215 * event kinds to watch for
216 * @return
217 */
218 @SafeVarargs
219 public final static Observable<WatchService> watchService(final File file,
220 final Kind<?>... kinds) {
221 return Observable.defer(new Func0<Observable<WatchService>>() {
222
223 @Override
224 public Observable<WatchService> call() {
225 try {
226 final Path path = getBasePath(file);
227 WatchService watchService = path.getFileSystem().newWatchService();
228 path.register(watchService, kinds);
229 return Observable.just(watchService);
230 } catch (Exception e) {
231 return Observable.error(e);
232 }
233 }
234 });
235
236 }
237
238 private final static Path getBasePath(final File file) {
239 final Path path;
240 if (file.exists() && file.isDirectory())
241 path = Paths.get(file.toURI());
242 else
243 path = Paths.get(file.getParentFile().toURI());
244 return path;
245 }
246
247 /**
248 * Returns true if and only if the path corresponding to a WatchEvent
249 * represents the given file. This will be the case for Create, Modify,
250 * Delete events.
251 *
252 * @param file
253 * the file to restrict events to
254 * @return
255 */
256 private final static Func1<WatchEvent<?>, Boolean> onlyRelatedTo(final File file) {
257 return new Func1<WatchEvent<?>, Boolean>() {
258
259 @Override
260 public Boolean call(WatchEvent<?> event) {
261
262 final boolean ok;
263 if (file.isDirectory())
264 ok = true;
265 else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind()))
266 ok = true;
267 else {
268 Object context = event.context();
269 if (context != null && context instanceof Path) {
270 Path p = (Path) context;
271 Path basePath = getBasePath(file);
272 File pFile = new File(basePath.toFile(), p.toString());
273 ok = pFile.getAbsolutePath().equals(file.getAbsolutePath());
274 } else
275 ok = false;
276 }
277 return ok;
278 }
279 };
280 }
281
282 private static Observable<String> toLines(Observable<byte[]> bytes, Charset charset) {
283 return Strings.split(Strings.decode(bytes, charset), "\n");
284 }
285
286 private final static Func1<WatchService, Observable<WatchEvent<?>>> TO_WATCH_EVENTS = new Func1<WatchService, Observable<WatchEvent<?>>>() {
287
288 @Override
289 public Observable<WatchEvent<?>> call(WatchService watchService) {
290 return from(watchService);
291 }
292 };
293
294 private static Observable<Object> sampleModifyOrOverflowEventsOnly(Observable<?> events,
295 final long sampleTimeMs) {
296 return events
297 // group by true if is modify or overflow, false otherwise
298 .groupBy(IS_MODIFY_OR_OVERFLOW)
299 // only sample if is modify or overflow
300 .flatMap(sampleIfTrue(sampleTimeMs));
301 }
302
303 private static Func1<GroupedObservable<Boolean, ?>, Observable<?>> sampleIfTrue(
304 final long sampleTimeMs) {
305 return new Func1<GroupedObservable<Boolean, ?>, Observable<?>>() {
306
307 @Override
308 public Observable<?> call(GroupedObservable<Boolean, ?> group) {
309 // if is modify or overflow WatchEvent
310 if (group.getKey())
311 return group.sample(sampleTimeMs, TimeUnit.MILLISECONDS);
312 else
313 return group;
314 }
315 };
316 }
317
318 private static Func1<Object, Boolean> IS_MODIFY_OR_OVERFLOW = new Func1<Object, Boolean>() {
319
320 @Override
321 public Boolean call(Object event) {
322 if (event instanceof WatchEvent) {
323 WatchEvent<?> w = (WatchEvent<?>) event;
324 String kind = w.kind().name();
325 if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY.name())
326 || kind.equals(StandardWatchEventKinds.OVERFLOW.name())) {
327 return true;
328 } else
329 return false;
330 } else
331 return false;
332 }
333 };
334
335 public static Builder tailer() {
336 return new Builder();
337 }
338
339 public static class Builder {
340
341 private File file = null;
342 private long startPosition = 0;
343 private long sampleTimeMs = 500;
344 private int chunkSize = 8192;
345 private Charset charset = Charset.defaultCharset();
346 private Observable<?> source = null;
347 private Action0 onWatchStarted = new Action0() {
348 @Override
349 public void call() {
350 // do nothing
351 }
352 };
353
354 private Builder() {
355 }
356
357 /**
358 * The file to tail.
359 *
360 * @param file
361 * @return this
362 */
363 public Builder file(File file) {
364 this.file = file;
365 return this;
366 }
367
368 public Builder file(String filename) {
369 return file(new File(filename));
370 }
371
372 public Builder onWatchStarted(Action0 onWatchStarted) {
373 this.onWatchStarted = onWatchStarted;
374 return this;
375 }
376
377 /**
378 * The startPosition in bytes in the file to commence the tail from. 0 =
379 * start of file. Defaults to 0.
380 *
381 * @param startPosition
382 * @return this
383 */
384 public Builder startPosition(long startPosition) {
385 this.startPosition = startPosition;
386 return this;
387 }
388
389 /**
390 * Specifies sampling to apply to the source observable (which could be
391 * very busy if a lot of writes are occurring for example). Sampling is
392 * only applied to file updates (MODIFY and OVERFLOW), file creation
393 * events are always passed through. File deletion events are ignored
394 * (in fact are not requested of NIO).
395 *
396 * @param sampleTimeMs
397 * @return this
398 */
399 public Builder sampleTimeMs(long sampleTimeMs) {
400 this.sampleTimeMs = sampleTimeMs;
401 return this;
402 }
403
404 /**
405 * Emissions from the tailed file will be no bigger than this.
406 *
407 * @param chunkSize
408 * @return this
409 */
410 public Builder chunkSize(int chunkSize) {
411 this.chunkSize = chunkSize;
412 return this;
413 }
414
415 /**
416 * The charset of the file. Only used for tailing a text file.
417 *
418 * @param charset
419 * @return this
420 */
421 public Builder charset(Charset charset) {
422 this.charset = charset;
423 return this;
424 }
425
426 /**
427 * The charset of the file. Only used for tailing a text file.
428 *
429 * @param charset
430 * @return this
431 */
432 public Builder charset(String charset) {
433 return charset(Charset.forName(charset));
434 }
435
436 public Builder utf8() {
437 return charset("UTF-8");
438 }
439
440 public Builder source(Observable<?> source) {
441 this.source = source;
442 return this;
443 }
444
445 public Observable<byte[]> tail() {
446
447 return tailFile(file, startPosition, sampleTimeMs, chunkSize, getSource());
448 }
449
450 public Observable<String> tailText() {
451 return tailTextFile(file, startPosition, chunkSize, charset, getSource());
452 }
453
454 private Observable<?> getSource() {
455 if (source == null)
456 return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE,
457 StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW);
458 else
459 return source;
460
461 }
462
463 }
464
465 }