View Javadoc
1   package com.github.davidmoten.rx2.internal.flowable;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.net.ServerSocket;
6   import java.net.Socket;
7   import java.net.SocketException;
8   import java.net.SocketTimeoutException;
9   import java.util.concurrent.Callable;
10  
11  import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
12  import com.github.davidmoten.rx2.Bytes;
13  import com.github.davidmoten.rx2.Consumers;
14  
15  import io.reactivex.Emitter;
16  import io.reactivex.Flowable;
17  import io.reactivex.functions.Action;
18  import io.reactivex.functions.Consumer;
19  import io.reactivex.functions.Function;
20  import io.reactivex.functions.Predicate;
21  
22  public final class FlowableServerSocket {
23  
24      private FlowableServerSocket() {
25          // prevent instantiation
26      }
27  
28      public static Flowable<Flowable<byte[]>> create(final Callable<? extends ServerSocket> serverSocketFactory,
29              final int timeoutMs, final int bufferSize, Action preAcceptAction, int acceptTimeoutMs,
30              Predicate<? super Socket> acceptSocket) {
31          Function<ServerSocket, Flowable<Flowable<byte[]>>> FlowableFactory = createFlowableFactory(timeoutMs,
32                  bufferSize, preAcceptAction, acceptSocket);
33          return Flowable.<Flowable<byte[]>, ServerSocket>using( //
34                  createServerSocketFactory(serverSocketFactory, acceptTimeoutMs), //
35                  FlowableFactory, //
36                  new Consumer<ServerSocket>() {
37                      // Note that in java 1.6, ServerSocket does not implement
38                      // Closeable
39                      @Override
40                      public void accept(ServerSocket ss) throws Exception {
41                          ss.close();
42                      }
43                  }, //
44                  true);
45      }
46  
47      private static Callable<ServerSocket> createServerSocketFactory(
48              final Callable<? extends ServerSocket> serverSocketFactory, final int acceptTimeoutMs) {
49          return new Callable<ServerSocket>() {
50              @Override
51              public ServerSocket call() throws Exception {
52                  return createServerSocket(serverSocketFactory, acceptTimeoutMs);
53              }
54          };
55      }
56  
57      private static ServerSocket createServerSocket(Callable<? extends ServerSocket> serverSocketCreator, long timeoutMs)
58              throws Exception {
59          ServerSocket s = serverSocketCreator.call();
60          s.setSoTimeout((int) timeoutMs);
61          return s;
62      }
63  
64      private static Function<ServerSocket, Flowable<Flowable<byte[]>>> createFlowableFactory(final int timeoutMs,
65              final int bufferSize, final Action preAcceptAction, final Predicate<? super Socket> acceptSocket) {
66          return new Function<ServerSocket, Flowable<Flowable<byte[]>>>() {
67              @Override
68              public Flowable<Flowable<byte[]>> apply(ServerSocket serverSocket) {
69                  return createServerSocketFlowable(serverSocket, timeoutMs, bufferSize, preAcceptAction, acceptSocket);
70              }
71          };
72      }
73  
74      private static Flowable<Flowable<byte[]>> createServerSocketFlowable(final ServerSocket serverSocket,
75              final long timeoutMs, final int bufferSize, final Action preAcceptAction,
76              final Predicate<? super Socket> acceptSocket) {
77          return Flowable.generate( //
78                  new Consumer<Emitter<Flowable<byte[]>>>() {
79                      @Override
80                      public void accept(Emitter<Flowable<byte[]>> emitter) throws Exception {
81                          acceptConnection(timeoutMs, bufferSize, serverSocket, emitter, preAcceptAction, acceptSocket);
82                      }
83                  });
84      }
85  
86      private static void acceptConnection(long timeoutMs, int bufferSize, ServerSocket ss,
87              Emitter<Flowable<byte[]>> emitter, Action preAcceptAction, Predicate<? super Socket> acceptSocket) {
88          Socket socket;
89          while (true) {
90              try {
91                  preAcceptAction.run();
92                  socket = ss.accept();
93                  if (!acceptSocket.test(socket)) {
94                      closeQuietly(socket);
95                  } else {
96                      emitter.onNext(createSocketFlowable(socket, timeoutMs, bufferSize));
97                      break;
98                  }
99              } catch (SocketTimeoutException e) {
100                 // timed out so will loop around again
101             } catch (Throwable e) {
102                 // if the server socket has been closed then this is most likely
103                 // an unsubscribe so we don't try to report an error which would
104                 // just end up in RxJavaPlugins.onError as a stack trace in the
105                 // console.
106                 if (e instanceof SocketException && ("Socket closed".equals(e.getMessage())
107                         || "Socket operation on nonsocket: configureBlocking".equals(e.getMessage()))) {
108                     break;
109                 } else {
110                     // unknown problem
111                     emitter.onError(e);
112                     break;
113                 }
114             }
115         }
116     }
117 
118     @VisibleForTesting
119     static void closeQuietly(Socket socket) {
120         try {
121             socket.close();
122         } catch (IOException e) {
123             // ignore exception
124         }
125     }
126 
127     private static Flowable<byte[]> createSocketFlowable(final Socket socket, long timeoutMs, final int bufferSize) {
128         setTimeout(socket, timeoutMs);
129         return Flowable.using( //
130                 new Callable<InputStream>() {
131                     @Override
132                     public InputStream call() throws Exception {
133                         return socket.getInputStream();
134                     }
135                 }, //
136                 new Function<InputStream, Flowable<byte[]>>() {
137                     @Override
138                     public Flowable<byte[]> apply(InputStream is) {
139                         return Bytes.from(is, bufferSize);
140                     }
141                 }, //
142                 Consumers.close(), //
143                 true);
144     }
145 
146     private static void setTimeout(Socket socket, long timeoutMs) {
147         try {
148             socket.setSoTimeout((int) timeoutMs);
149         } catch (SocketException e) {
150             throw new RuntimeException(e);
151         }
152     }
153 
154 }