View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.util.Arrays;
7   import java.util.List;
8   import java.util.Map;
9   
10  import com.github.davidmoten.util.Optional;
11  
12  import rx.Observable;
13  import rx.Observable.OnSubscribe;
14  import rx.Subscriber;
15  import rx.functions.Action1;
16  import rx.functions.Func0;
17  import rx.functions.Func1;
18  import rx.schedulers.Schedulers;
19  
20  public final class Processes {
21  
22      public static void main(String[] args) throws IOException, InterruptedException {
23          execute("ls").map(new Func1<byte[], String>() {
24  
25              @Override
26              public String call(byte[] bytes) {
27                  return new String(bytes);
28              }
29          });
30      }
31  
32      public static Observable<byte[]> execute(String... command) {
33          return execute(
34                  new Parameters(Arrays.asList(command), Optional.<Map<String, String>> absent(),
35                          true, new File("."), Optional.<Long> absent()));
36      }
37  
38      public static Observable<byte[]> execute(final Parameters parameters) {
39          Func0<Process> resourceFactory = new Func0<Process>() {
40  
41              @Override
42              public Process call() {
43                  ProcessBuilder b = new ProcessBuilder(parameters.command());
44                  if (parameters.env().isPresent()) {
45                      if (parameters.appendEnv())
46                          b.environment().clear();
47                      b.environment().putAll(parameters.env().get());
48                  }
49                  b.directory(parameters.directory());
50                  b.redirectErrorStream(true);
51                  try {
52                      return b.start();
53                  } catch (IOException e) {
54                      throw new RuntimeException(e);
55                  }
56              }
57          };
58  
59          Func1<Process, Observable<byte[]>> factory = new Func1<Process, Observable<byte[]>>() {
60              @Override
61              public Observable<byte[]> call(final Process process) {
62                  InputStream is = process.getInputStream();
63                  Observable<byte[]> output;
64                  if (is != null)
65                      output = Bytes.from(is);
66                  else
67                      output = Observable.empty();
68                  Observable<byte[]> completion = Observable.create(new OnSubscribe<byte[]>() {
69                      @Override
70                      public void call(Subscriber<? super byte[]> sub) {
71                          try {
72                              // TODO waitFor does not exist pre 1.8 with timeout!
73                              // parameters.waitForMs().get(),TimeUnit.MILLISECONDS);
74  
75                              if (parameters.waitForMs().isPresent()) {
76                                  // boolean finished = process.waitFor(
77                                  // parameters.waitForMs().get(),
78                                  // TimeUnit.MILLISECONDS);
79                                  // if (!finished) {
80                                  // sub.onError(new TimeoutException("process
81                                  // timed out"));
82                                  // return;
83                                  // }
84                                  sub.onError(new IllegalArgumentException("not implemented yet"));
85                              } else {
86                                  int exitCode = process.waitFor();
87                                  if (exitCode != 0)
88                                      sub.onError(new ProcessException(exitCode));
89                                  return;
90                              }
91                              sub.onCompleted();
92                          } catch (InterruptedException e) {
93                              sub.onError(e);
94                          }
95                      }
96                  }).subscribeOn(Schedulers.io());
97                  return output.concatWith(completion);
98              }
99          };
100         Action1<? super Process> disposeAction = new Action1<Process>() {
101             @Override
102             public void call(Process process) {
103                 process.destroy();
104             }
105         };
106         return Observable.using(resourceFactory, factory, disposeAction);
107     }
108 
109     public static class ProcessException extends RuntimeException {
110         private static final long serialVersionUID = 722422557667123473L;
111 
112         private final int exitCode;
113 
114         public ProcessException(int exitCode) {
115             super("process returned exitCode " + exitCode);
116             this.exitCode = exitCode;
117         }
118 
119         public int exitCode() {
120             return exitCode;
121         }
122 
123     }
124 
125     public static final class Parameters {
126         private final List<String> command;
127         private final Optional<Map<String, String>> env;
128         private final boolean appendEnv;
129         private final File directory;
130         private final Optional<Long> waitForMs;
131 
132         public Parameters(List<String> command, Optional<Map<String, String>> env,
133                 boolean appendEnv, File directory, Optional<Long> waitForMs) {
134             this.command = command;
135             this.env = env;
136             this.appendEnv = appendEnv;
137             this.directory = directory;
138             this.waitForMs = waitForMs;
139         }
140 
141         public Optional<Long> waitForMs() {
142             return waitForMs;
143         }
144 
145         public File directory() {
146             return directory;
147         }
148 
149         public List<String> command() {
150             return command;
151         }
152 
153         public Optional<Map<String, String>> env() {
154             return env;
155         }
156 
157         public boolean appendEnv() {
158             return appendEnv;
159         }
160 
161     }
162 }