1 package com.github.davidmoten.rx2.internal.flowable;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.net.ServerSocket;
6 import java.net.Socket;
7 import java.net.SocketException;
8 import java.net.SocketTimeoutException;
9 import java.util.concurrent.Callable;
10
11 import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
12 import com.github.davidmoten.rx2.Bytes;
13 import com.github.davidmoten.rx2.Consumers;
14
15 import io.reactivex.Emitter;
16 import io.reactivex.Flowable;
17 import io.reactivex.functions.Action;
18 import io.reactivex.functions.Consumer;
19 import io.reactivex.functions.Function;
20 import io.reactivex.functions.Predicate;
21
22 public final class FlowableServerSocket {
23
24 private FlowableServerSocket() {
25
26 }
27
28 public static Flowable<Flowable<byte[]>> create(final Callable<? extends ServerSocket> serverSocketFactory,
29 final int timeoutMs, final int bufferSize, Action preAcceptAction, int acceptTimeoutMs,
30 Predicate<? super Socket> acceptSocket) {
31 Function<ServerSocket, Flowable<Flowable<byte[]>>> FlowableFactory = createFlowableFactory(timeoutMs,
32 bufferSize, preAcceptAction, acceptSocket);
33 return Flowable.<Flowable<byte[]>, ServerSocket>using(
34 createServerSocketFactory(serverSocketFactory, acceptTimeoutMs),
35 FlowableFactory,
36 new Consumer<ServerSocket>() {
37
38
39 @Override
40 public void accept(ServerSocket ss) throws Exception {
41 ss.close();
42 }
43 },
44 true);
45 }
46
47 private static Callable<ServerSocket> createServerSocketFactory(
48 final Callable<? extends ServerSocket> serverSocketFactory, final int acceptTimeoutMs) {
49 return new Callable<ServerSocket>() {
50 @Override
51 public ServerSocket call() throws Exception {
52 return createServerSocket(serverSocketFactory, acceptTimeoutMs);
53 }
54 };
55 }
56
57 private static ServerSocket createServerSocket(Callable<? extends ServerSocket> serverSocketCreator, long timeoutMs)
58 throws Exception {
59 ServerSocket s = serverSocketCreator.call();
60 s.setSoTimeout((int) timeoutMs);
61 return s;
62 }
63
64 private static Function<ServerSocket, Flowable<Flowable<byte[]>>> createFlowableFactory(final int timeoutMs,
65 final int bufferSize, final Action preAcceptAction, final Predicate<? super Socket> acceptSocket) {
66 return new Function<ServerSocket, Flowable<Flowable<byte[]>>>() {
67 @Override
68 public Flowable<Flowable<byte[]>> apply(ServerSocket serverSocket) {
69 return createServerSocketFlowable(serverSocket, timeoutMs, bufferSize, preAcceptAction, acceptSocket);
70 }
71 };
72 }
73
74 private static Flowable<Flowable<byte[]>> createServerSocketFlowable(final ServerSocket serverSocket,
75 final long timeoutMs, final int bufferSize, final Action preAcceptAction,
76 final Predicate<? super Socket> acceptSocket) {
77 return Flowable.generate(
78 new Consumer<Emitter<Flowable<byte[]>>>() {
79 @Override
80 public void accept(Emitter<Flowable<byte[]>> emitter) throws Exception {
81 acceptConnection(timeoutMs, bufferSize, serverSocket, emitter, preAcceptAction, acceptSocket);
82 }
83 });
84 }
85
86 private static void acceptConnection(long timeoutMs, int bufferSize, ServerSocket ss,
87 Emitter<Flowable<byte[]>> emitter, Action preAcceptAction, Predicate<? super Socket> acceptSocket) {
88 Socket socket;
89 while (true) {
90 try {
91 preAcceptAction.run();
92 socket = ss.accept();
93 if (!acceptSocket.test(socket)) {
94 closeQuietly(socket);
95 } else {
96 emitter.onNext(createSocketFlowable(socket, timeoutMs, bufferSize));
97 break;
98 }
99 } catch (SocketTimeoutException e) {
100
101 } catch (Throwable e) {
102
103
104
105
106 if (e instanceof SocketException && ("Socket closed".equals(e.getMessage())
107 || "Socket operation on nonsocket: configureBlocking".equals(e.getMessage()))) {
108 break;
109 } else {
110
111 emitter.onError(e);
112 break;
113 }
114 }
115 }
116 }
117
118 @VisibleForTesting
119 static void closeQuietly(Socket socket) {
120 try {
121 socket.close();
122 } catch (IOException e) {
123
124 }
125 }
126
127 private static Flowable<byte[]> createSocketFlowable(final Socket socket, long timeoutMs, final int bufferSize) {
128 setTimeout(socket, timeoutMs);
129 return Flowable.using(
130 new Callable<InputStream>() {
131 @Override
132 public InputStream call() throws Exception {
133 return socket.getInputStream();
134 }
135 },
136 new Function<InputStream, Flowable<byte[]>>() {
137 @Override
138 public Flowable<byte[]> apply(InputStream is) {
139 return Bytes.from(is, bufferSize);
140 }
141 },
142 Consumers.close(),
143 true);
144 }
145
146 private static void setTimeout(Socket socket, long timeoutMs) {
147 try {
148 socket.setSoTimeout((int) timeoutMs);
149 } catch (SocketException e) {
150 throw new RuntimeException(e);
151 }
152 }
153
154 }