1 package com.github.davidmoten.rx;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.util.Arrays;
7 import java.util.List;
8 import java.util.Map;
9
10 import com.github.davidmoten.util.Optional;
11
12 import rx.Observable;
13 import rx.Observable.OnSubscribe;
14 import rx.Subscriber;
15 import rx.functions.Action1;
16 import rx.functions.Func0;
17 import rx.functions.Func1;
18 import rx.schedulers.Schedulers;
19
20 public final class Processes {
21
22 public static void main(String[] args) throws IOException, InterruptedException {
23 execute("ls").map(new Func1<byte[], String>() {
24
25 @Override
26 public String call(byte[] bytes) {
27 return new String(bytes);
28 }
29 });
30 }
31
32 public static Observable<byte[]> execute(String... command) {
33 return execute(
34 new Parameters(Arrays.asList(command), Optional.<Map<String, String>> absent(),
35 true, new File("."), Optional.<Long> absent()));
36 }
37
38 public static Observable<byte[]> execute(final Parameters parameters) {
39 Func0<Process> resourceFactory = new Func0<Process>() {
40
41 @Override
42 public Process call() {
43 ProcessBuilder b = new ProcessBuilder(parameters.command());
44 if (parameters.env().isPresent()) {
45 if (parameters.appendEnv())
46 b.environment().clear();
47 b.environment().putAll(parameters.env().get());
48 }
49 b.directory(parameters.directory());
50 b.redirectErrorStream(true);
51 try {
52 return b.start();
53 } catch (IOException e) {
54 throw new RuntimeException(e);
55 }
56 }
57 };
58
59 Func1<Process, Observable<byte[]>> factory = new Func1<Process, Observable<byte[]>>() {
60 @Override
61 public Observable<byte[]> call(final Process process) {
62 InputStream is = process.getInputStream();
63 Observable<byte[]> output;
64 if (is != null)
65 output = Bytes.from(is);
66 else
67 output = Observable.empty();
68 Observable<byte[]> completion = Observable.create(new OnSubscribe<byte[]>() {
69 @Override
70 public void call(Subscriber<? super byte[]> sub) {
71 try {
72
73
74
75 if (parameters.waitForMs().isPresent()) {
76
77
78
79
80
81
82
83
84 sub.onError(new IllegalArgumentException("not implemented yet"));
85 } else {
86 int exitCode = process.waitFor();
87 if (exitCode != 0)
88 sub.onError(new ProcessException(exitCode));
89 return;
90 }
91 sub.onCompleted();
92 } catch (InterruptedException e) {
93 sub.onError(e);
94 }
95 }
96 }).subscribeOn(Schedulers.io());
97 return output.concatWith(completion);
98 }
99 };
100 Action1<? super Process> disposeAction = new Action1<Process>() {
101 @Override
102 public void call(Process process) {
103 process.destroy();
104 }
105 };
106 return Observable.using(resourceFactory, factory, disposeAction);
107 }
108
109 public static class ProcessException extends RuntimeException {
110 private static final long serialVersionUID = 722422557667123473L;
111
112 private final int exitCode;
113
114 public ProcessException(int exitCode) {
115 super("process returned exitCode " + exitCode);
116 this.exitCode = exitCode;
117 }
118
119 public int exitCode() {
120 return exitCode;
121 }
122
123 }
124
125 public static final class Parameters {
126 private final List<String> command;
127 private final Optional<Map<String, String>> env;
128 private final boolean appendEnv;
129 private final File directory;
130 private final Optional<Long> waitForMs;
131
132 public Parameters(List<String> command, Optional<Map<String, String>> env,
133 boolean appendEnv, File directory, Optional<Long> waitForMs) {
134 this.command = command;
135 this.env = env;
136 this.appendEnv = appendEnv;
137 this.directory = directory;
138 this.waitForMs = waitForMs;
139 }
140
141 public Optional<Long> waitForMs() {
142 return waitForMs;
143 }
144
145 public File directory() {
146 return directory;
147 }
148
149 public List<String> command() {
150 return command;
151 }
152
153 public Optional<Map<String, String>> env() {
154 return env;
155 }
156
157 public boolean appendEnv() {
158 return appendEnv;
159 }
160
161 }
162 }