CPD Results

The following document contains the results of PMD's CPD 5.3.2.

Duplications

File Project Line
com/github/davidmoten/rx2/internal/flowable/FlowableInsertMaybe.java rxjava2-extras 171
com/github/davidmoten/rx2/internal/flowable/FlowableInsertTimeout.java rxjava2-extras 186
                                downstream.onComplete();
                            }
                            return;
                        } else {
                            // nothing to emit and not done
                            break;
                        }
                    } else {
                        downstream.onNext(t);
                        e++;
                    }
                }
                if (e != 0L && r != Long.MAX_VALUE) {
                    requested.addAndGet(-e);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }

        /**
         * Records downstream demand and forwards to upstream only the part of
         * it that is not offset by the {@code inserted} counter.
         *
         * <p>
         * If {@code SubscriptionHelper.validate(n)} rejects {@code n} nothing
         * else happens. Otherwise {@code n} is added to {@code requested},
         * {@code min(inserted, n)} is subtracted from {@code inserted}, any
         * positive remainder is requested from upstream, and {@code drain()}
         * is called.
         *
         * @param n
         *            the number of items requested by downstream
         */
        @Override
        public void request(long n) {
            if (!SubscriptionHelper.validate(n)) {
                return;
            }
            BackpressureHelper.add(requested, n);
            // Reduce the upstream request by the number of inserted values.
            // request can be called from any thread so retry the
            // compare-and-set until the deduction is applied atomically.
            long current;
            long deduction;
            do {
                current = inserted.get();
                deduction = Math.min(current, n);
            } while (!inserted.compareAndSet(current, current - deduction));
            long remainder = n - deduction;
            if (remainder > 0) {
                upstream.request(remainder);
            }
            drain();
        }

        @Override
        public void cancel() {
            if (!cancelled) {
                cancelled = true;
                upstream.cancel();
File Project Line
com/github/davidmoten/rx2/Flowables.java rxjava2-extras 300
com/github/davidmoten/rx2/Observables.java rxjava2-extras 136
            final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
            final AtomicReference<Optional<Scheduler.Worker>> workerRef) {

        Runnable action = new Runnable() {
            @Override
            public void run() {
                cacheRef.get().reset();
            }
        };
        // CAS loop to cancel the current worker and create a new one
        while (true) {
            Optional<Scheduler.Worker> wOld = workerRef.get();
            if (wOld == null) {
                // we are finished
                return;
            }
            Optional<Scheduler.Worker> w = Optional.of(scheduler.createWorker());
            if (workerRef.compareAndSet(wOld, w)) {
                if (wOld.isPresent())
                    wOld.get().dispose();
                w.get().schedule(action, duration, unit);
                break;
            }
        }
    }
File Project Line
com/github/davidmoten/rx2/Flowables.java rxjava2-extras 265
com/github/davidmoten/rx2/Observables.java rxjava2-extras 100
        CachedFlowable<T> cache = new CachedFlowable<T>(source);
        cacheRef.set(cache);
        Runnable closeAction = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Optional<Scheduler.Worker> w = workerRef.get();
                    if (w == null) {
                        // we are finished
                        break;
                    } else {
                        if (workerRef.compareAndSet(w, null)) {
                            if (w.isPresent()) {
                                w.get().dispose();
                            }
                            // we are finished
                            workerRef.set(null);
                            break;
                        }
                    }
                    // if not finished then try again
                }
            }
        };
        Runnable resetAction = new Runnable() {

            @Override
            public void run() {
                startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
            }
        };
        return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction);
File Project Line
com/github/davidmoten/rx2/StateMachine.java rxjava2-extras 63
com/github/davidmoten/rx2/StateMachine2.java rxjava2-extras 18
    /**
     * Creates a new {@link Builder}.
     *
     * @return a fresh {@code Builder} instance
     */
    public static Builder builder() {
        final Builder b = new Builder();
        return b;
    }

    /**
     * First step of the builder chain: captures how the initial state is
     * obtained and hands over to {@link Builder2}.
     */
    public static final class Builder {

        private Builder() {
            // private so that instances cannot be created from other packages
        }

        /**
         * Continues the chain using a factory for the initial state.
         *
         * @param initialState
         *            callable passed on to {@link Builder2} as the source of
         *            the initial state
         * @param <State>
         *            the state type
         * @return the next builder in the chain
         */
        public <State> Builder2<State> initialStateFactory(Callable<State> initialState) {
            Builder2<State> next = new Builder2<State>(initialState);
            return next;
        }

        /**
         * Continues the chain using a fixed initial state value, wrapped via
         * {@code Callables.constant}.
         *
         * @param initialState
         *            the initial state value
         * @param <State>
         *            the state type
         * @return the next builder in the chain
         */
        public <State> Builder2<State> initialState(final State initialState) {
            return new Builder2<State>(Callables.constant(initialState));
        }

    }

    public static final class Builder2<State> {

        private final Callable<State> initialState;

        private Builder2(Callable<State> initialState) {
            this.initialState = initialState;
        }

        public <In, Out> Builder3<State, In, Out> transition(Transition<State, In, Out> transition) {
File Project Line
com/github/davidmoten/rx2/Flowables.java rxjava2-extras 29
com/github/davidmoten/rx2/flowable/Transformers.java rxjava2-extras 125
    }

    /**
     * Builds a {@code FlowableMatch} over the two given sources.
     *
     * <p>
     * All arguments are passed unchanged to the {@code FlowableMatch}
     * constructor. NOTE(review): presumably items of {@code a} and {@code b}
     * are paired when their keys are equal and {@code requestSize} is the
     * upstream request batch size; confirm against {@code FlowableMatch}.
     *
     * @param a
     *            the first source
     * @param b
     *            the second source
     * @param aKey
     *            key function applied to items of {@code a}
     * @param bKey
     *            key function applied to items of {@code b}
     * @param combiner
     *            function producing an output from an item of each source
     * @param requestSize
     *            request size handed to {@code FlowableMatch}
     * @param <A>
     *            item type of the first source
     * @param <B>
     *            item type of the second source
     * @param <K>
     *            key type
     * @param <C>
     *            output item type
     * @return the resulting {@code Flowable}
     */
    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,
            Function<? super A, K> aKey, Function<? super B, K> bKey,
            BiFunction<? super A, ? super B, C> combiner, int requestSize) {
        Flowable<C> matched = new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner,
                requestSize);
        return matched;
    }

    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b,