1 package xuml.tools.model.compiler.runtime;
2
3 import java.io.Serializable;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.concurrent.TimeUnit;
7
8 import javax.persistence.EntityManager;
9 import javax.persistence.EntityManagerFactory;
10 import javax.persistence.EntityTransaction;
11
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Maps;
18 import com.typesafe.config.ConfigFactory;
19
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.Cancellable;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import scala.concurrent.ExecutionContext;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28 import scala.concurrent.duration.FiniteDuration;
29 import xuml.tools.model.compiler.runtime.actor.RootActor;
30 import xuml.tools.model.compiler.runtime.message.ActorConfig;
31 import xuml.tools.model.compiler.runtime.message.Signal;
32
33 public class Signaller {
34
35 private static final Logger log = LoggerFactory.getLogger(Signaller.class);
36
37 private final ThreadLocal<Info> info = new ThreadLocal<Info>() {
38 @Override
39 protected Info initialValue() {
40 return new Info();
41 }
42 };
43 private final ActorSystem actorSystem = ActorSystem.create("xuml-tools",
44 ConfigFactory.load("xuml-akka").withFallback(ConfigFactory.load()));
45 private final ActorRef root = actorSystem.actorOf(Props.create(RootActor.class), "root");
46 private final EntityManagerFactory emf;
47
48 public Signaller(EntityManagerFactory emf, int entityActorPoolSize,
49 SignalProcessorListenerFactory listenerFactory) {
50 this.emf = emf;
51 log.debug("Akka system settings:\n{}", actorSystem.settings());
52 root.tell(new ActorConfig(entityActorPoolSize), root);
53 root.tell(emf, root);
54 if (listenerFactory != null)
55 root.tell(listenerFactory, root);
56 }
57
58 public EntityManagerFactory getEntityManagerFactory() {
59 return emf;
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73 public <T extends Entity<T>> T create(Class<T> cls, CreationEvent<T> event) {
74 EntityManager em = null;
75 EntityTransaction tx = null;
76 T t;
77 try {
78
79
80 t = cls.newInstance();
81 } catch (InstantiationException e) {
82 throw new RuntimeException(e);
83 } catch (IllegalAccessException e) {
84 throw new RuntimeException(e);
85 }
86 try {
87 em = emf.createEntityManager();
88 t.helper().setEntityManager(em);
89 tx = em.getTransaction();
90 tx.begin();
91 t.event(event);
92 em.persist(t);
93 tx.commit();
94
95
96 t.helper().sendQueuedSignals();
97 } catch (RuntimeException e) {
98 if (tx != null && tx.isActive())
99 tx.rollback();
100 throw e;
101 } finally {
102 t.helper().setEntityManager(null);
103 if (em != null && em.isOpen())
104 em.close();
105 }
106 return t;
107
108 }
109
110 public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
111 Event<T> event, Optional<Duration> delay) {
112 signal(fromEntityUniqueId, entity, event, delay, Optional.<FiniteDuration> absent());
113 }
114
115 public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
116 Event<T> event, Long time, Optional<FiniteDuration> repeatInterval) {
117 signal(fromEntityUniqueId, entity, event, getDelay(time), repeatInterval);
118 }
119
120 private Optional<Duration> getDelay(Long time) {
121 long now = System.currentTimeMillis();
122 if (time == null || time <= now)
123 return Optional.absent();
124 else
125 return Optional.<Duration> of(Duration.create(time - now, TimeUnit.MILLISECONDS));
126 }
127
128 public <T extends Entity<T>> void signal(String fromEntityUniqueId, Entity<T> entity,
129 Event<T> event, Optional<Duration> delay, Optional<FiniteDuration> repeatInterval) {
130 Preconditions.checkNotNull(delay);
131 Preconditions.checkNotNull(repeatInterval);
132 long time;
133 long now = System.currentTimeMillis();
134 if (!delay.isPresent())
135 time = now;
136 else
137 time = now + delay.get().toMillis();
138 Optional<Long> repeatIntervalMs;
139 if (repeatInterval.isPresent())
140 repeatIntervalMs = Optional.of(repeatInterval.get().toMillis());
141 else
142 repeatIntervalMs = Optional.absent();
143
144 @SuppressWarnings("unchecked")
145 String id = persistSignal(fromEntityUniqueId, entity.getId(), (Class<T>) entity.getClass(),
146 event, time, repeatIntervalMs, entity.uniqueId());
147 @SuppressWarnings("unchecked")
148 Signal<T> signal = new Signal<T>(fromEntityUniqueId, (Class<Entity<T>>) entity.getClass(),
149 event, id, time, repeatInterval, entity.getId(), entity.uniqueId());
150 signal(signal);
151 }
152
153 private static class EntityEvent {
154 String fromEntityUniqueId;
155 String entityUniqueId;
156 String eventSignature;
157
158 EntityEvent(String fromEntityUniqueId, String entityUniqueId, String eventSignature) {
159 this.fromEntityUniqueId = fromEntityUniqueId;
160 this.entityUniqueId = entityUniqueId;
161 this.eventSignature = eventSignature;
162 }
163
164 @Override
165 public int hashCode() {
166 final int prime = 31;
167 int result = 1;
168 result = prime * result + ((entityUniqueId == null) ? 0 : entityUniqueId.hashCode());
169 result = prime * result + ((eventSignature == null) ? 0 : eventSignature.hashCode());
170 result = prime * result
171 + ((fromEntityUniqueId == null) ? 0 : fromEntityUniqueId.hashCode());
172 return result;
173 }
174
175 @Override
176 public boolean equals(Object obj) {
177 if (this == obj)
178 return true;
179 if (obj == null)
180 return false;
181 if (getClass() != obj.getClass())
182 return false;
183 EntityEvent other = (EntityEvent) obj;
184 if (entityUniqueId == null) {
185 if (other.entityUniqueId != null)
186 return false;
187 } else if (!entityUniqueId.equals(other.entityUniqueId))
188 return false;
189 if (eventSignature == null) {
190 if (other.eventSignature != null)
191 return false;
192 } else if (!eventSignature.equals(other.eventSignature))
193 return false;
194 if (fromEntityUniqueId == null) {
195 if (other.fromEntityUniqueId != null)
196 return false;
197 } else if (!fromEntityUniqueId.equals(other.fromEntityUniqueId))
198 return false;
199 return true;
200 }
201
202 }
203
204 private final Map<EntityEvent, Cancellable> scheduleCancellers = Maps.newHashMap();
205
206 public <T> void cancelSignal(String fromEntityUniqueId, Entity<T> entity,
207 String eventSignatureKey) {
208 cancelSignal(fromEntityUniqueId, entity.uniqueId(), eventSignatureKey);
209 }
210
211 <T> void signal(Signal<T> signal) {
212 if (signalInitiatedFromEvent()) {
213 info.get().getCurrentEntity().helper().queueSignal(signal);
214 } else {
215 long now = System.currentTimeMillis();
216 long delayMs = (signal.getTime() == null ? now : signal.getTime()) - now;
217 if (delayMs <= 0)
218 root.tell(signal, root);
219 else {
220
221
222
223 synchronized (this) {
224 EntityEvent key = cancelSignal(signal.getFromEntityUniqueId(),
225 signal.getEntityUniqueId(), signal.getEvent().signatureKey());
226
227 Cancellable cancellable;
228 ExecutionContext executionContext = actorSystem.dispatcher();
229 if (!signal.getRepeatInterval().isPresent())
230 cancellable = actorSystem.scheduler().scheduleOnce(
231 Duration.create(delayMs, TimeUnit.MILLISECONDS), root, signal,
232 executionContext, root);
233 else
234 cancellable = actorSystem.scheduler().schedule(
235 Duration.create(delayMs, TimeUnit.MILLISECONDS),
236 signal.getRepeatInterval().get(), root, signal, executionContext,
237 root);
238 scheduleCancellers.put(key, cancellable);
239 }
240 }
241 }
242 }
243
244 private <T> EntityEvent cancelSignal(String fromEntityUniqueid, String toEntityUniqueId,
245 String eventSignatureKey) {
246 EntityEvent key = new EntityEvent(fromEntityUniqueid, toEntityUniqueId, eventSignatureKey);
247 Cancellable current = scheduleCancellers.get(key);
248 if (current != null)
249 current.cancel();
250 return key;
251 }
252
253 public List<QueuedSignal> queuedSignals() {
254 EntityManager em = emf.createEntityManager();
255 EntityTransaction tx = null;
256 try {
257 tx = em.getTransaction();
258 tx.begin();
259 List<QueuedSignal> signals = em.createQuery(
260 "select s from " + QueuedSignal.class.getSimpleName() + " s order by id",
261 QueuedSignal.class).getResultList();
262 tx.commit();
263 return signals;
264 } catch (RuntimeException e) {
265 if (tx != null && tx.isActive())
266 tx.rollback();
267 throw e;
268 } finally {
269 em.close();
270 }
271 }
272
273 public int sendSignalsInQueue() {
274 List<QueuedSignal> signals = queuedSignals();
275 for (QueuedSignal sig : signals) {
276 signal(sig);
277 }
278 return signals.size();
279 }
280
281 public long queueSize() {
282 EntityManager em = emf.createEntityManager();
283 EntityTransaction tx = null;
284 long count;
285 try {
286 tx = em.getTransaction();
287 tx.begin();
288 count = em.createQuery(
289 "select count(s) from " + QueuedSignal.class.getSimpleName() + " s", Long.class)
290 .getSingleResult();
291 tx.commit();
292 return count;
293 } catch (RuntimeException e) {
294 if (tx != null && tx.isActive())
295 tx.rollback();
296 throw e;
297 } finally {
298 em.close();
299 }
300 }
301
302 @SuppressWarnings({ "rawtypes", "unchecked" })
303 private void signal(QueuedSignal sig) {
304 log.debug("sending {}", sig);
305 Event<?> event = Util.toObject(sig.eventContent, sig.eventClass());
306 Serializable id = Util.toObject(sig.idContent, sig.idClass());
307 Class<?> entityClass = getClassForName(sig.entityClassName);
308 signal(new Signal(sig.fromEntityUniqueId, entityClass, event, sig.id, id,
309 sig.toEntityUniqueId));
310 }
311
312 private Class<?> getClassForName(String className) {
313 try {
314 return Class.forName(className);
315 } catch (ClassNotFoundException e) {
316 throw new RuntimeException(e);
317 }
318 }
319
320 public <T extends Entity<T>> String persistSignal(String fromEntityUniqueId, Object id,
321 Class<T> cls, Event<T> event, long time, Optional<Long> repeatIntervalMs,
322 String entityUniqueId) {
323 byte[] idBytes = Util.toBytes(id);
324 byte[] eventBytes = Util.toBytes(event);
325 QueuedSignalme/QueuedSignal.html#QueuedSignal">QueuedSignal signal = new QueuedSignal(id.getClass().getName(), idBytes, cls.getName(),
326 event.getClass().getName(), eventBytes, time, repeatIntervalMs, fromEntityUniqueId,
327 entityUniqueId);
328 EntityManager em = emf.createEntityManager();
329 em.getTransaction().begin();
330 em.persist(signal);
331 em.getTransaction().commit();
332 em.close();
333 log.trace("persisted {}", signal);
334 return signal.id;
335 }
336
337 private boolean signalInitiatedFromEvent() {
338 return info.get().getCurrentEntity() != null;
339 }
340
341 public Info getInfo() {
342 return info.get();
343 }
344
345 public Future<Terminated> stop() {
346 return actorSystem.terminate();
347 }
348
349 public void close() {
350 emf.close();
351 }
352 }