]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Improve detached launcher
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
index 7aff0ab20ab3315f21569251d667a82d85a1d042..baddb1c604f075f4af5a5cc7667531331f48631a 100644 (file)
@@ -2,28 +2,50 @@ package org.argeo.slc.jms;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
+import org.argeo.slc.core.runtime.AbstractAgent;
+import org.argeo.slc.execution.ExecutionModule;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.jms.support.converter.MessageConversionException;
 
 /** JMS based implementation of SLC Agent. */
-public class JmsAgent implements SlcAgent, MessageListener, InitializingBean {
+public class JmsAgent extends AbstractAgent implements SlcAgent,
+               InitializingBean, DisposableBean, SessionAwareMessageListener {
+       public final static String PROPERTY_QUERY = "query";
+       public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
+
        private final static Log log = LogFactory.getLog(JmsAgent.class);
 
        private final SlcAgentDescriptor agentDescriptor;
+       // private ConnectionFactory connectionFactory;
        private JmsTemplate jmsTemplate;
        private Destination agentRegister;
+       private Destination agentUnregister;
+
+       // private Destination requestDestination;
+       private Destination responseDestination;
+
+       // private MessageConverter messageConverter;
 
        public JmsAgent() {
                try {
@@ -36,26 +58,119 @@ public class JmsAgent implements SlcAgent, MessageListener, InitializingBean {
        }
 
        public void afterPropertiesSet() throws Exception {
+               // Initialize JMS Template
+               // jmsTemplate = new JmsTemplate(connectionFactory);
+               // jmsTemplate.setMessageConverter(messageConverter);
+
                jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
                log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
                                + agentRegister);
        }
 
-       public void onMessage(Message message) {
-               try {
-                       log.info("Received message " + message.getJMSMessageID());
-               } catch (JMSException e) {
-                       e.printStackTrace();
+       public void destroy() throws Exception {
+               jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
+               log.info("Agent #" + agentDescriptor.getUuid() + " unregistered from "
+                               + agentUnregister);
+       }
+
+       public void setAgentRegister(Destination agentRegister) {
+               this.agentRegister = agentRegister;
+       }
+
+       public void setAgentUnregister(Destination agentUnregister) {
+               this.agentUnregister = agentUnregister;
+       }
+
+       public String getMessageSelector() {
+               String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
+                               + "'";
+               // if (log.isDebugEnabled())
+               // log.debug("Message selector: " + messageSelector);
+               return messageSelector;
+       }
+
+       public void onMessage(Message message, Session session) throws JMSException {
+               MessageProducer producer = session.createProducer(responseDestination);
+               String query = message.getStringProperty(PROPERTY_QUERY);
+               String correlationId = message.getJMSCorrelationID();
+               if (log.isDebugEnabled())
+                       log.debug("Received query " + query + " with correlationId "
+                                       + correlationId);
+
+               Message responseMsg = null;
+               if ("getExecutionModuleDescriptor".equals(query)) {
+                       String moduleName = message.getStringProperty("moduleName");
+                       String version = message.getStringProperty("version");
+                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+                                       moduleName, version);
+                       responseMsg = jmsTemplate.getMessageConverter().toMessage(emd,
+                                       session);
+               } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+                       List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+                       SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+                                       agentDescriptor);
+                       agentDescriptorToSend.setModuleDescriptors(lst);
+                       responseMsg = jmsTemplate.getMessageConverter().toMessage(
+                                       agentDescriptorToSend, session);
+               } else if ("newExecution".equals(query)) {
+
+                       SlcExecution slcExecution = (SlcExecution) jmsTemplate
+                                       .getMessageConverter().fromMessage(message);
+                       runSlcExecution(slcExecution);
+                       return;
+               } else {
+                       // try {
+                       // // FIXME: generalize
+                       // SlcExecution slcExecution = (SlcExecution) jmsTemplate
+                       // .getMessageConverter().fromMessage(message);
+                       // runSlcExecution(slcExecution);
+                       // } catch (MessageConversionException e) {
+                       // if (log.isDebugEnabled())
+                       // log.debug("Unsupported query " + query, e);
+                       // }
+                       if (log.isDebugEnabled())
+                               log.debug("Unsupported query " + query);
+                       return;
+               }
+
+               if (responseMsg != null) {
+                       responseMsg.setJMSCorrelationID(correlationId);
+                       producer.send(responseMsg);
+                       if (log.isDebugEnabled())
+                               log.debug("Sent response to query " + query
+                                               + " with correlationId " + correlationId + ": "
+                                               + responseMsg);
                }
 
        }
 
-       public void setJmsTemplate(JmsTemplate jmsTemplate) {
-               this.jmsTemplate = jmsTemplate;
+       public ExecutionModuleDescriptor getExecutionModuleDescriptor(
+                       String moduleName, String version) {
+               return getModulesManager().getExecutionModuleDescriptor(moduleName,
+                               version);
        }
 
-       public void setAgentRegister(Destination agentRegister) {
-               this.agentRegister = agentRegister;
+       public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
+               List<ExecutionModule> modules = getModulesManager()
+                               .listExecutionModules();
+
+               List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
+               for (ExecutionModule module : modules) {
+                       ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
+                       md.setName(module.getName());
+                       md.setVersion(module.getVersion());
+                       descriptors.add(md);
+               }
+               return descriptors;
+       }
+
+       public void setResponseDestination(Destination responseDestination) {
+               this.responseDestination = responseDestination;
+       }
+
+       public void setJmsTemplate(JmsTemplate jmsTemplate) {
+               this.jmsTemplate = jmsTemplate;
        }
 
 }