]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Introduce JMS based notifications
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
index 66c0718fc946c29f6a8199990123496357f3633d..e8d3cd83c33796b615733a2eeaf9b2b32dd9c8f4 100644 (file)
@@ -4,6 +4,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.UUID;
 
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -12,24 +13,27 @@ import javax.jms.MessageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
+import org.argeo.slc.core.runtime.AbstractAgent;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.converter.MessageConverter;
 
 /** JMS based implementation of SLC Agent. */
-public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
+public class JmsAgent extends AbstractAgent implements SlcAgent,
+               InitializingBean, DisposableBean, MessageListener {
        private final static Log log = LogFactory.getLog(JmsAgent.class);
 
        private final SlcAgentDescriptor agentDescriptor;
+       private ConnectionFactory connectionFactory;
        private JmsTemplate jmsTemplate;
        private Destination agentRegister;
        private Destination agentUnregister;
 
-       private String agentDestinationPrefix = "agent.";
-       private String agentDestinationBase;
+       private MessageConverter messageConverter;
 
        public JmsAgent() {
                try {
@@ -42,8 +46,10 @@ public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
        }
 
        public void afterPropertiesSet() throws Exception {
-               agentDestinationBase = agentDestinationPrefix
-                               + agentDescriptor.getUuid() + ".";
+               // Initialize JMS Template
+               jmsTemplate = new JmsTemplate(connectionFactory);
+               jmsTemplate.setMessageConverter(messageConverter);
+
                jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
                log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
                                + agentRegister);
@@ -51,32 +57,48 @@ public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
 
        public void destroy() throws Exception {
                jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
-               log.info("Agent #" + agentDescriptor.getUuid() + " unregistered to "
-                               + agentRegister);
+               log.info("Agent #" + agentDescriptor.getUuid() + " unregistered from "
+                               + agentUnregister);
        }
 
-       public String actionDestinationName(String action) {
-               return agentDestinationBase + action;
+       public void setAgentRegister(Destination agentRegister) {
+               this.agentRegister = agentRegister;
        }
 
-       public void newExecution(SlcExecution slcExecution) {
-               log.info("Would execute SlcExecution :" + slcExecution);
+       public void setAgentUnregister(Destination agentUnregister) {
+               this.agentUnregister = agentUnregister;
        }
 
-       public void setJmsTemplate(JmsTemplate jmsTemplate) {
-               this.jmsTemplate = jmsTemplate;
+       public void onMessage(Message message) {
+               // FIXME: we filter the messages on the client side,
+               // because of a weird problem with selector since moving to OSGi
+               try {
+                       if (message.getStringProperty("slc-agentId").equals(
+                                       agentDescriptor.getUuid())) {
+                               runSlcExecution((SlcExecution) messageConverter
+                                               .fromMessage(message));
+                       }
+               } catch (JMSException e) {
+                       throw new SlcException("Cannot convert message " + message, e);
+               }
+
        }
 
-       public void setAgentRegister(Destination agentRegister) {
-               this.agentRegister = agentRegister;
+       public String getMessageSelector() {
+               String messageSelector = "slc-agentId=" + agentDescriptor.getUuid()
+                               + "";
+               // String messageSelector = "slc-agentId LIKE '%'";
+               if (log.isDebugEnabled())
+                       log.debug("Message selector: '" + messageSelector + "'");
+               return messageSelector;
        }
 
-       public void setAgentUnregister(Destination agentUnregister) {
-               this.agentUnregister = agentUnregister;
+       public void setMessageConverter(MessageConverter messageConverter) {
+               this.messageConverter = messageConverter;
        }
 
-       public void setAgentDestinationPrefix(String agentDestinationPrefix) {
-               this.agentDestinationPrefix = agentDestinationPrefix;
+       public void setConnectionFactory(ConnectionFactory connectionFactory) {
+               this.connectionFactory = connectionFactory;
        }
 
 }