View Javadoc
1   package ordertracker;
2   
3   import static org.junit.Assert.assertEquals;
4   
5   import java.io.File;
6   import java.sql.Connection;
7   import java.sql.DriverManager;
8   import java.sql.SQLException;
9   import java.util.ArrayList;
10  import java.util.List;
11  import java.util.concurrent.CopyOnWriteArrayList;
12  import java.util.concurrent.CountDownLatch;
13  import java.util.concurrent.Executors;
14  import java.util.concurrent.TimeUnit;
15  
16  import org.junit.After;
17  import org.junit.Before;
18  import org.junit.Test;
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  
22  import rx.Scheduler;
23  import rx.Subscriber;
24  import rx.functions.Action1;
25  import rx.schedulers.Schedulers;
26  import xuml.tools.util.database.DerbyUtil;
27  
28  public class AppTest {
29  
30      private static final Logger log = LoggerFactory.getLogger(AppTest.class);
31  
32      private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
33  
34      @Before
35      public void setup() {
36          DerbyUtil.disableDerbyLog();
37          for (File file : new File("target").listFiles(f -> f.getName().startsWith("testdb.")))
38              file.delete();
39          try (Connection c = DriverManager.getConnection("jdbc:hsqldb:file:target/testdb", "sa",
40                  "")) {
41              c.prepareStatement("create schema ORDERTRACKER authorization sa").execute();
42          } catch (SQLException e) {
43              log.warn(e.getMessage());
44          }
45          App.startup();
46      }
47  
48      @After
49      public void shutdown() {
50          App.shutdown();
51      }
52  
53      @Test
54      public void testEventService() throws InterruptedException {
55          // TODO move this test to xuml-model-compiler-test
56          final CountDownLatch latch = new CountDownLatch(1);
57          EventService.instance().events().subscribeOn(scheduler).subscribe(new Action1<String>() {
58              @Override
59              public void call(String event) {
60                  latch.countDown();
61              }
62          });
63  
64          Order.create(new Order.Events.Create("1", "test order", "canberra", "sydney",
65                  "fred@yahoo.com", "joey@gmail.com", 3, "created"));
66          latch.await(5000, TimeUnit.MILLISECONDS);
67      }
68  
69      @Test
70      public void testDeliverySequence() throws InterruptedException {
71          final List<String> states = new CopyOnWriteArrayList<>();
72          List<String> expectedStates = toList(Order.State.PREPARING, Order.State.READY_FOR_DISPATCH,
73                  Order.State.COURIER_ASSIGNED, Order.State.IN_TRANSIT, Order.State.IN_TRANSIT,
74                  Order.State.READY_FOR_DELIVERY, Order.State.DELIVERING, Order.State.DELIVERED);
75          final List<CountDownLatch> latches = new CopyOnWriteArrayList<>();
76          for (@SuppressWarnings("unused")
77          String state : expectedStates)
78              latches.add(new CountDownLatch(1));
79          Subscriber<String> subscriber = createSubscriber(states, latches);
80          EventService.instance().events().take(expectedStates.size()).subscribeOn(scheduler)
81                  .subscribe(subscriber);
82  
83          Order order;
84          synchronized (this) {
85              // note for using with h2 enforce happens-before by creating order
86              // using depot
87              Depot depot = Depot.create(new Depot.Events.Create("2", "Bungendore", -35.0, 142.0));
88              order = Order.create(new Order.Events.Create(depot.getId(), "test order", "canberra",
89                      "sydney", "fred@yahoo.com", "joey@gmail.com", 3, "created"));
90          }
91          int count = 0;
92          checkLatch(latches, expectedStates, states, count++);
93          order.signal(new Order.Events.Send());
94          checkLatch(latches, expectedStates, states, count++);
95          order.signal(new Order.Events.Assign());
96          checkLatch(latches, expectedStates, states, count++);
97          order.signal(new Order.Events.PickedUp());
98          checkLatch(latches, expectedStates, states, count++);
99          order.signal(new Order.Events.ArrivedDepot("2"));
100         checkLatch(latches, expectedStates, states, count++);
101         order.signal(new Order.Events.ArrivedFinalDepot("2"));
102         checkLatch(latches, expectedStates, states, count++);
103         order.signal(new Order.Events.Delivering());
104         checkLatch(latches, expectedStates, states, count++);
105         order.signal(new Order.Events.Delivered());
106         checkLatch(latches, expectedStates, states, count++);
107         subscriber.unsubscribe();
108     }
109 
110     // keep timeout quite large so that freebie CI servers don't fail when they
111     // are under load
112     @Test
113     public void testDeliverySequenceWithRetries() throws InterruptedException {
114         final List<String> states = new CopyOnWriteArrayList<>();
115         List<String> expectedStates = toList(Order.State.PREPARING, Order.State.READY_FOR_DISPATCH,
116                 Order.State.COURIER_ASSIGNED, Order.State.IN_TRANSIT, Order.State.IN_TRANSIT,
117                 Order.State.READY_FOR_DELIVERY, Order.State.DELIVERING, Order.State.DELIVERY_FAILED,
118                 Order.State.AWAITING_NEXT_DELIVERY_ATTEMPT, Order.State.READY_FOR_DELIVERY,
119                 Order.State.DELIVERING, Order.State.DELIVERY_FAILED,
120                 Order.State.AWAITING_NEXT_DELIVERY_ATTEMPT, Order.State.READY_FOR_DELIVERY,
121                 Order.State.DELIVERING, Order.State.DELIVERY_FAILED, Order.State.HELD_FOR_PICKUP,
122                 Order.State.DELIVERED);
123         final List<CountDownLatch> latches = new ArrayList<>();
124         for (@SuppressWarnings("unused")
125         String state : expectedStates)
126             latches.add(new CountDownLatch(1));
127         Subscriber<String> subscriber = createSubscriber(states, latches);
128         EventService.instance().events().take(expectedStates.size()).subscribeOn(scheduler)
129                 .subscribe(subscriber);
130         Order order;
131         synchronized (this) {
132             // prevent reordering with latch checks
133             order = Order.create(new Order.Events.Create("3", "test order", "canberra", "sydney",
134                     "fred@yahoo.com", "joey@gmail.com", 3, "created"));
135             Depot.create(new Depot.Events.Create("3", "Bungendore", -35.0, 142.0));
136         }
137         int count = 0;
138         checkLatch(latches, expectedStates, states, count++);
139         order.signal(new Order.Events.Send());
140         checkLatch(latches, expectedStates, states, count++);
141         order.signal(new Order.Events.Assign());
142         checkLatch(latches, expectedStates, states, count++);
143         order.signal(new Order.Events.PickedUp());
144         checkLatch(latches, expectedStates, states, count++);
145         order.signal(new Order.Events.ArrivedDepot("3"));
146         checkLatch(latches, expectedStates, states, count++);
147         order.signal(new Order.Events.ArrivedFinalDepot("3"));
148         checkLatch(latches, expectedStates, states, count++);
149         order.signal(new Order.Events.Delivering());
150         checkLatch(latches, expectedStates, states, count++);
151         order.signal(new Order.Events.DeliveryFailed());
152         checkLatch(latches, expectedStates, states, count++);
153         order.signal(new Order.Events.DeliverAgain());
154         checkLatch(latches, expectedStates, states, count++);
155         order.signal(new Order.Events.DeliveryFailed());
156         checkLatch(latches, expectedStates, states, count++);
157         // order.signal(new Order.Events.DeliverAgain());
158         // checkLatch(latches, expectedStates, states, count++);
159         // order.signal(new Order.Events.DeliveryFailed());
160         // checkLatch(latches, expectedStates, states, count++);
161         // order.signal(new Order.Events.DeliveredByPickup());
162         // checkLatch(latches, expectedStates, states, count++);
163         subscriber.unsubscribe();
164     }
165 
166     private static List<String> toList(Order.State... states) {
167         List<String> list = new ArrayList<String>();
168         for (Order.State state : states)
169             list.add(state.toString());
170         return list;
171     }
172 
173     private static synchronized void checkLatch(List<CountDownLatch> latches,
174             List<String> expectedStates, List<String> states, int index)
175                     throws InterruptedException {
176         System.out.println(
177                 "waiting for latch " + index + " to detect state " + expectedStates.get(index));
178         latches.get(index).await(120, TimeUnit.SECONDS);
179         System.out.println("latch obtained for " + index);
180         assertEquals(expectedStates.subList(0, index + 1), states.subList(0, index + 1));
181     }
182 
183     private Subscriber<String> createSubscriber(final List<String> states,
184             final List<CountDownLatch> latches) {
185         return new Subscriber<String>() {
186 
187             int count = 0;
188 
189             @Override
190             public void onCompleted() {
191             }
192 
193             @Override
194             public void onError(Throwable e) {
195                 throw new RuntimeException(e);
196             }
197 
198             @Override
199             public void onNext(String state) {
200                 log.info(Thread.currentThread().getName() + " - " + state);
201                 states.add(state);
202                 latches.get(count).countDown();
203                 count++;
204             }
205         };
206     }
207 
208 }