1 package org.davidmoten.kool.internal.operators.stream;
2
3 import java.util.List;
4 import java.util.NoSuchElementException;
5
6 import org.davidmoten.kool.Stream;
7 import org.davidmoten.kool.StreamIterator;
8 import org.davidmoten.kool.exceptions.CompositeException;
9 import org.davidmoten.kool.internal.util.Exceptions;
10
11 import com.github.davidmoten.guavamini.Lists;
12
13 public final class MergeInterleaved<T> implements Stream<T> {
14
15 private final Stream<? extends T>[] streams;
16
17 @SafeVarargs
18 public MergeInterleaved(Stream<? extends T>... streams) {
19 this.streams = streams;
20 }
21
22 @Override
23 public StreamIterator<T> iterator() {
24 return new StreamIterator<T>() {
25
26 List<StreamIterator<? extends T>> list = getIterators(streams);
27 int index = 0;
28 T next;
29
30 @Override
31 public boolean hasNext() {
32 load();
33 return next != null;
34 }
35
36 @Override
37 public T next() {
38 load();
39 T v = next;
40 if (v == null) {
41 throw new NoSuchElementException();
42 } else {
43 next = null;
44 return v;
45 }
46 }
47
48 @Override
49 public void dispose() {
50 if (list != null) {
51 try {
52 List<Throwable> errors = getDisposalErrors();
53 throw new CompositeException(errors);
54 } finally {
55 list = null;
56 }
57 }
58 }
59
60 private List<Throwable> getDisposalErrors() {
61 List<Throwable> errors = Lists.newArrayList();
62 for (StreamIterator<? extends T> it : list) {
63 try {
64 it.dispose();
65 } catch (Throwable e) {
66 errors.add(e);
67 }
68 }
69 return errors;
70 }
71
72 private void load() {
73 if (list != null && !list.isEmpty() && next == null) {
74 while (true) {
75 StreamIterator<? extends T> it = list.get(index);
76 if (it.hasNext()) {
77 next = it.nextNullChecked();
78 index = (index + 1) % list.size();
79 break;
80 } else {
81 int idx = list.indexOf(it);
82 list.remove(idx);
83 try {
84 it.dispose();
85 } catch (Throwable e) {
86
87 List<Throwable> errors = getDisposalErrors();
88 if (errors.isEmpty()) {
89 Exceptions.rethrow(e);
90 return;
91 } else {
92 errors.add(e);
93 throw new CompositeException(errors);
94 }
95 }
96 if (list.isEmpty()) {
97 list = null;
98 break;
99 }
100 if (index >= idx) {
101 index = index % list.size();
102 }
103 }
104 }
105 }
106 }
107
108 };
109 }
110
111 static <T> List<StreamIterator<? extends T>> getIterators(Stream<? extends T>[] streams) {
112 List<StreamIterator<? extends T>> list = Lists.newArrayList();
113 for (Stream<? extends T> stream : streams) {
114 list.add(stream.iteratorNullChecked());
115 }
116 return list;
117 }
118
119 }