package com.github.davidmoten.rx.internal.operators;

import java.util.Queue;

import rx.Observer;
import rx.observables.SyncOnSubscribe;

/**
 * A {@link SyncOnSubscribe} whose state is a {@link Queue} and whose emissions
 * are the elements polled from that queue.
 *
 * <p>
 * The queue handed to the constructor is returned as-is from
 * {@link #generateState()}; it is not copied. Elements are removed from it via
 * {@link Queue#poll()} as they are emitted.
 *
 * @param <T>
 *            type of the elements held by the queue and emitted to the observer
 */
public class OnSubscribeFromQueue<T> extends SyncOnSubscribe<Queue<T>, T> {

    private final Queue<T> queue;

    /**
     * Creates an instance backed by the given queue.
     *
     * @param queue
     *            the queue whose elements will be polled and emitted
     */
    public OnSubscribeFromQueue(Queue<T> queue) {
        this.queue = queue;
    }

    /**
     * Supplies the state object, which is the very queue instance passed to the
     * constructor.
     *
     * @return the backing queue
     */
    @Override
    protected Queue<T> generateState() {
        return queue;
    }

    /**
     * Polls a single element from the state queue and signals the observer
     * accordingly: a non-null element is passed to {@code onNext}, while a
     * {@code null} result from {@link Queue#poll()} triggers {@code onCompleted}.
     *
     * @param state
     *            the queue to poll
     * @param observer
     *            the receiver of the polled element or of the completion signal
     * @return the same queue instance that was passed in
     */
    @Override
    protected Queue<T> next(Queue<T> state, Observer<? super T> observer) {
        final T item = state.poll();
        if (item != null) {
            observer.onNext(item);
            return state;
        }
        // poll() yielded null: treat this as the end of the stream.
        observer.onCompleted();
        return state;
    }

}