1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Arrays;
6
7 import rx.Observer;
8 import rx.observables.SyncOnSubscribe;
9
10 public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream,byte[]> {
11
12 private final InputStream is;
13 private final int size;
14
15 public OnSubscribeInputStream(InputStream is, int size) {
16 this.is = is;
17 this.size = size;
18 }
19
20 @Override
21 protected InputStream generateState() {
22 return is;
23 }
24
25 @Override
26 protected InputStream next(InputStream is, Observer<? super byte[]> observer) {
27 byte[] buffer = new byte[size];
28 try {
29 int count = is.read(buffer);
30 if (count == -1) {
31 observer.onCompleted();
32 }
33 else if (count < size) {
34 observer.onNext(Arrays.copyOf(buffer, count));
35 }
36 else {
37 observer.onNext(buffer);
38 }
39 } catch (IOException e) {
40 observer.onError(e);
41 }
42 return is;
43 }
44 }