View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.IOException;
4   import java.net.ServerSocket;
5   import java.net.Socket;
6   
7   import com.github.davidmoten.rx.exceptions.IORuntimeException;
8   import com.github.davidmoten.rx.internal.operators.ObservableServerSocket;
9   
10  import rx.Observable;
11  import rx.functions.Action0;
12  import rx.functions.Action1;
13  import rx.functions.Func0;
14  import rx.functions.Func1;
15  
16  public final class IO {
17  
18  	private IO() {
19  		// prevent instantiation
20  	}
21  
22  	public static ServerSocketBuilder serverSocket(final int port) {
23  		return new ServerSocketBuilder(new Func0<ServerSocket>() {
24  
25  			@Override
26  			public ServerSocket call() {
27  				try {
28  					return new ServerSocket(port);
29  				} catch (IOException e) {
30  					throw new IORuntimeException(e);
31  				}
32  			}
33  		});
34  	}
35  
36  	public static ServerSocketBuilder serverSocketAutoAllocatePort(final Action1<Integer> onAllocated) {
37  		return serverSocket(new Func0<ServerSocket>() {
38  
39  			@Override
40  			public ServerSocket call() {
41  				try {
42  					ServerSocket ss = new ServerSocket(0);
43  					onAllocated.call(ss.getLocalPort());
44  					return ss;
45  				} catch (IOException e) {
46  					throw new IORuntimeException(e);
47  				}
48  			}
49  		});
50  	}
51  
52  	public static ServerSocketBuilder serverSocket(Func0<? extends ServerSocket> serverSocketFactory) {
53  		return new ServerSocketBuilder(serverSocketFactory);
54  	}
55  
56  	public static final class ServerSocketBuilder {
57  
58  		private final Func0<? extends ServerSocket> serverSocketFactory;
59  		private int readTimeoutMs = Integer.MAX_VALUE;
60  		private int bufferSize = 8192;
61  		private Action0 preAcceptAction = Actions.doNothing0();
62  		private int acceptTimeoutMs = Integer.MAX_VALUE;
63  		private Func1<? super Socket, Boolean> acceptSocket = Functions.alwaysTrue();
64  
65  		public ServerSocketBuilder(final Func0<? extends ServerSocket> serverSocketFactory) {
66  			this.serverSocketFactory = serverSocketFactory;
67  		}
68  
69  		public ServerSocketBuilder readTimeoutMs(int readTimeoutMs) {
70  			this.readTimeoutMs = readTimeoutMs;
71  			return this;
72  		}
73  
74  		public ServerSocketBuilder bufferSize(int bufferSize) {
75  			this.bufferSize = bufferSize;
76  			return this;
77  		}
78  
79  		public ServerSocketBuilder preAcceptAction(Action0 action) {
80  			this.preAcceptAction = action;
81  			return this;
82  		}
83  
84  		public ServerSocketBuilder acceptTimeoutMs(int acceptTimeoutMs) {
85  			this.acceptTimeoutMs = acceptTimeoutMs;
86  			return this;
87  		}
88  		
89  		public ServerSocketBuilder acceptSocketIf(Func1<? super Socket, Boolean> acceptSocket) {
90  		    this.acceptSocket = acceptSocket;
91  		    return this;
92  		}
93  
94  		public Observable<Observable<byte[]>> create() {
95  			return ObservableServerSocket.create(serverSocketFactory, readTimeoutMs, bufferSize, preAcceptAction,
96  					acceptTimeoutMs, acceptSocket);
97  		}
98  
99  	}
100 
101 }