View Javadoc
1   package org.davidmoten.kool.internal.operators.stream;
2   
3   import java.util.List;
4   import java.util.NoSuchElementException;
5   
6   import org.davidmoten.kool.Stream;
7   import org.davidmoten.kool.StreamIterator;
8   import org.davidmoten.kool.exceptions.CompositeException;
9   import org.davidmoten.kool.internal.util.Exceptions;
10  
11  import com.github.davidmoten.guavamini.Lists;
12  
13  public final class MergeInterleaved<T> implements Stream<T> {
14  
15      private final Stream<? extends T>[] streams;
16  
17      @SafeVarargs
18      public MergeInterleaved(Stream<? extends T>... streams) {
19          this.streams = streams;
20      }
21  
22      @Override
23      public StreamIterator<T> iterator() {
24          return new StreamIterator<T>() {
25  
26              List<StreamIterator<? extends T>> list = getIterators(streams);
27              int index = 0;
28              T next;
29  
30              @Override
31              public boolean hasNext() {
32                  load();
33                  return next != null;
34              }
35  
36              @Override
37              public T next() {
38                  load();
39                  T v = next;
40                  if (v == null) {
41                      throw new NoSuchElementException();
42                  } else {
43                      next = null;
44                      return v;
45                  }
46              }
47  
48              @Override
49              public void dispose() {
50                  if (list != null) {
51                      try {
52                          List<Throwable> errors = getDisposalErrors();
53                          throw new CompositeException(errors);
54                      } finally {
55                          list = null;
56                      }
57                  }
58              }
59  
60              private List<Throwable> getDisposalErrors() {
61                  List<Throwable> errors = Lists.newArrayList();
62                  for (StreamIterator<? extends T> it : list) {
63                      try {
64                          it.dispose();
65                      } catch (Throwable e) {
66                          errors.add(e);
67                      }
68                  }
69                  return errors;
70              }
71  
72              private void load() {
73                  if (list != null && !list.isEmpty() && next == null) {
74                      while (true) {
75                          StreamIterator<? extends T> it = list.get(index);
76                          if (it.hasNext()) {
77                              next = it.nextNullChecked();
78                              index = (index + 1) % list.size();
79                              break;
80                          } else {
81                              int idx = list.indexOf(it);
82                              list.remove(idx);
83                              try {
84                                  it.dispose();
85                              } catch (Throwable e) {
86                                  // dispose all and throw
87                                  List<Throwable> errors = getDisposalErrors();
88                                  if (errors.isEmpty()) {
89                                      Exceptions.rethrow(e);
90                                      return;
91                                  } else {
92                                      errors.add(e);
93                                      throw new CompositeException(errors);
94                                  }
95                              }
96                              if (list.isEmpty()) {
97                                  list = null;
98                                  break;
99                              }
100                             if (index >= idx) {
101                                 index = index % list.size();
102                             }
103                         }
104                     }
105                 }
106             }
107 
108         };
109     }
110 
111     static <T> List<StreamIterator<? extends T>> getIterators(Stream<? extends T>[] streams) {
112         List<StreamIterator<? extends T>> list = Lists.newArrayList();
113         for (Stream<? extends T> stream : streams) {
114             list.add(stream.iteratorNullChecked());
115         }
116         return list;
117     }
118 
119 }