1 package com.github.davidmoten.rx2;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6 import java.util.concurrent.Callable;
7
8 import com.github.davidmoten.rx2.internal.flowable.FlowableServerSocket;
9
10 import io.reactivex.Flowable;
11 import io.reactivex.functions.Action;
12 import io.reactivex.functions.Consumer;
13 import io.reactivex.functions.Predicate;
14 import io.reactivex.internal.functions.Functions;
15
16 public final class IO {
17
18 private IO() {
19
20 }
21
22 public static ServerSocketBuilder serverSocket(final int port) {
23 return new ServerSocketBuilder(new Callable<ServerSocket>() {
24 @Override
25 public ServerSocket call() throws IOException {
26 return new ServerSocket(port);
27 }
28 });
29 }
30
31 public static ServerSocketBuilder serverSocketAutoAllocatePort(final Consumer<Integer> onAllocated) {
32 return serverSocket(new Callable<ServerSocket>() {
33
34 @Override
35 public ServerSocket call() throws Exception {
36 ServerSocket ss = new ServerSocket(0);
37 onAllocated.accept(ss.getLocalPort());
38 return ss;
39 }
40 });
41 }
42
43 public static ServerSocketBuilder serverSocket(Callable<? extends ServerSocket> serverSocketFactory) {
44 return new ServerSocketBuilder(serverSocketFactory);
45 }
46
47 public static final class ServerSocketBuilder {
48
49 private final Callable<? extends ServerSocket> serverSocketFactory;
50 private int readTimeoutMs = Integer.MAX_VALUE;
51 private int bufferSize = 8192;
52 private Action preAcceptAction = Actions.doNothing();
53 private int acceptTimeoutMs = Integer.MAX_VALUE;
54 private Predicate<? super Socket> acceptSocket = Functions.<Socket>alwaysTrue();
55
56 public ServerSocketBuilder(final Callable<? extends ServerSocket> serverSocketFactory) {
57 this.serverSocketFactory = serverSocketFactory;
58 }
59
60 public ServerSocketBuilder readTimeoutMs(int readTimeoutMs) {
61 this.readTimeoutMs = readTimeoutMs;
62 return this;
63 }
64
65 public ServerSocketBuilder bufferSize(int bufferSize) {
66 this.bufferSize = bufferSize;
67 return this;
68 }
69
70 public ServerSocketBuilder preAcceptAction(Action action) {
71 this.preAcceptAction = action;
72 return this;
73 }
74
75 public ServerSocketBuilder acceptTimeoutMs(int acceptTimeoutMs) {
76 this.acceptTimeoutMs = acceptTimeoutMs;
77 return this;
78 }
79
80 public ServerSocketBuilder acceptSocketIf(Predicate<? super Socket> acceptSocket) {
81 this.acceptSocket = acceptSocket;
82 return this;
83 }
84
85 public Flowable<Flowable<byte[]>> create() {
86 return FlowableServerSocket.create(serverSocketFactory, readTimeoutMs, bufferSize, preAcceptAction,
87 acceptTimeoutMs, acceptSocket);
88 }
89
90 }
91
92 }