1 package org.davidmoten.kool.internal.operators.stream; 2 3 import java.util.concurrent.Callable; 4 5 import org.davidmoten.kool.Single; 6 import org.davidmoten.kool.Stream; 7 import org.davidmoten.kool.StreamIterator; 8 import org.davidmoten.kool.function.BiConsumer; 9 import org.davidmoten.kool.internal.util.Exceptions; 10 11 public final class Collect<T, R> implements Single<R> { 12 13 private final Callable<? extends R> factory; 14 private final BiConsumer<? super R, ? super T> collector; 15 private final Stream<T> source; 16 17 public Collect(Callable<? extends R> factory, BiConsumer<? super R, ? super T> collector, 18 Stream<T> source) { 19 this.factory = factory; 20 this.collector = collector; 21 this.source = source; 22 } 23 24 @Override 25 public R get() { 26 StreamIterator<T> it = source.iteratorNullChecked(); 27 try { 28 R c = factory.call(); 29 while (it.hasNext()) { 30 collector.accept(c, it.nextNullChecked()); 31 } 32 return c; 33 } catch (Throwable e) { 34 return Exceptions.rethrow(e); 35 } finally { 36 it.dispose(); 37 } 38 } 39 40 }