1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.io.IOException;
4 import java.io.Reader;
5
6 import rx.Observer;
7 import rx.observables.SyncOnSubscribe;
8
9 public final class OnSubscribeReader extends SyncOnSubscribe<Reader,String> {
10
11 private final Reader reader;
12 private final int size;
13
14 public OnSubscribeReader(Reader reader, int size) {
15 this.reader = reader;
16 this.size = size;
17 }
18
19 @Override
20 protected Reader generateState() {
21 return reader;
22 }
23
24 @Override
25 protected Reader next(Reader reader, Observer<? super String> observer) {
26 char[] buffer = new char[size];
27 try {
28 int count = reader.read(buffer);
29 if (count == -1)
30 observer.onCompleted();
31 else
32 observer.onNext(String.valueOf(buffer, 0, count));
33 } catch (IOException e) {
34 observer.onError(e);
35 }
36 return reader;
37 }
38 }