]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Finalize JMS serialization
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
index 66c0718fc946c29f6a8199990123496357f3633d..f5472b0e5937d6a8626605cbaad2d7eaf499072b 100644 (file)
@@ -1,7 +1,24 @@
+/*
+ * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.argeo.slc.jms;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.List;
 import java.util.UUID;
 
 import javax.jms.Destination;
@@ -12,15 +29,25 @@ import javax.jms.MessageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
+import org.argeo.slc.core.runtime.DefaultAgent;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.MsgConstants;
+import org.argeo.slc.msg.ReferenceList;
 import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.jms.JmsException;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessagePostProcessor;
 
 /** JMS based implementation of SLC Agent. */
-public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
+public class JmsAgent extends DefaultAgent implements InitializingBean,
+               DisposableBean, MessageListener {
+       public final static String PROPERTY_QUERY = "query";
+       public final static String QUERY_PING_ALL = "pingAll";
+
        private final static Log log = LogFactory.getLog(JmsAgent.class);
 
        private final SlcAgentDescriptor agentDescriptor;
@@ -28,8 +55,7 @@ public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
        private Destination agentRegister;
        private Destination agentUnregister;
 
-       private String agentDestinationPrefix = "agent.";
-       private String agentDestinationBase;
+       private Destination responseDestination;
 
        public JmsAgent() {
                try {
@@ -42,41 +68,148 @@ public class JmsAgent implements SlcAgent, InitializingBean, DisposableBean {
        }
 
        public void afterPropertiesSet() throws Exception {
-               agentDestinationBase = agentDestinationPrefix
-                               + agentDescriptor.getUuid() + ".";
-               jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
-               log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
-                               + agentRegister);
+               try {
+                       jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
+                       log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
+                                       + agentRegister);
+               } catch (JmsException e) {
+                       log
+                                       .warn("Could not register agent "
+                                                       + agentDescriptor.getUuid()
+                                                       + " to server: "
+                                                       + e.getMessage()
+                                                       + ". The agent will stay offline but will keep listening for a ping all sent by server.");
+                       if (log.isTraceEnabled())
+                               log.debug("Original error.", e);
+               }
        }
 
        public void destroy() throws Exception {
-               jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
-               log.info("Agent #" + agentDescriptor.getUuid() + " unregistered to "
-                               + agentRegister);
+               try {
+                       jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
+                       log.info("Agent #" + agentDescriptor.getUuid()
+                                       + " unregistered from " + agentUnregister);
+               } catch (JmsException e) {
+                       log.warn("Could not unregister agent " + agentDescriptor.getUuid()
+                                       + ": " + e.getMessage());
+                       if (log.isTraceEnabled())
+                               log.debug("Original error.", e);
+               }
        }
 
-       public String actionDestinationName(String action) {
-               return agentDestinationBase + action;
+       public void setAgentRegister(Destination agentRegister) {
+               this.agentRegister = agentRegister;
        }
 
-       public void newExecution(SlcExecution slcExecution) {
-               log.info("Would execute SlcExecution :" + slcExecution);
+       public void setAgentUnregister(Destination agentUnregister) {
+               this.agentUnregister = agentUnregister;
        }
 
-       public void setJmsTemplate(JmsTemplate jmsTemplate) {
-               this.jmsTemplate = jmsTemplate;
+       public String getMessageSelector() {
+               String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
+                               + "'";
+               // if (log.isDebugEnabled())
+               // log.debug("Message selector: " + messageSelector);
+               return messageSelector;
        }
 
-       public void setAgentRegister(Destination agentRegister) {
-               this.agentRegister = agentRegister;
+       public void onMessage(final Message message) {
+               final String query;
+               final String correlationId;
+               try {
+                       query = message.getStringProperty(PROPERTY_QUERY);
+                       correlationId = message.getJMSCorrelationID();
+               } catch (JMSException e1) {
+                       throw new SlcException("Cannot analyze incoming message " + message);
+               }
+
+               final Object response;
+               final Destination destinationSend;
+               if (QUERY_PING_ALL.equals(query)) {
+                       ReferenceList refList = (ReferenceList) convertFrom(message);
+                       if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
+                               response = agentDescriptor;
+                               destinationSend = agentRegister;
+                               log.info("Agent #" + agentDescriptor.getUuid()
+                                               + " registering to " + agentRegister
+                                               + " in reply to a " + QUERY_PING_ALL + " query");
+                       } else {
+                               return;
+                       }
+               } else {
+                       response = process(query, message);
+                       destinationSend = responseDestination;
+               }
+
+               // Send response
+               if (log.isTraceEnabled())
+                       log.trace("About to send response " + response.getClass());
+               jmsTemplate.convertAndSend(destinationSend, response,
+                               new MessagePostProcessor() {
+                                       public Message postProcessMessage(Message messageToSend)
+                                                       throws JMSException {
+                                               messageToSend.setStringProperty(PROPERTY_QUERY, query);
+                                               messageToSend.setStringProperty(
+                                                               MsgConstants.PROPERTY_SLC_AGENT_ID,
+                                                               agentDescriptor.getUuid());
+                                               messageToSend.setJMSCorrelationID(correlationId);
+                                               return messageToSend;
+                                       }
+                               });
+               if (log.isTraceEnabled())
+                       log.debug("Sent response to query '" + query
+                                       + "' with correlationId " + correlationId);
        }
 
-       public void setAgentUnregister(Destination agentUnregister) {
-               this.agentUnregister = agentUnregister;
+       /** @return response */
+       public Object process(String query, Message message) {
+               try {
+                       if ("getExecutionModuleDescriptor".equals(query)) {
+                               String moduleName = message.getStringProperty("moduleName");
+                               String version = message.getStringProperty("version");
+                               return getExecutionModuleDescriptor(moduleName, version);
+                       } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+                               List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+                               SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+                                               agentDescriptor);
+                               agentDescriptorToSend.setModuleDescriptors(lst);
+                               return agentDescriptorToSend;
+                       } else if ("runSlcExecution".equals(query)) {
+                               final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+                               new Thread() {
+                                       public void run() {
+                                               runSlcExecution(slcExecution);
+                                       }
+                               }.start();
+                               return ExecutionAnswer.ok("Execution started on agent "
+                                               + agentDescriptor.getUuid());
+                       } else if ("ping".equals(query)) {
+                               return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
+                                               + " is alive.");
+                       } else {
+                               throw new SlcException("Unsupported query " + query);
+                       }
+               } catch (Exception e) {
+                       log.error("Processing of query " + query + " failed", e);
+                       return ExecutionAnswer.error(e);
+               }
        }
 
-       public void setAgentDestinationPrefix(String agentDestinationPrefix) {
-               this.agentDestinationPrefix = agentDestinationPrefix;
+       protected Object convertFrom(Message message) {
+               try {
+                       return jmsTemplate.getMessageConverter().fromMessage(message);
+               } catch (JMSException e) {
+                       throw new SlcException("Cannot convert message", e);
+               }
+       }
+
+       public void setResponseDestination(Destination responseDestination) {
+               this.responseDestination = responseDestination;
+       }
+
+       public void setJmsTemplate(JmsTemplate jmsTemplate) {
+               this.jmsTemplate = jmsTemplate;
        }
 
 }