View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.io.IOException;
4   import java.net.ServerSocket;
5   import java.net.Socket;
6   import java.util.concurrent.Callable;
7   
8   import com.github.davidmoten.rx2.internal.flowable.FlowableServerSocket;
9   
10  import io.reactivex.Flowable;
11  import io.reactivex.functions.Action;
12  import io.reactivex.functions.Consumer;
13  import io.reactivex.functions.Predicate;
14  import io.reactivex.internal.functions.Functions;
15  
16  public final class IO {
17  
18      private IO() {
19          // prevent instantiation
20      }
21  
22      public static ServerSocketBuilder serverSocket(final int port) {
23          return new ServerSocketBuilder(new Callable<ServerSocket>() {
24              @Override
25              public ServerSocket call() throws IOException {
26                  return new ServerSocket(port);
27              }
28          });
29      }
30  
31      public static ServerSocketBuilder serverSocketAutoAllocatePort(final Consumer<Integer> onAllocated) {
32          return serverSocket(new Callable<ServerSocket>() {
33  
34              @Override
35              public ServerSocket call() throws Exception {
36                  ServerSocket ss = new ServerSocket(0);
37                  onAllocated.accept(ss.getLocalPort());
38                  return ss;
39              }
40          });
41      }
42  
43      public static ServerSocketBuilder serverSocket(Callable<? extends ServerSocket> serverSocketFactory) {
44          return new ServerSocketBuilder(serverSocketFactory);
45      }
46  
47      public static final class ServerSocketBuilder {
48  
49          private final Callable<? extends ServerSocket> serverSocketFactory;
50          private int readTimeoutMs = Integer.MAX_VALUE;
51          private int bufferSize = 8192;
52          private Action preAcceptAction = Actions.doNothing();
53          private int acceptTimeoutMs = Integer.MAX_VALUE;
54          private Predicate<? super Socket> acceptSocket = Functions.<Socket>alwaysTrue();
55  
56          public ServerSocketBuilder(final Callable<? extends ServerSocket> serverSocketFactory) {
57              this.serverSocketFactory = serverSocketFactory;
58          }
59  
60          public ServerSocketBuilder readTimeoutMs(int readTimeoutMs) {
61              this.readTimeoutMs = readTimeoutMs;
62              return this;
63          }
64  
65          public ServerSocketBuilder bufferSize(int bufferSize) {
66              this.bufferSize = bufferSize;
67              return this;
68          }
69  
70          public ServerSocketBuilder preAcceptAction(Action action) {
71              this.preAcceptAction = action;
72              return this;
73          }
74  
75          public ServerSocketBuilder acceptTimeoutMs(int acceptTimeoutMs) {
76              this.acceptTimeoutMs = acceptTimeoutMs;
77              return this;
78          }
79  
80          public ServerSocketBuilder acceptSocketIf(Predicate<? super Socket> acceptSocket) {
81              this.acceptSocket = acceptSocket;
82              return this;
83          }
84  
85          public Flowable<Flowable<byte[]>> create() {
86              return FlowableServerSocket.create(serverSocketFactory, readTimeoutMs, bufferSize, preAcceptAction,
87                      acceptTimeoutMs, acceptSocket);
88          }
89  
90      }
91  
92  }