]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Replace executable by runnable
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
index baddb1c604f075f4af5a5cc7667531331f48631a..90818a572d0524c87a2bd7241560313af0204aa4 100644 (file)
@@ -9,9 +9,7 @@ import java.util.UUID;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.MessageListener;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -19,34 +17,33 @@ import org.argeo.slc.SlcException;
 import org.argeo.slc.core.runtime.AbstractAgent;
 import org.argeo.slc.execution.ExecutionModule;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.ReferenceList;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.SessionAwareMessageListener;
-import org.springframework.jms.support.converter.MessageConversionException;
+import org.springframework.jms.core.MessagePostProcessor;
 
 /** JMS based implementation of SLC Agent. */
 public class JmsAgent extends AbstractAgent implements SlcAgent,
-               InitializingBean, DisposableBean, SessionAwareMessageListener {
+               InitializingBean, DisposableBean, MessageListener {
        public final static String PROPERTY_QUERY = "query";
        public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
 
+       public final static String QUERY_PING_ALL = "pingAll";
+
        private final static Log log = LogFactory.getLog(JmsAgent.class);
 
        private final SlcAgentDescriptor agentDescriptor;
-       // private ConnectionFactory connectionFactory;
        private JmsTemplate jmsTemplate;
        private Destination agentRegister;
        private Destination agentUnregister;
 
-       // private Destination requestDestination;
        private Destination responseDestination;
 
-       // private MessageConverter messageConverter;
-
        public JmsAgent() {
                try {
                        agentDescriptor = new SlcAgentDescriptor();
@@ -58,10 +55,6 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
        }
 
        public void afterPropertiesSet() throws Exception {
-               // Initialize JMS Template
-               // jmsTemplate = new JmsTemplate(connectionFactory);
-               // jmsTemplate.setMessageConverter(messageConverter);
-
                jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
                log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
                                + agentRegister);
@@ -89,62 +82,6 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
                return messageSelector;
        }
 
-       public void onMessage(Message message, Session session) throws JMSException {
-               MessageProducer producer = session.createProducer(responseDestination);
-               String query = message.getStringProperty(PROPERTY_QUERY);
-               String correlationId = message.getJMSCorrelationID();
-               if (log.isDebugEnabled())
-                       log.debug("Received query " + query + " with correlationId "
-                                       + correlationId);
-
-               Message responseMsg = null;
-               if ("getExecutionModuleDescriptor".equals(query)) {
-                       String moduleName = message.getStringProperty("moduleName");
-                       String version = message.getStringProperty("version");
-                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
-                                       moduleName, version);
-                       responseMsg = jmsTemplate.getMessageConverter().toMessage(emd,
-                                       session);
-               } else if ("listExecutionModuleDescriptors".equals(query)) {
-
-                       List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
-                       SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
-                                       agentDescriptor);
-                       agentDescriptorToSend.setModuleDescriptors(lst);
-                       responseMsg = jmsTemplate.getMessageConverter().toMessage(
-                                       agentDescriptorToSend, session);
-               } else if ("newExecution".equals(query)) {
-
-                       SlcExecution slcExecution = (SlcExecution) jmsTemplate
-                                       .getMessageConverter().fromMessage(message);
-                       runSlcExecution(slcExecution);
-                       return;
-               } else {
-                       // try {
-                       // // FIXME: generalize
-                       // SlcExecution slcExecution = (SlcExecution) jmsTemplate
-                       // .getMessageConverter().fromMessage(message);
-                       // runSlcExecution(slcExecution);
-                       // } catch (MessageConversionException e) {
-                       // if (log.isDebugEnabled())
-                       // log.debug("Unsupported query " + query, e);
-                       // }
-                       if (log.isDebugEnabled())
-                               log.debug("Unsupported query " + query);
-                       return;
-               }
-
-               if (responseMsg != null) {
-                       responseMsg.setJMSCorrelationID(correlationId);
-                       producer.send(responseMsg);
-                       if (log.isDebugEnabled())
-                               log.debug("Sent response to query " + query
-                                               + " with correlationId " + correlationId + ": "
-                                               + responseMsg);
-               }
-
-       }
-
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
                        String moduleName, String version) {
                return getModulesManager().getExecutionModuleDescriptor(moduleName,
@@ -165,6 +102,102 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
                return descriptors;
        }
 
+       public boolean ping() {
+               return true;
+       }
+
+       public void onMessage(final Message message) {
+               final String query;
+               final String correlationId;
+               try {
+                       query = message.getStringProperty(PROPERTY_QUERY);
+                       correlationId = message.getJMSCorrelationID();
+               } catch (JMSException e1) {
+                       throw new SlcException("Cannot analyze incoming message " + message);
+               }
+
+               final Object response;
+               final Destination destinationSend;
+               if (QUERY_PING_ALL.equals(query)) {
+                       ReferenceList refList = (ReferenceList) convertFrom(message);
+                       if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
+                               response = agentDescriptor;
+                               destinationSend = agentRegister;
+                               log.info("Agent #" + agentDescriptor.getUuid()
+                                               + " registering to " + agentRegister
+                                               + " in reply to a " + QUERY_PING_ALL + " query");
+                       } else {
+                               return;
+                       }
+               } else {
+                       response = process(query, message);
+                       destinationSend = responseDestination;
+               }
+
+               new Thread() {
+                       public void run() {
+                               // Send response
+                               jmsTemplate.convertAndSend(destinationSend, response,
+                                               new MessagePostProcessor() {
+                                                       public Message postProcessMessage(
+                                                                       Message messageToSend) throws JMSException {
+                                                               messageToSend.setStringProperty(PROPERTY_QUERY,
+                                                                               query);
+                                                               messageToSend.setStringProperty(
+                                                                               PROPERTY_SLC_AGENT_ID, agentDescriptor
+                                                                                               .getUuid());
+                                                               messageToSend
+                                                                               .setJMSCorrelationID(correlationId);
+                                                               return messageToSend;
+                                                       }
+                                               });
+                               if (log.isDebugEnabled())
+                                       log.debug("Sent response to query '" + query
+                                                       + "' with correlationId " + correlationId);
+                       }
+               }.start();
+
+       }
+
+       /** @return response */
+       public Object process(String query, Message message) {
+               try {
+                       if ("getExecutionModuleDescriptor".equals(query)) {
+                               String moduleName = message.getStringProperty("moduleName");
+                               String version = message.getStringProperty("version");
+                               return getExecutionModuleDescriptor(moduleName, version);
+                       } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+                               List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+                               SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+                                               agentDescriptor);
+                               agentDescriptorToSend.setModuleDescriptors(lst);
+                               return agentDescriptorToSend;
+                       } else if ("runSlcExecution".equals(query)) {
+                               SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+                               runSlcExecution(slcExecution);
+                               return ExecutionAnswer.ok("Execution started on agent "
+                                               + agentDescriptor.getUuid());
+                       } else if ("ping".equals(query)) {
+                               return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
+                                               + " is alive.");
+                       } else {
+                               throw new SlcException("Unsupported query " + query);
+                       }
+               } catch (Exception e) {
+                       log.error("Processing of query " + query + " failed", e);
+                       return ExecutionAnswer.error(e);
+               }
+       }
+
+       protected Object convertFrom(Message message) {
+               try {
+                       return jmsTemplate.getMessageConverter().fromMessage(message);
+               } catch (JMSException e) {
+                       throw new SlcException("Cannot convert message", e);
+               }
+       }
+
        public void setResponseDestination(Destination responseDestination) {
                this.responseDestination = responseDestination;
        }