1 package xuml.tools.model.compiler.runtime.actor;
2
3 import javax.persistence.EntityManager;
4 import javax.persistence.EntityManagerFactory;
5 import javax.persistence.EntityTransaction;
6
7 import akka.actor.UntypedActor;
8 import akka.event.Logging;
9 import akka.event.LoggingAdapter;
10 import xuml.tools.model.compiler.runtime.Entity;
11 import xuml.tools.model.compiler.runtime.QueuedSignal;
12 import xuml.tools.model.compiler.runtime.SignalProcessorListener;
13 import xuml.tools.model.compiler.runtime.SignalProcessorListenerDoesNothing;
14 import xuml.tools.model.compiler.runtime.message.CloseEntityActor;
15 import xuml.tools.model.compiler.runtime.message.Signal;
16 import xuml.tools.model.compiler.runtime.message.StopEntityActor;
17
18 public class EntityActor extends UntypedActor {
19
20 private EntityManagerFactory emf;
21 private final LoggingAdapter log;
22 private SignalProcessorListener listener = SignalProcessorListenerDoesNothing.getInstance();
23
24 public EntityActor() {
25 log = Logging.getLogger(getContext().system(), this);
26 }
27
28 @Override
29 public void onReceive(Object message) throws Exception {
30 log.debug("received message {}", message.getClass().getName());
31 if (message instanceof EntityManagerFactory)
32 handleMessage((EntityManagerFactory) message);
33 else if (message instanceof SignalProcessorListener)
34 listener = (SignalProcessorListener) message;
35 else if (message instanceof Signal) {
36 handleMessage((Signal<?>) message);
37 } else if (message instanceof StopEntityActor) {
38 getContext().stop(getSelf());
39 }
40 }
41
42 @SuppressWarnings("unchecked")
43 private void handleMessage(@SuppressWarnings("rawtypes") Signal signal) {
44 if (emf != null) {
45
46
47 EntityManager em = null;
48 EntityTransaction tx = null;
49 Entity<?> entity = null;
50 try {
51 listener.beforeProcessing(signal, this);
52 em = emf.createEntityManager();
53 tx = em.getTransaction();
54 tx.begin();
55 log.debug("started transaction");
56 entity = (Entity<?>) em.find(signal.getEntityClass(), signal.getEntityId());
57 log.debug("calling event {} on entity id = ",
58 signal.getEvent().getClass().getSimpleName(), signal.getEntityId());
59 entity.helper().setEntityManager(em);
60 entity.event(signal.getEvent());
61 log.debug("removing signal from persistence signalId={}, entityId={}",
62 signal.getId(), signal.getEntityId());
63 int countDeleted = em
64 .createQuery("delete from " + QueuedSignal.class.getSimpleName()
65 + " where id=:id")
66 .setParameter("id", signal.getId()).executeUpdate();
67 if (countDeleted == 0) {
68 throw new RuntimeException("queued signal not deleted: " + signal.getId());
69 }
70 tx.commit();
71 log.debug("committed");
72 listener.afterProcessing(signal, this);
73 em.close();
74 entity.helper().setEntityManager(null);
75
76
77 entity.helper().sendQueuedSignals();
78 } catch (RuntimeException e) {
79 handleException(signal, em, tx, e);
80 } finally {
81
82
83 if (entity != null) {
84 entity.helper().setEntityManager(null);
85 }
86
87 getSender().tell(new CloseEntityActor(signal.getEntityUniqueId()), getSelf());
88 }
89 }
90 }
91
92 private void handleException(@SuppressWarnings("rawtypes") Signal signal, EntityManager em,
93 EntityTransaction tx, RuntimeException e) {
94 try {
95 if (tx != null && tx.isActive()) {
96 tx.rollback();
97 }
98 if (em != null && em.isOpen()) {
99 em.close();
100 }
101 listener.failure(signal, e, this);
102 } catch (RuntimeException e2) {
103 log.error(e2.getMessage(), e2);
104 throw e;
105 }
106 }
107
108 private void handleMessage(EntityManagerFactory message) {
109 this.emf = message;
110 }
111
112 }