View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.io.IOException;
4   import java.io.Reader;
5   
6   import rx.Observer;
7   import rx.observables.SyncOnSubscribe;
8   
9   public final class OnSubscribeReader extends SyncOnSubscribe<Reader,String> {
10  
11      private final Reader reader;
12      private final int size;
13  
14      public OnSubscribeReader(Reader reader, int size) {
15          this.reader = reader;
16          this.size = size;
17      }
18  
19      @Override
20      protected Reader generateState() {
21          return reader;
22      }
23  
24      @Override
25      protected Reader next(Reader reader, Observer<? super String> observer) {
26          char[] buffer = new char[size];
27          try {
28              int count = reader.read(buffer);
29              if (count == -1)
30                  observer.onCompleted();
31              else
32                  observer.onNext(String.valueOf(buffer, 0, count));
33          } catch (IOException e) {
34              observer.onError(e);
35          }
36          return reader;
37      }
38  }