1 package com.github.davidmoten.rx.operators;
2
3 import java.io.IOException;
4 import java.nio.file.ClosedWatchServiceException;
5 import java.nio.file.WatchEvent;
6 import java.nio.file.WatchKey;
7 import java.nio.file.WatchService;
8 import java.util.concurrent.atomic.AtomicBoolean;
9
10 import rx.Observable.Operator;
11 import rx.Observer;
12 import rx.Subscriber;
13 import rx.Subscription;
14 import rx.observers.Subscribers;
15
16
17
18
19 public class OperatorWatchServiceEvents implements Operator<WatchEvent<?>, WatchService> {
20
21 @Override
22 public Subscriber<? super WatchService> call(final Subscriber<? super WatchEvent<?>> subscriber) {
23 Subscriber<WatchService> result = Subscribers.from(new Observer<WatchService>() {
24
25 @Override
26 public void onCompleted() {
27 subscriber.onCompleted();
28 }
29
30 @Override
31 public void onError(Throwable e) {
32 subscriber.onError(e);
33 }
34
35 @Override
36 public void onNext(WatchService watchService) {
37 AtomicBoolean subscribed = new AtomicBoolean(true);
38 if (!subscribed.get()) {
39 subscriber.onError(new RuntimeException(
40 "WatchService closed. You can only subscribe once to a WatchService."));
41 return;
42 }
43 subscriber.add(createSubscriptionToCloseWatchService(watchService, subscribed, subscriber));
44 emitEvents(watchService, subscriber, subscribed);
45 }
46 });
47 subscriber.add(result);
48 return result;
49 }
50
51 private static void emitEvents(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber,
52 AtomicBoolean subscribed) {
53
54 WatchKey key = nextKey(watchService, subscriber, subscribed);
55
56 while (key != null) {
57 if (subscriber.isUnsubscribed())
58 return;
59
60
61 for (WatchEvent<?> event : key.pollEvents()) {
62 if (subscriber.isUnsubscribed())
63 return;
64 else
65 subscriber.onNext(event);
66 }
67
68 boolean valid = key.reset();
69 if (!valid && subscribed.get()) {
70 subscriber.onCompleted();
71 return;
72 } else if (!valid)
73 return;
74
75 key = nextKey(watchService, subscriber, subscribed);
76 }
77 }
78
79 private static WatchKey nextKey(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber,
80 AtomicBoolean subscribed) {
81 try {
82
83
84 return watchService.take();
85 } catch (ClosedWatchServiceException e) {
86
87 if (subscribed.get())
88 subscriber.onCompleted();
89 return null;
90 } catch (InterruptedException e) {
91
92
93
94
95
96
97
98
99
100
101
102
103 try {
104 watchService.close();
105 } catch (IOException e1) {
106
107 }
108 return null;
109 }
110 }
111
112 private final static Subscription createSubscriptionToCloseWatchService(final WatchService watchService,
113 final AtomicBoolean subscribed, final Subscriber<? super WatchEvent<?>> subscriber) {
114 return new Subscription() {
115
116 @Override
117 public void unsubscribe() {
118 try {
119 watchService.close();
120 } catch (ClosedWatchServiceException e) {
121
122 } catch (IOException e) {
123
124 } finally {
125 subscribed.set(false);
126 }
127 }
128
129 @Override
130 public boolean isUnsubscribed() {
131 return !subscribed.get();
132 }
133 };
134 }
135
136 }