1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.net.ServerSocket;
6 import java.net.Socket;
7 import java.net.SocketException;
8 import java.net.SocketTimeoutException;
9
10 import com.github.davidmoten.rx.Actions;
11 import com.github.davidmoten.rx.Bytes;
12 import com.github.davidmoten.rx.Checked;
13 import com.github.davidmoten.rx.Checked.F0;
14 import com.github.davidmoten.rx.Functions;
15
16 import rx.Observable;
17 import rx.Observer;
18 import rx.functions.Action0;
19 import rx.functions.Action1;
20 import rx.functions.Action2;
21 import rx.functions.Func0;
22 import rx.functions.Func1;
23 import rx.observables.SyncOnSubscribe;
24
25 public final class ObservableServerSocket {
26
27 private ObservableServerSocket() {
28
29 }
30
31 public static Observable<Observable<byte[]>> create(
32 final Func0<? extends ServerSocket> serverSocketFactory, final int timeoutMs,
33 final int bufferSize, Action0 preAcceptAction, int acceptTimeoutMs,
34 Func1<? super Socket, Boolean> acceptSocket) {
35 Func1<ServerSocket, Observable<Observable<byte[]>>> observableFactory = createObservableFactory(
36 timeoutMs, bufferSize, preAcceptAction, acceptSocket);
37 return Observable.<Observable<byte[]>, ServerSocket> using(
38 createServerSocketFactory(serverSocketFactory, acceptTimeoutMs),
39 observableFactory,
40 new Action1<ServerSocket>() {
41
42 @Override
43 public void call(ServerSocket ss) {
44 try {
45 ss.close();
46 } catch (IOException e) {
47 throw new RuntimeException(e);
48 }
49 }
50
51 }, true);
52 }
53
54 private static Func0<ServerSocket> createServerSocketFactory(
55 final Func0<? extends ServerSocket> serverSocketFactory, final int acceptTimeoutMs) {
56 return Checked.f0(new F0<ServerSocket>() {
57 @Override
58 public ServerSocket call() throws Exception {
59 return createServerSocket(serverSocketFactory, acceptTimeoutMs);
60 }
61 });
62 }
63
64 private static ServerSocket createServerSocket(
65 Func0<? extends ServerSocket> serverSocketCreator, long timeoutMs) throws IOException {
66 ServerSocket s = serverSocketCreator.call();
67 s.setSoTimeout((int) timeoutMs);
68 return s;
69 }
70
71 private static Func1<ServerSocket, Observable<Observable<byte[]>>> createObservableFactory(
72 final int timeoutMs, final int bufferSize, final Action0 preAcceptAction,
73 final Func1<? super Socket, Boolean> acceptSocket) {
74 return new Func1<ServerSocket, Observable<Observable<byte[]>>>() {
75 @Override
76 public Observable<Observable<byte[]>> call(ServerSocket serverSocket) {
77 return createServerSocketObservable(serverSocket, timeoutMs, bufferSize,
78 preAcceptAction, acceptSocket);
79 }
80 };
81 }
82
83 private static Observable<Observable<byte[]>> createServerSocketObservable(
84 ServerSocket serverSocket, final long timeoutMs, final int bufferSize,
85 final Action0 preAcceptAction, final Func1<? super Socket, Boolean> acceptSocket) {
86 return Observable.create(
87 SyncOnSubscribe.<ServerSocket, Observable<byte[]>> createSingleState(
88 Functions.constant0(serverSocket),
89 new Action2<ServerSocket, Observer<? super Observable<byte[]>>>() {
90
91 @Override
92 public void call(ServerSocket ss,
93 Observer<? super Observable<byte[]>> observer) {
94 acceptConnection(timeoutMs, bufferSize, ss, observer,
95 preAcceptAction, acceptSocket);
96 }
97 }));
98 }
99
100 private static void acceptConnection(long timeoutMs, int bufferSize, ServerSocket ss,
101 Observer<? super Observable<byte[]>> observer, Action0 preAcceptAction,
102 Func1<? super Socket, Boolean> acceptSocket) {
103 Socket socket;
104 while (true) {
105 try {
106 preAcceptAction.call();
107 socket = ss.accept();
108 if (!acceptSocket.call(socket)) {
109 closeQuietly(socket);
110 } else {
111 observer.onNext(createSocketObservable(socket, timeoutMs, bufferSize));
112 break;
113 }
114 } catch (SocketTimeoutException e) {
115
116 } catch (IOException e) {
117
118 observer.onError(e);
119 break;
120 }
121 }
122 }
123
124 private static void closeQuietly(Socket socket) {
125 try {
126 socket.close();
127 } catch (IOException e) {
128
129 }
130 }
131
132 private static Observable<byte[]> createSocketObservable(final Socket socket, long timeoutMs,
133 final int bufferSize) {
134 setTimeout(socket, timeoutMs);
135 return Observable.using(
136 Checked.f0(new F0<InputStream>() {
137 @Override
138 public InputStream call() throws Exception {
139 return socket.getInputStream();
140 }
141 }),
142 new Func1<InputStream, Observable<byte[]>>() {
143 @Override
144 public Observable<byte[]> call(InputStream is) {
145 return Bytes.from(is, bufferSize);
146 }
147 },
148 Actions.close(),
149 true);
150 }
151
152 private static void setTimeout(Socket socket, long timeoutMs) {
153 try {
154 socket.setSoTimeout((int) timeoutMs);
155 } catch (SocketException e) {
156 throw new RuntimeException(e);
157 }
158 }
159
160 }