View Javadoc
1   package com.github.davidmoten.rx2;
2   
3   import java.io.BufferedInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.File;
6   import java.io.FileInputStream;
7   import java.io.FileNotFoundException;
8   import java.io.IOException;
9   import java.io.InputStream;
10  import java.util.Arrays;
11  import java.util.concurrent.Callable;
12  import java.util.zip.ZipEntry;
13  import java.util.zip.ZipInputStream;
14  
15  import com.github.davidmoten.rx2.util.ZippedEntry;
16  
17  import io.reactivex.Emitter;
18  import io.reactivex.Flowable;
19  import io.reactivex.Single;
20  import io.reactivex.functions.BiConsumer;
21  import io.reactivex.functions.Consumer;
22  import io.reactivex.functions.Function;
23  
24  public final class Bytes {
25  
26      private static final int DEFAULT_BUFFER_SIZE = 8192;
27  
28      private Bytes() {
29          // prevent instantiation
30      }
31  
32      /**
33       * Returns a Flowable stream of byte arrays from the given
34       * {@link InputStream} between 1 and {@code bufferSize} bytes.
35       * 
36       * @param is
37       *            input stream of bytes
38       * @param bufferSize
39       *            max emitted byte array size
40       * @return a stream of byte arrays
41       */
42      public static Flowable<byte[]> from(final InputStream is, final int bufferSize) {
43          return Flowable.generate(new Consumer<Emitter<byte[]>>() {
44              @Override
45              public void accept(Emitter<byte[]> emitter) throws Exception {
46                  byte[] buffer = new byte[bufferSize];
47                  int count = is.read(buffer);
48                  if (count == -1) {
49                      emitter.onComplete();
50                  } else if (count < bufferSize) {
51                      emitter.onNext(Arrays.copyOf(buffer, count));
52                  } else {
53                      emitter.onNext(buffer);
54                  }
55              }
56          });
57      }
58  
59      public static Flowable<byte[]> from(File file) {
60          return from(file, 8192);
61      }
62  
63      public static Flowable<byte[]> from(final File file, final int size) {
64          Callable<InputStream> resourceFactory = new Callable<InputStream>() {
65  
66              @Override
67              public InputStream call() throws FileNotFoundException {
68                  return new BufferedInputStream(new FileInputStream(file), size);
69              }
70          };
71          Function<InputStream, Flowable<byte[]>> observableFactory = new Function<InputStream, Flowable<byte[]>>() {
72  
73              @Override
74              public Flowable<byte[]> apply(InputStream is) {
75                  return from(is, size);
76              }
77          };
78          return Flowable.using(resourceFactory, observableFactory, InputStreamCloseHolder.INSTANCE, true);
79      }
80  
81      private static final class InputStreamCloseHolder {
82          static final Consumer<InputStream> INSTANCE = new Consumer<InputStream>() {
83  
84              @Override
85              public void accept(InputStream is) throws IOException {
86                  is.close();
87              }
88          };
89      }
90  
91      /**
92       * Returns a Flowable stream of byte arrays from the given
93       * {@link InputStream} of {@code 8192} bytes. The final byte array may be
94       * less than {@code 8192} bytes.
95       * 
96       * @param is
97       *            input stream of bytes
98       * @return a stream of byte arrays
99       */
100     public static Flowable<byte[]> from(InputStream is) {
101         return from(is, DEFAULT_BUFFER_SIZE);
102     }
103 
104     public static Flowable<ZippedEntry> unzip(final File file) {
105         Callable<ZipInputStream> resourceFactory = new Callable<ZipInputStream>() {
106             @Override
107             public ZipInputStream call() throws FileNotFoundException {
108                 return new ZipInputStream(new FileInputStream(file));
109             }
110         };
111         Function<ZipInputStream, Flowable<ZippedEntry>> observableFactory = ZipHolder.OBSERVABLE_FACTORY;
112         Consumer<ZipInputStream> disposeAction = ZipHolder.DISPOSER;
113         return Flowable.using(resourceFactory, observableFactory, disposeAction);
114     }
115 
116     public static Flowable<ZippedEntry> unzip(final InputStream is) {
117         return unzip(new ZipInputStream(is));
118     }
119 
120     public static Flowable<ZippedEntry> unzip(final ZipInputStream zis) {
121 
122         return Flowable.generate(new Consumer<Emitter<ZippedEntry>>() {
123             @Override
124             public void accept(Emitter<ZippedEntry> emitter) throws IOException {
125                 ZipEntry zipEntry = zis.getNextEntry();
126                 if (zipEntry != null) {
127                     emitter.onNext(new ZippedEntry(zipEntry, zis));
128                 } else {
129                     // end of stream so eagerly close the stream (might not be a
130                     // good idea since this method did not create the zis
131                     zis.close();
132                     emitter.onComplete();
133                 }
134             }
135         });
136 
137     }
138 
139     public static Single<byte[]> collect(Flowable<byte[]> source) {
140         return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE).map(BosToArrayHolder.INSTANCE);
141     }
142 
143     public static Function<Flowable<byte[]>, Single<byte[]>> collect() {
144         return new Function<Flowable<byte[]>, Single<byte[]>>() {
145             @Override
146             public Single<byte[]> apply(Flowable<byte[]> source) throws Exception {
147                 return collect(source);
148             }
149         };
150     }
151 
152     private static final class BosCreatorHolder {
153         static final Callable<ByteArrayOutputStream> INSTANCE = new Callable<ByteArrayOutputStream>() {
154 
155             @Override
156             public ByteArrayOutputStream call() {
157                 return new ByteArrayOutputStream();
158             }
159         };
160     }
161 
162     private static final class BosCollectorHolder {
163         static final BiConsumer<ByteArrayOutputStream, byte[]> INSTANCE = new BiConsumer<ByteArrayOutputStream, byte[]>() {
164 
165             @Override
166             public void accept(ByteArrayOutputStream bos, byte[] bytes) throws IOException {
167                 bos.write(bytes);
168             }
169         };
170     }
171 
172     private static final class BosToArrayHolder {
173         static final Function<ByteArrayOutputStream, byte[]> INSTANCE = new Function<ByteArrayOutputStream, byte[]>() {
174             @Override
175             public byte[] apply(ByteArrayOutputStream bos) {
176                 return bos.toByteArray();
177             }
178         };
179     }
180 
181     private static final class ZipHolder {
182         static final Consumer<ZipInputStream> DISPOSER = new Consumer<ZipInputStream>() {
183 
184             @Override
185             public void accept(ZipInputStream zis) throws IOException {
186                 zis.close();
187             }
188         };
189         final static Function<ZipInputStream, Flowable<ZippedEntry>> OBSERVABLE_FACTORY = new Function<ZipInputStream, Flowable<ZippedEntry>>() {
190             @Override
191             public Flowable<ZippedEntry> apply(ZipInputStream zis) {
192                 return unzip(zis);
193             }
194         };
195     }
196 
197 }