View Javadoc
1   package com.github.davidmoten.rx;
2   
3   import java.io.BufferedInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.File;
6   import java.io.FileInputStream;
7   import java.io.FileNotFoundException;
8   import java.io.IOException;
9   import java.io.InputStream;
10  import java.util.zip.ZipEntry;
11  import java.util.zip.ZipInputStream;
12  
13  import com.github.davidmoten.rx.internal.operators.OnSubscribeInputStream;
14  import com.github.davidmoten.rx.util.ZippedEntry;
15  
16  import rx.Observable;
17  import rx.Observable.Transformer;
18  import rx.Observer;
19  import rx.functions.Action1;
20  import rx.functions.Action2;
21  import rx.functions.Func0;
22  import rx.functions.Func1;
23  import rx.observables.SyncOnSubscribe;
24  
25  public final class Bytes {
26  
27      private Bytes() {
28          // prevent instantiation
29      }
30  
31      /**
32       * Returns an Observable stream of byte arrays from the given
33       * {@link InputStream} between 1 and {@code size} bytes.
34       * 
35       * @param is
36       *            input stream of bytes
37       * @param size
38       *            max emitted byte array size
39       * @return a stream of byte arrays
40       */
41      public static Observable<byte[]> from(InputStream is, int size) {
42          return Observable.create(new OnSubscribeInputStream(is, size));
43      }
44  
45      public static Observable<byte[]> from(File file) {
46          return from(file, 8192);
47      }
48  
49      public static Observable<byte[]> from(final File file, final int size) {
50          Func0<InputStream> resourceFactory = new Func0<InputStream>() {
51  
52              @Override
53              public InputStream call() {
54                  try {
55                      return new BufferedInputStream(new FileInputStream(file), size);
56                  } catch (FileNotFoundException e) {
57                      throw new RuntimeException(e);
58                  }
59              }
60          };
61          Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() {
62  
63              @Override
64              public Observable<byte[]> call(InputStream is) {
65                  return from(is, size);
66              }
67          };
68          return Observable.using(resourceFactory, observableFactory, InputStreamCloseHolder.INSTANCE, true);
69      }
70      
71      private static class InputStreamCloseHolder {
72          private static final Action1<InputStream> INSTANCE = new Action1<InputStream>() {
73  
74              @Override
75              public void call(InputStream is) {
76                  try {
77                      is.close();
78                  } catch (IOException e) {
79                      throw new RuntimeException(e);
80                  }
81              }
82          };
83      }
84  
85      /**
86       * Returns an Observable stream of byte arrays from the given
87       * {@link InputStream} of {@code 8192} bytes. The final byte array may be
88       * less than {@code 8192} bytes.
89       * 
90       * @param is
91       *            input stream of bytes
92       * @return a stream of byte arrays
93       */
94      public static Observable<byte[]> from(InputStream is) {
95          return from(is, 8192);
96      }
97  
98      public static Observable<ZippedEntry> unzip(final File file) {
99          Func0<ZipInputStream> resourceFactory = new Func0<ZipInputStream>() {
100             @Override
101             public ZipInputStream call() {
102                 try {
103                     return new ZipInputStream(new FileInputStream(file));
104                 } catch (FileNotFoundException e) {
105                     throw new RuntimeException(e);
106                 }
107             }
108         };
109         Func1<ZipInputStream, Observable<ZippedEntry>> observableFactory = ZipHolder.OBSERVABLE_FACTORY;
110         Action1<ZipInputStream> disposeAction = ZipHolder.DISPOSER;
111         return Observable.using(resourceFactory, observableFactory, disposeAction);
112     }
113 
114     private static final class ZipHolder {
115         static final Action1<ZipInputStream> DISPOSER = new Action1<ZipInputStream>() {
116 
117             @Override
118             public void call(ZipInputStream zis) {
119                 try {
120                     zis.close();
121                 } catch (IOException e) {
122                     throw new RuntimeException(e);
123                 }
124             }
125         };
126         final static Func1<ZipInputStream, Observable<ZippedEntry>> OBSERVABLE_FACTORY = new Func1<ZipInputStream, Observable<ZippedEntry>>() {
127             @Override
128             public Observable<ZippedEntry> call(ZipInputStream zis) {
129                 return unzip(zis);
130             }
131         };
132     }
133 
134     public static Observable<ZippedEntry> unzip(final InputStream is) {
135         return unzip(new ZipInputStream(is));
136     }
137 
138     public static Observable<ZippedEntry> unzip(final ZipInputStream zis) {
139         return Observable.create(new SyncOnSubscribe<ZipInputStream, ZippedEntry>() {
140 
141             @Override
142             protected ZipInputStream generateState() {
143                 return zis;
144             }
145 
146             @Override
147             protected ZipInputStream next(ZipInputStream zis, Observer<? super ZippedEntry> observer) {
148                 try {
149                     ZipEntry zipEntry = zis.getNextEntry();
150                     if (zipEntry != null) {
151                         observer.onNext(new ZippedEntry(zipEntry, zis));
152                     } else {
153                         zis.close();
154                         observer.onCompleted();
155                     }
156                 } catch (IOException e) {
157                     observer.onError(e);
158                 }
159                 return zis;
160             }
161         });
162     }
163 
164     public static Transformer<byte[], byte[]> collect() {
165         return new Transformer<byte[], byte[]>() {
166 
167             @Override
168             public Observable<byte[]> call(Observable<byte[]> source) {
169                 return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE)
170                         .map(BosToArrayHolder.INSTANCE);
171             }
172         };
173     }
174 
175     private static final class BosCreatorHolder {
176 
177         static final Func0<ByteArrayOutputStream> INSTANCE = new Func0<ByteArrayOutputStream>() {
178 
179             @Override
180             public ByteArrayOutputStream call() {
181                 return new ByteArrayOutputStream();
182             }
183         };
184     }
185 
186     private static final class BosCollectorHolder {
187 
188         static final Action2<ByteArrayOutputStream, byte[]> INSTANCE = new Action2<ByteArrayOutputStream, byte[]>() {
189 
190             @Override
191             public void call(ByteArrayOutputStream bos, byte[] bytes) {
192                 try {
193                     bos.write(bytes);
194                 } catch (IOException e) {
195                     throw new RuntimeException(e);
196                 }
197             }
198         };
199     }
200 
201     private static final class BosToArrayHolder {
202         static final Func1<ByteArrayOutputStream, byte[]> INSTANCE = new Func1<ByteArrayOutputStream, byte[]>() {
203             @Override
204             public byte[] call(ByteArrayOutputStream bos) {
205                 return bos.toByteArray();
206             }
207         };
208     }
209 
210 }