View Javadoc
1   package com.github.davidmoten.rx.operators;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.nio.file.StandardWatchEventKinds;
8   import java.nio.file.WatchEvent;
9   import java.util.concurrent.atomic.AtomicLong;
10  
11  import com.github.davidmoten.rx.Bytes;
12  import com.github.davidmoten.rx.subjects.PublishSubjectSingleSubscriber;
13  
14  import rx.Observable;
15  import rx.Observable.Operator;
16  import rx.Subscriber;
17  import rx.functions.Action1;
18  import rx.functions.Func0;
19  import rx.functions.Func1;
20  import rx.observers.Subscribers;
21  
22  /**
23   * Reacts to source events by emitting new lines written to a file since the
24   * last source event.
25   */
26  public class OperatorFileTailer implements Operator<byte[], Object> {
27  
28      private final File file;
29      private final AtomicLong currentPosition = new AtomicLong();
30      private final int maxBytesPerEmission;
31  
32      /**
33       * Constructor.
34       * 
35       * @param file
36       *            text file to tail
37       * @param startPosition
38       *            start tailing the file after this many bytes
39       */
40      public OperatorFileTailer(File file, long startPosition, int maxBytesPerEmission) {
41          if (file == null)
42              throw new NullPointerException("file cannot be null");
43          this.file = file;
44          this.currentPosition.set(startPosition);
45          this.maxBytesPerEmission = maxBytesPerEmission;
46      }
47  
48      /**
49       * Constructor. Emits byte arrays of up to 8*1024 bytes.
50       * 
51       * @param file
52       * @param startPosition
53       */
54      public OperatorFileTailer(File file, long startPosition) {
55          this(file, startPosition, 8192);
56      }
57  
58      @Override
59      public Subscriber<? super Object> call(Subscriber<? super byte[]> child) {
60          final PublishSubjectSingleSubscriber<? super Object> subject = PublishSubjectSingleSubscriber
61                  .create();
62          Subscriber<? super Object> parent = Subscribers.from(subject);
63          child.add(parent);
64          subject
65                  // report new lines for each event
66                  .concatMap(reportNewLines(file, currentPosition, maxBytesPerEmission))
67                  // subscribe
68                  .unsafeSubscribe(child);
69          return parent;
70      }
71  
72      private static Func1<Object, Observable<byte[]>> reportNewLines(final File file,
73              final AtomicLong currentPosition, final int maxBytesPerEmission) {
74          return new Func1<Object, Observable<byte[]>>() {
75              @Override
76              public Observable<byte[]> call(Object event) {
77                  // reset current position if file is moved or deleted
78                  if (event instanceof WatchEvent) {
79                      WatchEvent<?> w = (WatchEvent<?>) event;
80                      String kind = w.kind().name();
81                      if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
82                          currentPosition.set(0);
83                      }
84                  }
85                  long length = file.length();
86                  if (length > currentPosition.get()) {
87                      try {
88                          final FileInputStream fis = new FileInputStream(file);
89                          fis.skip(currentPosition.get());
90                          // apply using method to ensure fis is closed on
91                          // termination or unsubscription
92                          return Observable.using(new Func0<InputStream>() {
93  
94                              @Override
95                              public InputStream call() {
96                                  return fis;
97                              }
98                          }, new Func1<InputStream, Observable<byte[]>>() {
99  
100                             @Override
101                             public Observable<byte[]> call(InputStream t1) {
102                                 return Bytes.from(fis, maxBytesPerEmission)
103                                         // move marker
104                                         .doOnNext(new Action1<byte[]>() {
105                                     @Override
106                                     public void call(byte[] bytes) {
107                                         currentPosition.addAndGet(bytes.length);
108                                     }
109                                 });
110                             }
111                         }, new Action1<InputStream>() {
112                             @Override
113                             public void call(InputStream is) {
114                                 try {
115                                     is.close();
116                                 } catch (IOException e) {
117                                     // don't care
118                                 }
119                             }
120                         });
121                     } catch (IOException e) {
122                         return Observable.error(e);
123                     }
124                 } else
125                     return Observable.empty();
126             }
127 
128         };
129     }
130 
131 }