1 package com.github.davidmoten.rx.internal.operators;
2
3 import java.util.Queue;
4
5 import rx.Observer;
6 import rx.observables.SyncOnSubscribe;
7
8 public class OnSubscribeFromQueue<T> extends SyncOnSubscribe<Queue<T>, T> {
9
10 private final Queue<T> queue;
11
12 public OnSubscribeFromQueue(Queue<T> queue) {
13 this.queue = queue;
14 }
15
16 @Override
17 protected Queue<T> generateState() {
18 return this.queue;
19 }
20
21 @Override
22 protected Queue<T> next(Queue<T> queue, Observer<? super T> observer) {
23 T value = queue.poll();
24 if (value == null) {
25 observer.onCompleted();
26 } else {
27 observer.onNext(value);
28 }
29 return queue;
30 }
31
32 }