View Javadoc
1   package com.github.davidmoten.rx.internal.operators;
2   
3   import java.util.Queue;
4   
5   import rx.Observer;
6   import rx.observables.SyncOnSubscribe;
7   
8   public class OnSubscribeFromQueue<T> extends SyncOnSubscribe<Queue<T>, T> {
9   
10      private final Queue<T> queue;
11  
12      public OnSubscribeFromQueue(Queue<T> queue) {
13          this.queue = queue;
14      }
15  
16      @Override
17      protected Queue<T> generateState() {
18          return this.queue;
19      }
20  
21      @Override
22      protected Queue<T> next(Queue<T> queue, Observer<? super T> observer) {
23          T value = queue.poll();
24          if (value == null) {
25              observer.onCompleted();
26          } else {
27              observer.onNext(value);
28          }
29          return queue;
30      }
31  
32  }