package com.github.davidmoten.rx.internal.operators;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import rx.Observer;
import rx.observables.SyncOnSubscribe;

/**
 * Emits the content of an {@link InputStream} as a sequence of byte arrays,
 * each holding at most {@code size} bytes.
 *
 * <p>
 * The stream passed to the constructor is used as the state for every
 * subscription, so all subscriptions read from the same stream instance. This
 * class never closes the stream.
 */
public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream, byte[]> {

    private final InputStream is;
    private final int size;

    /**
     * Constructor.
     *
     * @param is
     *            the stream to read from
     * @param size
     *            the maximum number of bytes per emitted array, must be greater
     *            than zero
     * @throws IllegalArgumentException
     *             if {@code size} is zero or negative
     */
    public OnSubscribeInputStream(InputStream is, int size) {
        // A zero-length buffer makes InputStream.read(byte[]) return 0 forever
        // (never -1), which would produce an endless sequence of empty arrays;
        // a negative size cannot be allocated at all. Fail fast instead.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be greater than zero but was " + size);
        }
        this.is = is;
        this.size = size;
    }

    /**
     * Returns the stream supplied at construction as the per-subscription
     * state.
     *
     * @return the stream supplied at construction
     */
    @Override
    protected InputStream generateState() {
        return is;
    }

    /**
     * Performs one read of up to {@code size} bytes and signals the outcome to
     * the observer: {@code onCompleted} when the read reports end of stream
     * ({@code -1}), {@code onNext} with the bytes read otherwise, or
     * {@code onError} if the read throws an {@link IOException}.
     *
     * @param is
     *            the stream to read from (the current state)
     * @param observer
     *            receives the bytes read, the completion signal or the error
     * @return the same stream, used as the state for the following call
     */
    @Override
    protected InputStream next(InputStream is, Observer<? super byte[]> observer) {
        // A fresh buffer is allocated per call because a full buffer is handed
        // to the observer as-is and so must not be reused.
        byte[] buffer = new byte[size];
        try {
            int count = is.read(buffer);
            if (count == -1) {
                observer.onCompleted();
            } else if (count < size) {
                // Partial read: emit only the bytes actually read.
                observer.onNext(Arrays.copyOf(buffer, count));
            } else {
                observer.onNext(buffer);
            }
        } catch (IOException e) {
            observer.onError(e);
        }
        return is;
    }
}