1 package org.davidmoten.kool.internal.operators.stream;
2
3 import java.util.concurrent.Callable;
4
5 import org.davidmoten.kool.Single;
6 import org.davidmoten.kool.Stream;
7 import org.davidmoten.kool.StreamIterator;
8 import org.davidmoten.kool.function.BiConsumer;
9 import org.davidmoten.kool.internal.util.Exceptions;
10
11 public final class Collect<T, R> implements Single<R> {
12
13 private final Callable<? extends R> factory;
14 private final BiConsumer<? super R, ? super T> collector;
15 private final Stream<T> source;
16
17 public Collect(Callable<? extends R> factory, BiConsumer<? super R, ? super T> collector,
18 Stream<T> source) {
19 this.factory = factory;
20 this.collector = collector;
21 this.source = source;
22 }
23
24 @Override
25 public R get() {
26 StreamIterator<T> it = source.iteratorNullChecked();
27 try {
28 R c = factory.call();
29 while (it.hasNext()) {
30 collector.accept(c, it.nextNullChecked());
31 }
32 return c;
33 } catch (Throwable e) {
34 return Exceptions.rethrow(e);
35 } finally {
36 it.dispose();
37 }
38 }
39
40 }