View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.net.ServerSocket;
6   import java.net.Socket;
7   import java.net.SocketException;
8   import java.net.SocketTimeoutException;
9   
10  import com.github.davidmoten.rx.Actions;
11  import com.github.davidmoten.rx.Bytes;
12  import com.github.davidmoten.rx.Checked;
13  import com.github.davidmoten.rx.Checked.F0;
14  import com.github.davidmoten.rx.Functions;
15  
16  import rx.Observable;
17  import rx.Observer;
18  import rx.functions.Action0;
19  import rx.functions.Action1;
20  import rx.functions.Action2;
21  import rx.functions.Func0;
22  import rx.functions.Func1;
23  import rx.observables.SyncOnSubscribe;
24  
25  public final class ObservableServerSocket {
26  
27      private ObservableServerSocket() {
28          // prevent instantiation
29      }
30  
31      public static Observable<Observable<byte[]>> create(
32              final Func0<? extends ServerSocket> serverSocketFactory, final int timeoutMs,
33              final int bufferSize, Action0 preAcceptAction, int acceptTimeoutMs,
34              Func1<? super Socket, Boolean> acceptSocket) {
35          Func1<ServerSocket, Observable<Observable<byte[]>>> observableFactory = createObservableFactory(
36                  timeoutMs, bufferSize, preAcceptAction, acceptSocket);
37          return Observable.<Observable<byte[]>, ServerSocket> using( //
38                  createServerSocketFactory(serverSocketFactory, acceptTimeoutMs), //
39                  observableFactory, //
40                  new Action1<ServerSocket>() {
41  
42                      @Override
43                      public void call(ServerSocket ss) {
44                          try {
45                              ss.close();
46                          } catch (IOException e) {
47                              throw new RuntimeException(e);
48                          }
49                      }
50  
51                  }, true);
52      }
53  
54      private static Func0<ServerSocket> createServerSocketFactory(
55              final Func0<? extends ServerSocket> serverSocketFactory, final int acceptTimeoutMs) {
56          return Checked.f0(new F0<ServerSocket>() {
57              @Override
58              public ServerSocket call() throws Exception {
59                  return createServerSocket(serverSocketFactory, acceptTimeoutMs);
60              }
61          });
62      }
63  
64      private static ServerSocket createServerSocket(
65              Func0<? extends ServerSocket> serverSocketCreator, long timeoutMs) throws IOException {
66          ServerSocket s = serverSocketCreator.call();
67          s.setSoTimeout((int) timeoutMs);
68          return s;
69      }
70  
71      private static Func1<ServerSocket, Observable<Observable<byte[]>>> createObservableFactory(
72              final int timeoutMs, final int bufferSize, final Action0 preAcceptAction,
73              final Func1<? super Socket, Boolean> acceptSocket) {
74          return new Func1<ServerSocket, Observable<Observable<byte[]>>>() {
75              @Override
76              public Observable<Observable<byte[]>> call(ServerSocket serverSocket) {
77                  return createServerSocketObservable(serverSocket, timeoutMs, bufferSize,
78                          preAcceptAction, acceptSocket);
79              }
80          };
81      }
82  
83      private static Observable<Observable<byte[]>> createServerSocketObservable(
84              ServerSocket serverSocket, final long timeoutMs, final int bufferSize,
85              final Action0 preAcceptAction, final Func1<? super Socket, Boolean> acceptSocket) {
86          return Observable.create( //
87                  SyncOnSubscribe.<ServerSocket, Observable<byte[]>> createSingleState( //
88                          Functions.constant0(serverSocket), //
89                          new Action2<ServerSocket, Observer<? super Observable<byte[]>>>() {
90  
91                              @Override
92                              public void call(ServerSocket ss,
93                                      Observer<? super Observable<byte[]>> observer) {
94                                  acceptConnection(timeoutMs, bufferSize, ss, observer,
95                                          preAcceptAction, acceptSocket);
96                              }
97                          }));
98      }
99  
100     private static void acceptConnection(long timeoutMs, int bufferSize, ServerSocket ss,
101             Observer<? super Observable<byte[]>> observer, Action0 preAcceptAction,
102             Func1<? super Socket, Boolean> acceptSocket) {
103         Socket socket;
104         while (true) {
105             try {
106                 preAcceptAction.call();
107                 socket = ss.accept();
108                 if (!acceptSocket.call(socket)) {
109                     closeQuietly(socket);
110                 } else {
111                     observer.onNext(createSocketObservable(socket, timeoutMs, bufferSize));
112                     break;
113                 }
114             } catch (SocketTimeoutException e) {
115                 // timed out so will loop around again
116             } catch (IOException e) {
117                 // unknown problem
118                 observer.onError(e);
119                 break;
120             }
121         }
122     }
123 
124     private static void closeQuietly(Socket socket) {
125         try {
126             socket.close();
127         } catch (IOException e) {
128             // ignore exception
129         }
130     }
131 
132     private static Observable<byte[]> createSocketObservable(final Socket socket, long timeoutMs,
133             final int bufferSize) {
134         setTimeout(socket, timeoutMs);
135         return Observable.using( //
136                 Checked.f0(new F0<InputStream>() {
137                     @Override
138                     public InputStream call() throws Exception {
139                         return socket.getInputStream();
140                     }
141                 }), //
142                 new Func1<InputStream, Observable<byte[]>>() {
143                     @Override
144                     public Observable<byte[]> call(InputStream is) {
145                         return Bytes.from(is, bufferSize);
146                     }
147                 }, //
148                 Actions.close(), //
149                 true);
150     }
151 
152     private static void setTimeout(Socket socket, long timeoutMs) {
153         try {
154             socket.setSoTimeout((int) timeoutMs);
155         } catch (SocketException e) {
156             throw new RuntimeException(e);
157         }
158     }
159 
160 }