View Javadoc
1   package org.davidmoten.kool.internal.operators.stream;
2   
3   import java.util.concurrent.Callable;
4   
5   import org.davidmoten.kool.Single;
6   import org.davidmoten.kool.Stream;
7   import org.davidmoten.kool.StreamIterator;
8   import org.davidmoten.kool.function.BiConsumer;
9   import org.davidmoten.kool.internal.util.Exceptions;
10  
11  public final class Collect<T, R> implements Single<R> {
12  
13      private final Callable<? extends R> factory;
14      private final BiConsumer<? super R, ? super T> collector;
15      private final Stream<T> source;
16  
17      public Collect(Callable<? extends R> factory, BiConsumer<? super R, ? super T> collector,
18              Stream<T> source) {
19          this.factory = factory;
20          this.collector = collector;
21          this.source = source;
22      }
23  
24      @Override
25      public R get() {
26          StreamIterator<T> it = source.iteratorNullChecked();
27          try {
28              R c = factory.call();
29              while (it.hasNext()) {
30                  collector.accept(c, it.nextNullChecked());
31              }
32              return c;
33          } catch (Throwable e) {
34              return Exceptions.rethrow(e);
35          } finally {
36              it.dispose();
37          }
38      }
39  
40  }