View Javadoc
1   package xuml.tools.model.compiler.runtime.actor;
2   
3   import java.util.HashMap;
4   
5   import javax.persistence.EntityManagerFactory;
6   
7   import com.google.common.collect.Maps;
8   
9   import akka.actor.ActorRef;
10  import akka.actor.Props;
11  import akka.actor.UntypedActor;
12  import akka.event.Logging;
13  import akka.event.LoggingAdapter;
14  import xuml.tools.model.compiler.runtime.SignalProcessorListenerFactory;
15  import xuml.tools.model.compiler.runtime.message.ActorConfig;
16  import xuml.tools.model.compiler.runtime.message.CloseEntityActor;
17  import xuml.tools.model.compiler.runtime.message.Signal;
18  import xuml.tools.model.compiler.runtime.message.StopEntityActor;
19  
20  public class RootActor extends UntypedActor {
21  
22      private EntityManagerFactory emf;
23      private final HashMap<String, ActorInfo> actors = Maps.newHashMap();
24      private final LoggingAdapter log;
25      private SignalProcessorListenerFactory listenerFactory;
26  
27      public RootActor() {
28          log = Logging.getLogger(getContext().system(), this);
29      }
30  
31      @Override
32      public void onReceive(Object message) throws Exception {
33          log.debug("received message {}", message.getClass().getName());
34          if (message instanceof ActorConfig) {
35              handleMessage((ActorConfig) message);
36          } else if (message instanceof EntityManagerFactory)
37              handleMessage((EntityManagerFactory) message);
38          else if (message instanceof SignalProcessorListenerFactory)
39              listenerFactory = (SignalProcessorListenerFactory) message;
40          else if (message instanceof Signal)
41              handleMessage((Signal<?>) message);
42          else if (message instanceof CloseEntityActor)
43              handleMessage((CloseEntityActor) message);
44      }
45  
46      private void handleMessage(ActorConfig message) {
47      }
48  
49      private void handleMessage(CloseEntityActor message) {
50          String key = message.getEntityUniqueId();
51          ActorInfo info = actors.remove(key);
52          if (info.counter > 1) {
53              actors.put(key, info.decrement());
54          } else {
55              // when the counter gets down to 1 we stop the entity actor
56              info.actor.tell(new StopEntityActor(), getSelf());
57          }
58      }
59  
60      private void handleMessage(EntityManagerFactory message) {
61          emf = message;
62      }
63  
64      private void handleMessage(Signal<?> message) {
65          String key = message.getEntityUniqueId();
66          ActorRef actor = getActor(key);
67          actor.tell(message, getSelf());
68      }
69  
70      private ActorRef getActor(String key) {
71          ActorInfo info = actors.get(key);
72          if (info == null) {
73              ActorRef actor = createActor(key);
74              actors.put(key, new ActorInfo(actor, 1));
75              actor.tell(emf, getSelf());
76              if (listenerFactory != null)
77                  actor.tell(listenerFactory.create(key), getSelf());
78          } else {
79              actors.put(key, info.increment());
80          }
81          return actors.get(key).actor;
82      }
83  
84      private ActorRef createActor(String key) {
85          return getContext()
86                  .actorOf(Props.create(EntityActor.class).withDispatcher("akka.entity-dispatcher"));
87      }
88  
89      private static final class ActorInfo {
90  
91          final ActorRef actor;
92          final long counter;
93  
94          ActorInfo(ActorRef actor, long counter) {
95              this.actor = actor;
96              this.counter = counter;
97          }
98  
99          ActorInfo increment() {
100             return new ActorInfo(actor, counter + 1);
101         }
102 
103         ActorInfo decrement() {
104             return new ActorInfo(actor, counter - 1);
105         }
106 
107     }
108 
109 }