View Javadoc
1   package xuml.tools.model.compiler.runtime;
2   
3   import java.io.Serializable;
4   import java.util.List;
5   import java.util.Map;
6   import java.util.concurrent.TimeUnit;
7   
8   import javax.persistence.EntityManager;
9   import javax.persistence.EntityManagerFactory;
10  import javax.persistence.EntityTransaction;
11  
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import com.google.common.base.Optional;
16  import com.google.common.base.Preconditions;
17  import com.google.common.collect.Maps;
18  import com.typesafe.config.ConfigFactory;
19  
20  import akka.actor.ActorRef;
21  import akka.actor.ActorSystem;
22  import akka.actor.Cancellable;
23  import akka.actor.Props;
24  import akka.actor.Terminated;
25  import scala.concurrent.ExecutionContext;
26  import scala.concurrent.Future;
27  import scala.concurrent.duration.Duration;
28  import scala.concurrent.duration.FiniteDuration;
29  import xuml.tools.model.compiler.runtime.actor.RootActor;
30  import xuml.tools.model.compiler.runtime.message.ActorConfig;
31  import xuml.tools.model.compiler.runtime.message.Signal;
32  
33  public class Signaller {
34  
35      private static final Logger log = LoggerFactory.getLogger(Signaller.class);
36  
37      private final ThreadLocal<Info> info = new ThreadLocal<Info>() {
38          @Override
39          protected Info initialValue() {
40              return new Info();
41          }
42      };
43      private final ActorSystem actorSystem = ActorSystem.create("xuml-tools",
44              ConfigFactory.load("xuml-akka").withFallback(ConfigFactory.load()));
45      private final ActorRef root = actorSystem.actorOf(Props.create(RootActor.class), "root");
46      private final EntityManagerFactory emf;
47  
48      public Signaller(EntityManagerFactory emf, int entityActorPoolSize,
49              SignalProcessorListenerFactory listenerFactory) {
50          this.emf = emf;
51          log.debug("Akka system settings:\n{}", actorSystem.settings());
52          root.tell(new ActorConfig(entityActorPoolSize), root);
53          root.tell(emf, root);
54          if (listenerFactory != null)
55              root.tell(listenerFactory, root);
56      }
57  
58      public EntityManagerFactory getEntityManagerFactory() {
59          return emf;
60      }
61  
62      /**
63       * Returns a new instance of type T using the given {@link CreationEvent}.
64       * This is a synchronous creation using a newly created then closed
65       * EntityManager for persisting the entity. If you need finer grained
66       * control of commits then open your own entity manager and do the the
67       * persist yourself.
68       * 
69       * @param cls
70       * @param event
71       * @return
72       */
73      public <T extends Entity<T>> T create(Class<T> cls, CreationEvent<T> event) {
74          EntityManager em = null;
75          EntityTransaction tx = null;
76          T t;
77          try {
78              // TODO add before and after listener notifications for create event
79              // (see EntityActor for listener example for non-creation events
80              t = cls.newInstance();
81          } catch (InstantiationException e) {
82              throw new RuntimeException(e);
83          } catch (IllegalAccessException e) {
84              throw new RuntimeException(e);
85          }
86          try {
87              em = emf.createEntityManager();
88              t.helper().setEntityManager(em);
89              tx = em.getTransaction();
90              tx.begin();
91              t.event(event);
92              em.persist(t);
93              tx.commit();
94              // only after successful commit do we send the signals to other
95              // entities made during onEntry procedure.
96              t.helper().sendQueuedSignals();
97          } catch (RuntimeException e) {
98              if (tx != null && tx.isActive())
99                  tx.rollback();
100             throw e;
101         } finally {
102             t.helper().setEntityManager(null);
103             if (em != null && em.isOpen())
104                 em.close();
105         }
106         return t;
107 
108     }
109 
110     public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
111             Event<T> event, Optional<Duration> delay) {
112         signal(fromEntityUniqueId, entity, event, delay, Optional.<FiniteDuration> absent());
113     }
114 
115     public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
116             Event<T> event, Long time, Optional<FiniteDuration> repeatInterval) {
117         signal(fromEntityUniqueId, entity, event, getDelay(time), repeatInterval);
118     }
119 
120     private Optional<Duration> getDelay(Long time) {
121         long now = System.currentTimeMillis();
122         if (time == null || time <= now)
123             return Optional.absent();
124         else
125             return Optional.<Duration> of(Duration.create(time - now, TimeUnit.MILLISECONDS));
126     }
127 
128     public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
129             Event<T> event, Optional<Duration> delay, Optional<FiniteDuration> repeatInterval) {
130         Preconditions.checkNotNull(delay);
131         Preconditions.checkNotNull(repeatInterval);
132         long time;
133         long now = System.currentTimeMillis();
134         if (!delay.isPresent())
135             time = now;
136         else
137             time = now + delay.get().toMillis();
138         Optional<Long> repeatIntervalMs;
139         if (repeatInterval.isPresent())
140             repeatIntervalMs = Optional.of(repeatInterval.get().toMillis());
141         else
142             repeatIntervalMs = Optional.absent();
143 
144         @SuppressWarnings("unchecked")
145         String id = persistSignal(fromEntityUniqueId, entity.getId(), (Class<T>) entity.getClass(),
146                 event, time, repeatIntervalMs, entity.uniqueId());
147         @SuppressWarnings("unchecked")
148         Signal<T> signal = new Signal<T>(fromEntityUniqueId, (Class<Entity<T>>) entity.getClass(),
149                 event, id, time, repeatInterval, entity.getId(), entity.uniqueId());
150         signal(signal);
151     }
152 
153     private static class EntityEvent {
154         String fromEntityUniqueId;
155         String entityUniqueId;
156         String eventSignature;
157 
158         EntityEvent(String fromEntityUniqueId, String entityUniqueId, String eventSignature) {
159             this.fromEntityUniqueId = fromEntityUniqueId;
160             this.entityUniqueId = entityUniqueId;
161             this.eventSignature = eventSignature;
162         }
163 
164         @Override
165         public int hashCode() {
166             final int prime = 31;
167             int result = 1;
168             result = prime * result + ((entityUniqueId == null) ? 0 : entityUniqueId.hashCode());
169             result = prime * result + ((eventSignature == null) ? 0 : eventSignature.hashCode());
170             result = prime * result
171                     + ((fromEntityUniqueId == null) ? 0 : fromEntityUniqueId.hashCode());
172             return result;
173         }
174 
175         @Override
176         public boolean equals(Object obj) {
177             if (this == obj)
178                 return true;
179             if (obj == null)
180                 return false;
181             if (getClass() != obj.getClass())
182                 return false;
183             EntityEvent other = (EntityEvent) obj;
184             if (entityUniqueId == null) {
185                 if (other.entityUniqueId != null)
186                     return false;
187             } else if (!entityUniqueId.equals(other.entityUniqueId))
188                 return false;
189             if (eventSignature == null) {
190                 if (other.eventSignature != null)
191                     return false;
192             } else if (!eventSignature.equals(other.eventSignature))
193                 return false;
194             if (fromEntityUniqueId == null) {
195                 if (other.fromEntityUniqueId != null)
196                     return false;
197             } else if (!fromEntityUniqueId.equals(other.fromEntityUniqueId))
198                 return false;
199             return true;
200         }
201 
202     }
203 
204     private final Map<EntityEvent, Cancellable> scheduleCancellers = Maps.newHashMap();
205 
206     public <T> void cancelSignal(String fromEntityUniqueId, Entity<T> entity,
207             String eventSignatureKey) {
208         cancelSignal(fromEntityUniqueId, entity.uniqueId(), eventSignatureKey);
209     }
210 
211     <T> void signal(Signal<T> signal) {
212         if (signalInitiatedFromEvent()) {
213             info.get().getCurrentEntity().helper().queueSignal(signal);
214         } else {
215             long now = System.currentTimeMillis();
216             long delayMs = (signal.getTime() == null ? now : signal.getTime()) - now;
217             if (delayMs <= 0)
218                 root.tell(signal, root);
219             else {
220                 // There can be at most one delayed signal of a given event
221                 // signature outstanding for each sender-receiver instance pair
222                 // at any one time. Mellor & Balcer p194.
223                 synchronized (this) {
224                     EntityEvent key = cancelSignal(signal.getFromEntityUniqueId(),
225                             signal.getEntityUniqueId(), signal.getEvent().signatureKey());
226 
227                     Cancellable cancellable;
228                     ExecutionContext executionContext = actorSystem.dispatcher();
229                     if (!signal.getRepeatInterval().isPresent())
230                         cancellable = actorSystem.scheduler().scheduleOnce(
231                                 Duration.create(delayMs, TimeUnit.MILLISECONDS), root, signal,
232                                 executionContext, root);
233                     else
234                         cancellable = actorSystem.scheduler().schedule(
235                                 Duration.create(delayMs, TimeUnit.MILLISECONDS),
236                                 signal.getRepeatInterval().get(), root, signal, executionContext,
237                                 root);
238                     scheduleCancellers.put(key, cancellable);
239                 }
240             }
241         }
242     }
243 
244     private <T> EntityEvent cancelSignal(String fromEntityUniqueid, String toEntityUniqueId,
245             String eventSignatureKey) {
246         EntityEvent key = new EntityEvent(fromEntityUniqueid, toEntityUniqueId, eventSignatureKey);
247         Cancellable current = scheduleCancellers.get(key);
248         if (current != null)
249             current.cancel();
250         return key;
251     }
252 
253     public List<QueuedSignal> queuedSignals() {
254         EntityManager em = emf.createEntityManager();
255         EntityTransaction tx = null;
256         try {
257             tx = em.getTransaction();
258             tx.begin();
259             List<QueuedSignal> signals = em.createQuery(
260                     "select s from " + QueuedSignal.class.getSimpleName() + " s order by id",
261                     QueuedSignal.class).getResultList();
262             tx.commit();
263             return signals;
264         } catch (RuntimeException e) {
265             if (tx != null && tx.isActive())
266                 tx.rollback();
267             throw e;
268         } finally {
269             em.close();
270         }
271     }
272 
273     public int sendSignalsInQueue() {
274         List<QueuedSignal> signals = queuedSignals();
275         for (QueuedSignal sig : signals) {
276             signal(sig);
277         }
278         return signals.size();
279     }
280 
281     public long queueSize() {
282         EntityManager em = emf.createEntityManager();
283         EntityTransaction tx = null;
284         long count;
285         try {
286             tx = em.getTransaction();
287             tx.begin();
288             count = em.createQuery(
289                     "select count(s) from " + QueuedSignal.class.getSimpleName() + " s", Long.class)
290                     .getSingleResult();
291             tx.commit();
292             return count;
293         } catch (RuntimeException e) {
294             if (tx != null && tx.isActive())
295                 tx.rollback();
296             throw e;
297         } finally {
298             em.close();
299         }
300     }
301 
302     @SuppressWarnings({ "rawtypes", "unchecked" })
303     private void signal(QueuedSignal sig) {
304         log.debug("sending {}", sig);
305         Event<?> event = Util.toObject(sig.eventContent, sig.eventClass());
306         Serializable id = Util.toObject(sig.idContent, sig.idClass());
307         Class<?> entityClass = getClassForName(sig.entityClassName);
308         signal(new Signal(sig.fromEntityUniqueId, entityClass, event, sig.id, id,
309                 sig.toEntityUniqueId));
310     }
311 
312     private Class<?> getClassForName(String className) {
313         try {
314             return Class.forName(className);
315         } catch (ClassNotFoundException e) {
316             throw new RuntimeException(e);
317         }
318     }
319 
320     public <T extends Entity<T>> String persistSignal(String fromEntityUniqueId, Object id,
321             Class<T> cls, Event<T> event, long time, Optional<Long> repeatIntervalMs,
322             String entityUniqueId) {
323         byte[] idBytes = Util.toBytes(id);
324         byte[] eventBytes = Util.toBytes(event);
325         QueuedSignalme/QueuedSignal.html#QueuedSignal">QueuedSignal signal = new QueuedSignal(id.getClass().getName(), idBytes, cls.getName(),
326                 event.getClass().getName(), eventBytes, time, repeatIntervalMs, fromEntityUniqueId,
327                 entityUniqueId);
328         EntityManager em = emf.createEntityManager();
329         em.getTransaction().begin();
330         em.persist(signal);
331         em.getTransaction().commit();
332         em.close();
333         log.trace("persisted {}", signal);
334         return signal.id;
335     }
336 
337     private boolean signalInitiatedFromEvent() {
338         return info.get().getCurrentEntity() != null;
339     }
340 
341     public Info getInfo() {
342         return info.get();
343     }
344 
345     public Future<Terminated> stop() {
346         return actorSystem.terminate();
347     }
348 
349     public void close() {
350         emf.close();
351     }
352 }