View Javadoc
1   /**
2    * Copyright 2015 Netflix, Inc.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5    * use this file except in compliance with the License. You may obtain a copy of
6    * the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package com.github.davidmoten.rx.util;
17  
18  import java.util.concurrent.atomic.AtomicLong;
19  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20  
/**
 * Utility functions for use with backpressure.
 *
 * <p>
 * The methods in this class add to a request counter with a compare-and-set
 * retry loop and cap the counter at {@code Long.MAX_VALUE} instead of letting
 * it wrap around to a negative value.
 */
public final class BackpressureUtils {

    /** Utility class, no instances. */
    private BackpressureUtils() {
        throw new IllegalStateException("No instances!");
    }

    /**
     * Adds {@code n} to the {@code requested} field of {@code object} and
     * returns the value held just before the addition succeeded (uses CAS
     * semantics, retrying until the compare-and-set succeeds). If the sum is
     * negative (which is how overflow past {@code Long.MAX_VALUE} shows up)
     * the field is set to {@code Long.MAX_VALUE} instead.
     *
     * <p>
     * Note that <em>any</em> negative sum is treated as overflow, so a
     * negative {@code n} whose magnitude exceeds the current value also
     * results in {@code Long.MAX_VALUE} being stored.
     *
     * @param requested
     *            atomic field updater for a request count
     * @param object
     *            contains the field updated by the updater
     * @param n
     *            the number of requests to add to the requested count
     * @param <T>
     *            the type of the object holding the volatile field being
     *            updated
     * @return requested value just prior to successful addition
     */
    public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
        while (true) {
            long current = requested.get(object);
            long next = addCap(current, n);
            // retry with a fresh read if another thread changed the field
            if (requested.compareAndSet(object, current, next)) {
                return current;
            }
        }
    }

    /**
     * Adds {@code n} to {@code requested} and returns the value held just
     * before the addition succeeded (uses CAS semantics, retrying until the
     * compare-and-set succeeds). If the sum is negative (which is how overflow
     * past {@code Long.MAX_VALUE} shows up) {@code requested} is set to
     * {@code Long.MAX_VALUE} instead.
     *
     * <p>
     * Note that <em>any</em> negative sum is treated as overflow, so a
     * negative {@code n} whose magnitude exceeds the current value also
     * results in {@code Long.MAX_VALUE} being stored.
     *
     * @param requested
     *            atomic request count
     * @param n
     *            the number of requests to add to the requested count
     * @return requested value just prior to successful addition
     */
    public static long getAndAddRequest(AtomicLong requested, long n) {
        while (true) {
            long current = requested.get();
            long next = addCap(current, n);
            // retry with a fresh read if another thread changed the counter
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /**
     * Returns {@code current + n}, or {@code Long.MAX_VALUE} when that sum is
     * negative. Shared by both {@code getAndAddRequest} overloads so the
     * capping rule is defined in exactly one place.
     *
     * @param current
     *            the current request count
     * @param n
     *            the amount to add
     * @return the capped sum
     */
    private static long addCap(long current, long n) {
        long sum = current + n;
        // a negative sum is treated as overflow and capped
        return sum < 0 ? Long.MAX_VALUE : sum;
    }
}
87  }