View Javadoc
1   package com.github.davidmoten.rx.operators;
2   
3   import java.io.IOException;
4   import java.nio.file.ClosedWatchServiceException;
5   import java.nio.file.WatchEvent;
6   import java.nio.file.WatchKey;
7   import java.nio.file.WatchService;
8   import java.util.concurrent.atomic.AtomicBoolean;
9   
10  import rx.Observable.Operator;
11  import rx.Observer;
12  import rx.Subscriber;
13  import rx.Subscription;
14  import rx.observers.Subscribers;
15  
16  /**
17   * Emits {@link WatchEvent}s for each input {@link WatchService}.
18   */
19  public class OperatorWatchServiceEvents implements Operator<WatchEvent<?>, WatchService> {
20  
21      @Override
22      public Subscriber<? super WatchService> call(final Subscriber<? super WatchEvent<?>> subscriber) {
23          Subscriber<WatchService> result = Subscribers.from(new Observer<WatchService>() {
24  
25              @Override
26              public void onCompleted() {
27                  subscriber.onCompleted();
28              }
29  
30              @Override
31              public void onError(Throwable e) {
32                  subscriber.onError(e);
33              }
34  
35              @Override
36              public void onNext(WatchService watchService) {
37                  AtomicBoolean subscribed = new AtomicBoolean(true);
38                  if (!subscribed.get()) {
39                      subscriber.onError(new RuntimeException(
40                              "WatchService closed. You can only subscribe once to a WatchService."));
41                      return;
42                  }
43                  subscriber.add(createSubscriptionToCloseWatchService(watchService, subscribed, subscriber));
44                  emitEvents(watchService, subscriber, subscribed);
45              }
46          });
47          subscriber.add(result);
48          return result;
49      }
50  
51      private static void emitEvents(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber,
52              AtomicBoolean subscribed) {
53          // get the first event before looping
54          WatchKey key = nextKey(watchService, subscriber, subscribed);
55  
56          while (key != null) {
57              if (subscriber.isUnsubscribed())
58                  return;
59              // we have a polled event, now we traverse it and
60              // receive all the states from it
61              for (WatchEvent<?> event : key.pollEvents()) {
62                  if (subscriber.isUnsubscribed())
63                      return;
64                  else
65                      subscriber.onNext(event);
66              }
67  
68              boolean valid = key.reset();
69              if (!valid && subscribed.get()) {
70                  subscriber.onCompleted();
71                  return;
72              } else if (!valid)
73                  return;
74  
75              key = nextKey(watchService, subscriber, subscribed);
76          }
77      }
78  
79      private static WatchKey nextKey(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber,
80              AtomicBoolean subscribed) {
81          try {
82              // this command blocks but unsubscribe close the watch
83              // service and interrupt it
84              return watchService.take();
85          } catch (ClosedWatchServiceException e) {
86              // must have unsubscribed
87              if (subscribed.get())
88                  subscriber.onCompleted();
89              return null;
90          } catch (InterruptedException e) {
91              // this case is problematic because unsubscribe may call
92              // Thread.interrupt() before calling the unsubscribe method of
93              // the Subscription. Thus at this point we don't know if a
94              // deliberate interrupt was called in which case I would call
95              // onComplete or if unsubscribe was called in which case I
96              // should not call anything. For the moment I choose to not call
97              // anything partly because a deliberate stop of the
98              // watchService.take ignorant of the Observable should ideally
99              // happen via a call to the WatchService.close() method rather
100             // than Thread.interrupt().
101             // TODO raise the issue with RxJava team in particular
102             // Subscriptions.from(Future) calling FutureTask.cancel(true)
103             try {
104                 watchService.close();
105             } catch (IOException e1) {
106                 // do nothing
107             }
108             return null;
109         }
110     }
111 
112     private final static Subscription createSubscriptionToCloseWatchService(final WatchService watchService,
113             final AtomicBoolean subscribed, final Subscriber<? super WatchEvent<?>> subscriber) {
114         return new Subscription() {
115 
116             @Override
117             public void unsubscribe() {
118                 try {
119                     watchService.close();
120                 } catch (ClosedWatchServiceException e) {
121                     // do nothing
122                 } catch (IOException e) {
123                     // do nothing
124                 } finally {
125                     subscribed.set(false);
126                 }
127             }
128 
129             @Override
130             public boolean isUnsubscribed() {
131                 return !subscribed.get();
132             }
133         };
134     }
135 
136 }