1 package com.github.davidmoten.rx2;
2
3 import java.io.Closeable;
4 import java.util.Arrays;
5 import java.util.List;
6 import java.util.concurrent.atomic.AtomicBoolean;
7 import java.util.concurrent.atomic.AtomicInteger;
8 import java.util.concurrent.atomic.AtomicReference;
9
10 import com.github.davidmoten.rx2.exceptions.AssertionException;
11
12 import io.reactivex.functions.Consumer;
13 import io.reactivex.functions.LongConsumer;
14
15 public final class Consumers {
16
17 private Consumers() {
18
19 }
20
21 public static LongConsumer addLongTo(final List<Long> list) {
22 return new LongConsumer() {
23
24 @Override
25 public void accept(long t) throws Exception {
26 list.add(t);
27 }
28
29 };
30 }
31
32 @SuppressWarnings("unchecked")
33 public static <T extends Closeable> Consumer<T> close() {
34 return (Consumer<T>) CloseHolder.INSTANCE;
35 }
36
37 private static final class CloseHolder {
38 final static Consumer<Closeable> INSTANCE = new Consumer<Closeable>() {
39 @Override
40 public void accept(Closeable t) throws Exception {
41 t.close();
42 }
43
44 };
45 }
46
47 public static Consumer<Object> increment(final AtomicInteger value) {
48 return new Consumer<Object>() {
49 @Override
50 public void accept(Object t) throws Exception {
51 value.incrementAndGet();
52 }
53 };
54 }
55
56 public static Consumer<Throwable> printStackTrace() {
57
58 return new Consumer<Throwable>() {
59 @Override
60 public void accept(Throwable e) throws Exception {
61 e.printStackTrace();
62 }
63 };
64 }
65
66 @SuppressWarnings("unchecked")
67 public static <T> Consumer<T> doNothing() {
68 return (Consumer<T>) DoNothingHolder.INSTANCE;
69 }
70
71 private static final class DoNothingHolder {
72 static final Consumer<Object> INSTANCE = new Consumer<Object>() {
73
74 @Override
75 public void accept(Object t) throws Exception {
76
77 }
78 };
79 }
80
81 public static <T> Consumer<T> set(final AtomicReference<T> value) {
82 return new Consumer<T>() {
83
84 @Override
85 public void accept(T t) throws Exception {
86 value.set(t);
87 }
88 };
89 }
90
91 public static Consumer<Integer> set(final AtomicInteger value) {
92 return new Consumer<Integer>() {
93 @Override
94 public void accept(Integer t) throws Exception {
95 value.set(t);
96 }
97 };
98 }
99
100 public static Consumer<Object> decrement(final AtomicInteger value) {
101 return new Consumer<Object>() {
102
103 @Override
104 public void accept(Object t) throws Exception {
105 value.decrementAndGet();
106 }
107
108 };
109 }
110
111 @SuppressWarnings("unchecked")
112 public static <T> Consumer<T> setToTrue(final AtomicBoolean value) {
113 return (Consumer<T>) new Consumer<Object>() {
114
115 @Override
116 public void accept(Object t) throws Exception {
117 value.set(true);
118 }
119 };
120 }
121
122 public static <T> Consumer<T> addTo(final List<T> list) {
123 return new Consumer<T>() {
124
125 @Override
126 public void accept(T t) throws Exception {
127 list.add(t);
128 }
129 };
130 }
131
132 @SuppressWarnings("unchecked")
133 public static <T> Consumer<T> println() {
134 return (Consumer<T>) PrintlnHolder.INSTANCE;
135 }
136
137 private static final class PrintlnHolder {
138 static final Consumer<Object> INSTANCE = new Consumer<Object>() {
139 @Override
140 public void accept(Object t) throws Exception {
141 System.out.println(t);
142 }
143 };
144 }
145
146 public static Consumer<byte[]> assertBytesEquals(final byte[] expected) {
147
148 return new Consumer<byte[]>() {
149
150 @Override
151 public void accept(byte[] array) throws Exception {
152 if (!Arrays.equals(expected, array)) {
153
154 throw new AssertionException("arrays not equal: expected=" + Arrays.toString(expected) + ",actual="
155 + Arrays.toString(array));
156 }
157 }
158
159 };
160 }
161
162 public static LongConsumer printLong(final String prefix) {
163 return new LongConsumer() {
164 @Override
165 public void accept(long t) throws Exception {
166 System.out.println(prefix + t);
167 }
168 };
169 }
170 }