View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.util.Arrays;
6   
7   import rx.Observer;
8   import rx.observables.SyncOnSubscribe;
9   
10  public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream,byte[]> {
11  
12      private final InputStream is;
13      private final int size;
14  
15      public OnSubscribeInputStream(InputStream is, int size) {
16          this.is = is;
17          this.size = size;
18      }
19  
20      @Override
21      protected InputStream generateState() {
22          return is;
23      }
24  
25      @Override
26      protected InputStream next(InputStream is, Observer<? super byte[]> observer) {
27          byte[] buffer = new byte[size];
28          try {
29              int count = is.read(buffer);
30              if (count == -1) {
31                  observer.onCompleted();
32              }
33              else if (count < size) {
34                  observer.onNext(Arrays.copyOf(buffer, count));
35              }
36              else {
37                  observer.onNext(buffer);
38              }
39          } catch (IOException e) {
40              observer.onError(e);
41          }
42          return is;
43      }
44  }