1 package com.github.davidmoten.rx;
2
3 import java.io.IOException;
4 import java.net.ServerSocket;
5 import java.net.Socket;
6
7 import com.github.davidmoten.rx.exceptions.IORuntimeException;
8 import com.github.davidmoten.rx.internal.operators.ObservableServerSocket;
9
10 import rx.Observable;
11 import rx.functions.Action0;
12 import rx.functions.Action1;
13 import rx.functions.Func0;
14 import rx.functions.Func1;
15
16 public final class IO {
17
18 private IO() {
19
20 }
21
22 public static ServerSocketBuilder serverSocket(final int port) {
23 return new ServerSocketBuilder(new Func0<ServerSocket>() {
24
25 @Override
26 public ServerSocket call() {
27 try {
28 return new ServerSocket(port);
29 } catch (IOException e) {
30 throw new IORuntimeException(e);
31 }
32 }
33 });
34 }
35
36 public static ServerSocketBuilder serverSocketAutoAllocatePort(final Action1<Integer> onAllocated) {
37 return serverSocket(new Func0<ServerSocket>() {
38
39 @Override
40 public ServerSocket call() {
41 try {
42 ServerSocket ss = new ServerSocket(0);
43 onAllocated.call(ss.getLocalPort());
44 return ss;
45 } catch (IOException e) {
46 throw new IORuntimeException(e);
47 }
48 }
49 });
50 }
51
52 public static ServerSocketBuilder serverSocket(Func0<? extends ServerSocket> serverSocketFactory) {
53 return new ServerSocketBuilder(serverSocketFactory);
54 }
55
56 public static final class ServerSocketBuilder {
57
58 private final Func0<? extends ServerSocket> serverSocketFactory;
59 private int readTimeoutMs = Integer.MAX_VALUE;
60 private int bufferSize = 8192;
61 private Action0 preAcceptAction = Actions.doNothing0();
62 private int acceptTimeoutMs = Integer.MAX_VALUE;
63 private Func1<? super Socket, Boolean> acceptSocket = Functions.alwaysTrue();
64
65 public ServerSocketBuilder(final Func0<? extends ServerSocket> serverSocketFactory) {
66 this.serverSocketFactory = serverSocketFactory;
67 }
68
69 public ServerSocketBuilder readTimeoutMs(int readTimeoutMs) {
70 this.readTimeoutMs = readTimeoutMs;
71 return this;
72 }
73
74 public ServerSocketBuilder bufferSize(int bufferSize) {
75 this.bufferSize = bufferSize;
76 return this;
77 }
78
79 public ServerSocketBuilder preAcceptAction(Action0 action) {
80 this.preAcceptAction = action;
81 return this;
82 }
83
84 public ServerSocketBuilder acceptTimeoutMs(int acceptTimeoutMs) {
85 this.acceptTimeoutMs = acceptTimeoutMs;
86 return this;
87 }
88
89 public ServerSocketBuilder acceptSocketIf(Func1<? super Socket, Boolean> acceptSocket) {
90 this.acceptSocket = acceptSocket;
91 return this;
92 }
93
94 public Observable<Observable<byte[]>> create() {
95 return ObservableServerSocket.create(serverSocketFactory, readTimeoutMs, bufferSize, preAcceptAction,
96 acceptTimeoutMs, acceptSocket);
97 }
98
99 }
100
101 }