File |
Project |
Line |
com/github/davidmoten/rx2/internal/flowable/FlowableInsertMaybe.java |
rxjava2-extras |
171 |
com/github/davidmoten/rx2/internal/flowable/FlowableInsertTimeout.java |
rxjava2-extras |
186 |
downstream.onComplete();
}
return;
} else {
// nothing to emit and not done
break;
}
} else {
downstream.onNext(t);
e++;
}
}
if (e != 0L && r != Long.MAX_VALUE) {
requested.addAndGet(-e);
}
missed = addAndGet(-missed);
if (missed == 0) {
return;
}
}
}
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
// modify request to upstream to account for inserted values
// use a CAS loop because request can be called from any thread
while (true) {
long ins = inserted.get();
long d = Math.min(ins, n);
if (inserted.compareAndSet(ins, ins - d)) {
if (n - d > 0) {
upstream.request(n - d);
}
break;
}
}
drain();
}
}
@Override
public void cancel() {
if (!cancelled) {
cancelled = true;
upstream.cancel(); |
File |
Project |
Line |
com/github/davidmoten/rx2/Flowables.java |
rxjava2-extras |
300 |
com/github/davidmoten/rx2/Observables.java |
rxjava2-extras |
136 |
final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
Runnable action = new Runnable() {
@Override
public void run() {
cacheRef.get().reset();
}
};
// CAS loop to cancel the current worker and create a new one
while (true) {
Optional<Scheduler.Worker> wOld = workerRef.get();
if (wOld == null) {
// we are finished
return;
}
Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
if (workerRef.compareAndSet(wOld, w)) {
if (wOld.isPresent())
wOld.get().dispose();
w.get().schedule(action, duration, unit);
break;
}
}
} |
File |
Project |
Line |
com/github/davidmoten/rx2/Flowables.java |
rxjava2-extras |
265 |
com/github/davidmoten/rx2/Observables.java |
rxjava2-extras |
100 |
CachedFlowable<T> cache = new CachedFlowable<T>(source);
cacheRef.set(cache);
Runnable closeAction = new Runnable() {
@Override
public void run() {
while (true) {
Optional<Scheduler.Worker> w = workerRef.get();
if (w == null) {
// we are finished
break;
} else {
if (workerRef.compareAndSet(w, null)) {
if (w.isPresent()) {
w.get().dispose();
}
// we are finished
workerRef.set(null);
break;
}
}
// if not finished then try again
}
}
};
Runnable resetAction = new Runnable() {
@Override
public void run() {
startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
}
};
return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction); |
File |
Project |
Line |
com/github/davidmoten/rx2/StateMachine.java |
rxjava2-extras |
63 |
com/github/davidmoten/rx2/StateMachine2.java |
rxjava2-extras |
18 |
public static Builder builder() {
return new Builder();
}
public static final class Builder {
private Builder() {
// prevent instantiation from other packages
}
public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
return new Builder2<State>(initialState);
}
public <State> Builder2<State> initialState(final State initialState) {
return initialStateFactory(Callables.constant(initialState));
}
}
public static final class Builder2<State> {
private final Callable<State> initialState;
private Builder2(Callable<State> initialState) {
this.initialState = initialState;
}
public <In, Out> Builder3<State, In, Out> transition(Transition<State, In, Out> transition) { |
File |
Project |
Line |
com/github/davidmoten/rx2/Flowables.java |
rxjava2-extras |
29 |
com/github/davidmoten/rx2/flowable/Transformers.java |
rxjava2-extras |
125 |
}
public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
Function<? super A, K> aKey, Function<? super B, K> bKey,
BiFunction<? super A, ? super B, C> combiner, int requestSize) {
return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
}
public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b, |