1 package ordertracker;
2
import static org.junit.Assert.assertEquals;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import xuml.tools.util.database.DerbyUtil;
27
28 public class AppTest {
29
30 private static final Logger log = LoggerFactory.getLogger(AppTest.class);
31
32 private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
33
34 @Before
35 public void setup() {
36 DerbyUtil.disableDerbyLog();
37 for (File file : new File("target").listFiles(f -> f.getName().startsWith("testdb.")))
38 file.delete();
39 try (Connection c = DriverManager.getConnection("jdbc:hsqldb:file:target/testdb", "sa",
40 "")) {
41 c.prepareStatement("create schema ORDERTRACKER authorization sa").execute();
42 } catch (SQLException e) {
43 log.warn(e.getMessage());
44 }
45 App.startup();
46 }
47
48 @After
49 public void shutdown() {
50 App.shutdown();
51 }
52
53 @Test
54 public void testEventService() throws InterruptedException {
55
56 final CountDownLatch latch = new CountDownLatch(1);
57 EventService.instance().events().subscribeOn(scheduler).subscribe(new Action1<String>() {
58 @Override
59 public void call(String event) {
60 latch.countDown();
61 }
62 });
63
64 Order.create(new Order.Events.Create("1", "test order", "canberra", "sydney",
65 "fred@yahoo.com", "joey@gmail.com", 3, "created"));
66 latch.await(5000, TimeUnit.MILLISECONDS);
67 }
68
69 @Test
70 public void testDeliverySequence() throws InterruptedException {
71 final List<String> states = new CopyOnWriteArrayList<>();
72 List<String> expectedStates = toList(Order.State.PREPARING, Order.State.READY_FOR_DISPATCH,
73 Order.State.COURIER_ASSIGNED, Order.State.IN_TRANSIT, Order.State.IN_TRANSIT,
74 Order.State.READY_FOR_DELIVERY, Order.State.DELIVERING, Order.State.DELIVERED);
75 final List<CountDownLatch> latches = new CopyOnWriteArrayList<>();
76 for (@SuppressWarnings("unused")
77 String state : expectedStates)
78 latches.add(new CountDownLatch(1));
79 Subscriber<String> subscriber = createSubscriber(states, latches);
80 EventService.instance().events().take(expectedStates.size()).subscribeOn(scheduler)
81 .subscribe(subscriber);
82
83 Order order;
84 synchronized (this) {
85
86
87 Depot depot = Depot.create(new Depot.Events.Create("2", "Bungendore", -35.0, 142.0));
88 order = Order.create(new Order.Events.Create(depot.getId(), "test order", "canberra",
89 "sydney", "fred@yahoo.com", "joey@gmail.com", 3, "created"));
90 }
91 int count = 0;
92 checkLatch(latches, expectedStates, states, count++);
93 order.signal(new Order.Events.Send());
94 checkLatch(latches, expectedStates, states, count++);
95 order.signal(new Order.Events.Assign());
96 checkLatch(latches, expectedStates, states, count++);
97 order.signal(new Order.Events.PickedUp());
98 checkLatch(latches, expectedStates, states, count++);
99 order.signal(new Order.Events.ArrivedDepot("2"));
100 checkLatch(latches, expectedStates, states, count++);
101 order.signal(new Order.Events.ArrivedFinalDepot("2"));
102 checkLatch(latches, expectedStates, states, count++);
103 order.signal(new Order.Events.Delivering());
104 checkLatch(latches, expectedStates, states, count++);
105 order.signal(new Order.Events.Delivered());
106 checkLatch(latches, expectedStates, states, count++);
107 subscriber.unsubscribe();
108 }
109
110
111
112 @Test
113 public void testDeliverySequenceWithRetries() throws InterruptedException {
114 final List<String> states = new CopyOnWriteArrayList<>();
115 List<String> expectedStates = toList(Order.State.PREPARING, Order.State.READY_FOR_DISPATCH,
116 Order.State.COURIER_ASSIGNED, Order.State.IN_TRANSIT, Order.State.IN_TRANSIT,
117 Order.State.READY_FOR_DELIVERY, Order.State.DELIVERING, Order.State.DELIVERY_FAILED,
118 Order.State.AWAITING_NEXT_DELIVERY_ATTEMPT, Order.State.READY_FOR_DELIVERY,
119 Order.State.DELIVERING, Order.State.DELIVERY_FAILED,
120 Order.State.AWAITING_NEXT_DELIVERY_ATTEMPT, Order.State.READY_FOR_DELIVERY,
121 Order.State.DELIVERING, Order.State.DELIVERY_FAILED, Order.State.HELD_FOR_PICKUP,
122 Order.State.DELIVERED);
123 final List<CountDownLatch> latches = new ArrayList<>();
124 for (@SuppressWarnings("unused")
125 String state : expectedStates)
126 latches.add(new CountDownLatch(1));
127 Subscriber<String> subscriber = createSubscriber(states, latches);
128 EventService.instance().events().take(expectedStates.size()).subscribeOn(scheduler)
129 .subscribe(subscriber);
130 Order order;
131 synchronized (this) {
132
133 order = Order.create(new Order.Events.Create("3", "test order", "canberra", "sydney",
134 "fred@yahoo.com", "joey@gmail.com", 3, "created"));
135 Depot.create(new Depot.Events.Create("3", "Bungendore", -35.0, 142.0));
136 }
137 int count = 0;
138 checkLatch(latches, expectedStates, states, count++);
139 order.signal(new Order.Events.Send());
140 checkLatch(latches, expectedStates, states, count++);
141 order.signal(new Order.Events.Assign());
142 checkLatch(latches, expectedStates, states, count++);
143 order.signal(new Order.Events.PickedUp());
144 checkLatch(latches, expectedStates, states, count++);
145 order.signal(new Order.Events.ArrivedDepot("3"));
146 checkLatch(latches, expectedStates, states, count++);
147 order.signal(new Order.Events.ArrivedFinalDepot("3"));
148 checkLatch(latches, expectedStates, states, count++);
149 order.signal(new Order.Events.Delivering());
150 checkLatch(latches, expectedStates, states, count++);
151 order.signal(new Order.Events.DeliveryFailed());
152 checkLatch(latches, expectedStates, states, count++);
153 order.signal(new Order.Events.DeliverAgain());
154 checkLatch(latches, expectedStates, states, count++);
155 order.signal(new Order.Events.DeliveryFailed());
156 checkLatch(latches, expectedStates, states, count++);
157
158
159
160
161
162
163 subscriber.unsubscribe();
164 }
165
166 private static List<String> toList(Order.State... states) {
167 List<String> list = new ArrayList<String>();
168 for (Order.State state : states)
169 list.add(state.toString());
170 return list;
171 }
172
173 private static synchronized void checkLatch(List<CountDownLatch> latches,
174 List<String> expectedStates, List<String> states, int index)
175 throws InterruptedException {
176 System.out.println(
177 "waiting for latch " + index + " to detect state " + expectedStates.get(index));
178 latches.get(index).await(120, TimeUnit.SECONDS);
179 System.out.println("latch obtained for " + index);
180 assertEquals(expectedStates.subList(0, index + 1), states.subList(0, index + 1));
181 }
182
183 private Subscriber<String> createSubscriber(final List<String> states,
184 final List<CountDownLatch> latches) {
185 return new Subscriber<String>() {
186
187 int count = 0;
188
189 @Override
190 public void onCompleted() {
191 }
192
193 @Override
194 public void onError(Throwable e) {
195 throw new RuntimeException(e);
196 }
197
198 @Override
199 public void onNext(String state) {
200 log.info(Thread.currentThread().getName() + " - " + state);
201 states.add(state);
202 latches.get(count).countDown();
203 count++;
204 }
205 };
206 }
207
208 }