1 package com.github.davidmoten.rx.internal.operators; 2 3 import java.io.IOException; 4 import java.io.Reader; 5 6 import rx.Observer; 7 import rx.observables.SyncOnSubscribe; 8 9 public final class OnSubscribeReader extends SyncOnSubscribe<Reader,String> { 10 11 private final Reader reader; 12 private final int size; 13 14 public OnSubscribeReader(Reader reader, int size) { 15 this.reader = reader; 16 this.size = size; 17 } 18 19 @Override 20 protected Reader generateState() { 21 return reader; 22 } 23 24 @Override 25 protected Reader next(Reader reader, Observer<? super String> observer) { 26 char[] buffer = new char[size]; 27 try { 28 int count = reader.read(buffer); 29 if (count == -1) 30 observer.onCompleted(); 31 else 32 observer.onNext(String.valueOf(buffer, 0, count)); 33 } catch (IOException e) { 34 observer.onError(e); 35 } 36 return reader; 37 } 38 }