Make jms more robust
authorMathieu Baudier <mbaudier@argeo.org>
Tue, 28 Apr 2009 08:14:49 +0000 (08:14 +0000)
committerMathieu Baudier <mbaudier@argeo.org>
Tue, 28 Apr 2009 08:14:49 +0000 (08:14 +0000)
git-svn-id: https://svn.argeo.org/slc/trunk@2386 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

18 files changed:
demo/pom.xml
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/services/impl/runtime/AgentServiceImpl.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/dao/runtime/SlcAgentDescriptorDao.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsTransferNewExecution.java
runtime/org.argeo.slc.support.hibernate/src/main/java/org/argeo/slc/hibernate/runtime/SlcAgentDescriptorDaoHibernate.java
runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/runtime/AbstractAgent.java
runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/ExecutionAnswer.java
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/MANIFEST.MF
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq-osgi.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jdbc/META-INF/MANIFEST.MF
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms-osgi.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.services/META-INF/spring/services-osgi.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.services/META-INF/spring/services.xml

index e4f569132e78ba7214314cde18b60f0e5d369e3d..7265e29fca6d6c69cf628e7d1b29f63942d14070 100644 (file)
@@ -58,7 +58,9 @@
                                                        <execDir>target/exec/server</execDir>
                                                        <jvmArgs>
                                                                <jvmArg>-Xmx256m</jvmArg>
-                                                               <!-- <jvmArg>-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000</jvmArg> -->    
+                                                               <!--
+                                                                       <jvmArg>-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000</jvmArg>
+                                                               -->
                                                        </jvmArgs>
                                                        <systemProperties>
                                                                <slc.osgi.start>
@@ -91,7 +93,9 @@
                                                        <execDir>target/exec/agent</execDir>
                                                        <jvmArgs>
                                                                <jvmArg>-Xmx128m</jvmArg>
-       <!--                                                    <jvmArg>-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8001</jvmArg> --> 
+                                                               <!--
+                                                                       <jvmArg>-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8001</jvmArg>
+                                                               -->
                                                        </jvmArgs>
                                                        <systemProperties>
                                                                <slc.osgi.start>
                                </plugins>
                        </build>
                </profile>
+               <profile>
+                       <id>server_mysql</id>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               <groupId>org.argeo.slc.maven</groupId>
+                                               <artifactId>maven-argeo-osgi-plugin</artifactId>
+                                               <configuration>
+                                                       <execDir>target/exec/server</execDir>
+                                                       <jvmArgs>
+                                                               <jvmArg>-Xmx256m</jvmArg>
+                                                               <!--
+                                                                       <jvmArg>-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000</jvmArg>
+                                                               -->
+                                                       </jvmArgs>
+                                                       <systemProperties>
+                                                               <slc.osgi.start>
+                                                                       org.argeo.dep.osgi.catalina.start,
+                                                                       org.springframework.osgi.extender,
+                                                                       org.springframework.osgi.web.extender,
+                                                                       org.springframework.osgi.samples.simplewebapp,
+                                                                       org.argeo.slc.server.activemq,
+                                                                       org.argeo.slc.server.mysql,
+                                                                       org.argeo.slc.server.hibernate,
+                                                                       org.argeo.slc.server.services,
+                                                                       org.argeo.slc.server.jms,
+                                                                       org.argeo.slc.webapp,
+                                                                       org.argeo.slc.ria
+                                                               </slc.osgi.start>
+                                                       </systemProperties>
+                                               </configuration>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
        </profiles>
 </project>
index 0f127269fbee00d1d671a943fd35133ab8716770..1f2397a09be8f55f94281a19b6e214c5091f78f4 100644 (file)
@@ -1,18 +1,33 @@
 package org.argeo.slc.services.impl.runtime;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.dao.runtime.SlcAgentDescriptorDao;
+import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
+import org.argeo.slc.runtime.SlcAgentFactory;
 import org.argeo.slc.services.runtime.AgentService;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 
-public class AgentServiceImpl implements AgentService {
+public class AgentServiceImpl implements AgentService, InitializingBean,
+               DisposableBean {
        private final static Log log = LogFactory.getLog(AgentServiceImpl.class);
 
        private final SlcAgentDescriptorDao slcAgentDescriptorDao;
+       private final SlcAgentFactory agentFactory;
+
+       private Long pingCycle = 60000l;
 
-       public AgentServiceImpl(SlcAgentDescriptorDao slcAgentDescriptorDao) {
+       private Boolean pingThreadActive = true;
+
+       public AgentServiceImpl(SlcAgentDescriptorDao slcAgentDescriptorDao,
+                       SlcAgentFactory agentFactory) {
                this.slcAgentDescriptorDao = slcAgentDescriptorDao;
+               this.agentFactory = agentFactory;
        }
 
        public void register(SlcAgentDescriptor slcAgentDescriptor) {
@@ -25,4 +40,49 @@ public class AgentServiceImpl implements AgentService {
                log.info("Unregistered agent #" + slcAgentDescriptor.getUuid());
        }
 
+       public void afterPropertiesSet() throws Exception {
+               if (pingCycle > 0)
+                       new PingThread().start();
+       }
+
+       public synchronized void destroy() throws Exception {
+               pingThreadActive = false;
+               notifyAll();
+       }
+
+       public void setPingCycle(Long pingCycle) {
+               this.pingCycle = pingCycle;
+       }
+
+       protected class PingThread extends Thread {
+               public void run() {
+                       while (pingThreadActive) {
+                               List<SlcAgentDescriptor> lst = slcAgentDescriptorDao
+                                               .listSlcAgentDescriptors();
+                               List<String> agentIds = new ArrayList<String>();
+                               for (SlcAgentDescriptor ad : lst)
+                                       agentIds.add(ad.getUuid());
+
+                               if (log.isDebugEnabled())
+                                       log.debug("Ping " + agentIds.size() + " agent.");
+                               for (String agentId : agentIds) {
+                                       SlcAgent agent = agentFactory.getAgent(agentId);
+                                       if (!agent.ping()) {
+                                               log.info("Agent " + agentId + " did not reply to ping,"
+                                                               + " removing its descriptor...");
+                                               slcAgentDescriptorDao.delete(agentId);
+                                       }
+                               }
+
+                               synchronized (AgentServiceImpl.this) {
+                                       try {
+                                               AgentServiceImpl.this.wait(pingCycle);
+                                       } catch (InterruptedException e) {
+                                               // silent
+                                       }
+                               }
+                       }
+               }
+
+       }
 }
index 3709968c39a4df95835fd6e64ce04557e0cf6e33..cb8184dc99233dd93e823d823ac76032376dc303 100644 (file)
@@ -9,6 +9,9 @@ public interface SlcAgentDescriptorDao {
 
        public void delete(SlcAgentDescriptor slcAgentDescriptor);
 
+       public void delete(String agentId);
+
        public List<SlcAgentDescriptor> listSlcAgentDescriptors();
 
+       public SlcAgentDescriptor getAgentDescriptor(String agentId);
 }
index a0bb783d00f040b4f7d3499403e67515cab5fcd1..b97aa6e904d87f8cd4525633e53a5083a1e3b762 100644 (file)
@@ -3,6 +3,7 @@ package org.argeo.slc.runtime;
 import java.util.List;
 
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.process.SlcExecution;
 
 /** A local agent, able to run SLC Execution locally. */
 public interface SlcAgent {
@@ -11,4 +12,8 @@ public interface SlcAgent {
 
        public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors();
 
+       public void runSlcExecution(SlcExecution slcExecution);
+
+       /** @return true if still alive. */
+       public boolean ping();
 }
index baddb1c604f075f4af5a5cc7667531331f48631a..6882ebaa45b3e16bb7be69d8cf05fac52b285dfc 100644 (file)
@@ -9,8 +9,7 @@ import java.util.UUID;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
 import org.apache.commons.logging.Log;
@@ -19,34 +18,30 @@ import org.argeo.slc.SlcException;
 import org.argeo.slc.core.runtime.AbstractAgent;
 import org.argeo.slc.execution.ExecutionModule;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.SessionAwareMessageListener;
-import org.springframework.jms.support.converter.MessageConversionException;
+import org.springframework.jms.core.MessagePostProcessor;
 
 /** JMS based implementation of SLC Agent. */
 public class JmsAgent extends AbstractAgent implements SlcAgent,
-               InitializingBean, DisposableBean, SessionAwareMessageListener {
+               InitializingBean, DisposableBean, MessageListener {
        public final static String PROPERTY_QUERY = "query";
        public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
 
        private final static Log log = LogFactory.getLog(JmsAgent.class);
 
        private final SlcAgentDescriptor agentDescriptor;
-       // private ConnectionFactory connectionFactory;
        private JmsTemplate jmsTemplate;
        private Destination agentRegister;
        private Destination agentUnregister;
 
-       // private Destination requestDestination;
        private Destination responseDestination;
 
-       // private MessageConverter messageConverter;
-
        public JmsAgent() {
                try {
                        agentDescriptor = new SlcAgentDescriptor();
@@ -58,10 +53,6 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
        }
 
        public void afterPropertiesSet() throws Exception {
-               // Initialize JMS Template
-               // jmsTemplate = new JmsTemplate(connectionFactory);
-               // jmsTemplate.setMessageConverter(messageConverter);
-
                jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
                log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
                                + agentRegister);
@@ -89,62 +80,6 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
                return messageSelector;
        }
 
-       public void onMessage(Message message, Session session) throws JMSException {
-               MessageProducer producer = session.createProducer(responseDestination);
-               String query = message.getStringProperty(PROPERTY_QUERY);
-               String correlationId = message.getJMSCorrelationID();
-               if (log.isDebugEnabled())
-                       log.debug("Received query " + query + " with correlationId "
-                                       + correlationId);
-
-               Message responseMsg = null;
-               if ("getExecutionModuleDescriptor".equals(query)) {
-                       String moduleName = message.getStringProperty("moduleName");
-                       String version = message.getStringProperty("version");
-                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
-                                       moduleName, version);
-                       responseMsg = jmsTemplate.getMessageConverter().toMessage(emd,
-                                       session);
-               } else if ("listExecutionModuleDescriptors".equals(query)) {
-
-                       List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
-                       SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
-                                       agentDescriptor);
-                       agentDescriptorToSend.setModuleDescriptors(lst);
-                       responseMsg = jmsTemplate.getMessageConverter().toMessage(
-                                       agentDescriptorToSend, session);
-               } else if ("newExecution".equals(query)) {
-
-                       SlcExecution slcExecution = (SlcExecution) jmsTemplate
-                                       .getMessageConverter().fromMessage(message);
-                       runSlcExecution(slcExecution);
-                       return;
-               } else {
-                       // try {
-                       // // FIXME: generalize
-                       // SlcExecution slcExecution = (SlcExecution) jmsTemplate
-                       // .getMessageConverter().fromMessage(message);
-                       // runSlcExecution(slcExecution);
-                       // } catch (MessageConversionException e) {
-                       // if (log.isDebugEnabled())
-                       // log.debug("Unsupported query " + query, e);
-                       // }
-                       if (log.isDebugEnabled())
-                               log.debug("Unsupported query " + query);
-                       return;
-               }
-
-               if (responseMsg != null) {
-                       responseMsg.setJMSCorrelationID(correlationId);
-                       producer.send(responseMsg);
-                       if (log.isDebugEnabled())
-                               log.debug("Sent response to query " + query
-                                               + " with correlationId " + correlationId + ": "
-                                               + responseMsg);
-               }
-
-       }
-
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
                        String moduleName, String version) {
                return getModulesManager().getExecutionModuleDescriptor(moduleName,
@@ -165,6 +100,85 @@ public class JmsAgent extends AbstractAgent implements SlcAgent,
                return descriptors;
        }
 
+       public boolean ping() {
+               return true;
+       }
+
+       public void onMessage(final Message message) {
+               final String query;
+               final String correlationId;
+               try {
+                       query = message.getStringProperty(PROPERTY_QUERY);
+                       correlationId = message.getJMSCorrelationID();
+               } catch (JMSException e1) {
+                       throw new SlcException("Cannot analyze incoming message " + message);
+               }
+
+               final Object response = process(query, message);
+
+               new Thread() {
+                       public void run() {
+                               // Send response
+                               jmsTemplate.convertAndSend(responseDestination, response,
+                                               new MessagePostProcessor() {
+                                                       public Message postProcessMessage(
+                                                                       Message messageToSend) throws JMSException {
+                                                               messageToSend
+                                                                               .setStringProperty(
+                                                                                               PROPERTY_QUERY,
+                                                                                               message
+                                                                                                               .getStringProperty(PROPERTY_QUERY));
+                                                               messageToSend.setStringProperty(
+                                                                               PROPERTY_SLC_AGENT_ID, agentDescriptor
+                                                                                               .getUuid());
+                                                               messageToSend.setJMSCorrelationID(message
+                                                                               .getJMSCorrelationID());
+                                                               return messageToSend;
+                                                       }
+                                               });
+                               if (log.isDebugEnabled())
+                                       log.debug("Sent response to query " + query
+                                                       + " with correlationId " + correlationId);
+                       }
+               }.start();
+
+       }
+
+       /** @return response */
+       public Object process(String query, Message message) {
+               try {
+                       if ("getExecutionModuleDescriptor".equals(query)) {
+                               String moduleName = message.getStringProperty("moduleName");
+                               String version = message.getStringProperty("version");
+                               return getExecutionModuleDescriptor(moduleName, version);
+                       } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+                               List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+                               SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+                                               agentDescriptor);
+                               agentDescriptorToSend.setModuleDescriptors(lst);
+                               return agentDescriptorToSend;
+                       } else if ("runSlcExecution".equals(query)) {
+                               SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+                               runSlcExecution(slcExecution);
+                               return ExecutionAnswer.ok("Execution started on agent "
+                                               + agentDescriptor.getUuid());
+                       } else if ("ping".equals(query)) {
+                               return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
+                                               + " is alive.");
+                       } else {
+                               throw new SlcException("Unsupported query " + query);
+                       }
+               } catch (Exception e) {
+                       log.error("Processing of query " + query + " failed", e);
+                       return ExecutionAnswer.error(e);
+               }
+       }
+
+       protected Object convertFrom(Message message) throws JMSException {
+               return jmsTemplate.getMessageConverter().fromMessage(message);
+       }
+
        public void setResponseDestination(Destination responseDestination) {
                this.responseDestination = responseDestination;
        }
index 66cf3920e7799e8415606c3fbf67a9a2601ea1bc..8837a8155993cdc4c9a04954f45e228c848e3961 100644 (file)
@@ -13,6 +13,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.springframework.jms.core.JmsTemplate;
@@ -40,7 +42,7 @@ public class JmsAgentProxy implements SlcAgent {
 
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
                        final String moduleName, final String version) {
-               return (ExecutionModuleDescriptor) sendReceive(new AgentProxyMessageCreator(
+               return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
                                "getExecutionModuleDescriptor") {
                        public void setArguments(Message message) throws JMSException {
                                message.setStringProperty("moduleName", moduleName);
@@ -50,46 +52,112 @@ public class JmsAgentProxy implements SlcAgent {
        }
 
        public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
-               return ((SlcAgentDescriptor) sendReceive(new AgentProxyMessageCreator(
+               return ((SlcAgentDescriptor) sendReceive(new AgentMC(
                                "listExecutionModuleDescriptors"))).getModuleDescriptors();
        }
 
-       protected Object sendReceive(AgentProxyMessageCreator messageCreator) {
+       public void runSlcExecution(SlcExecution slcExecution) {
+               sendReceive(new AgentMC("runSlcExecution", slcExecution));
+       }
+
+       public boolean ping() {
+               Object response = sendReceive(new AgentMC("ping"), false);
+               if (response == null)
+                       return false;
+               else {
+                       ExecutionAnswer answer = (ExecutionAnswer) response;
+                       return ExecutionAnswer.OK.equals(answer.getStatus());
+               }
+       }
+
+       protected Object sendReceive(AgentMC messageCreator) {
+               return sendReceive(messageCreator, true);
+       }
+
+       /**
+        * @param timeoutException
+        *            if true throws an exception if reception timeouted, else
+        *            return null
+        */
+       protected Object sendReceive(AgentMC messageCreator,
+                       boolean timeoutException) {
                String correlationId = UUID.randomUUID().toString();
                messageCreator.setCorrelationId(correlationId);
                send(messageCreator);
-               return processResponse(correlationId);
+
+               Object response = processResponse(messageCreator, timeoutException);
+
+               if (response instanceof ExecutionAnswer) {
+                       ExecutionAnswer answer = (ExecutionAnswer) response;
+                       if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
+                               throw new SlcException("Execution of '"
+                                               + messageCreator.getQuery() + "' failed on the agent "
+                                               + agentUuid + ": " + answer.getMessage()
+                                               + " (correlationId=" + correlationId + ")");
+                       else
+                               return answer;
+               } else {
+                       return response;
+               }
        }
 
-       protected void send(AgentProxyMessageCreator messageCreator) {
+       protected void send(AgentMC messageCreator) {
                jmsTemplate.send(requestDestination, messageCreator);
                if (log.isDebugEnabled())
-                       log.debug("Sent request" + messageCreator.getQuery() + " to agent "
-                                       + agentUuid + " with correlationId "
-                                       + messageCreator.getCorrelationId());
+                       log.debug("Sent query '" + messageCreator.getQuery()
+                                       + "' with correlationId "
+                                       + messageCreator.getCorrelationId() + " to agent "
+                                       + agentUuid);
        }
 
-       protected Object processResponse(String correlationId) {
+       protected Object processResponse(AgentMC messageCreator,
+                       boolean timeoutException) {
+               String correlationId = messageCreator.getCorrelationId();
+               String query = messageCreator.getQuery();
+               Message responseMsg = null;
+               try {
+                       responseMsg = jmsTemplate.receiveSelected(responseDestination,
+                                       "JMSCorrelationID='" + correlationId + "'");
+               } catch (Exception e) {
+                       throw new SlcException("Could not receive response from agent "
+                                       + agentUuid + " with correlationId " + correlationId
+                                       + " (query '" + query + "')", e);
+               }
+
+               if (responseMsg == null) {// timeout
+                       if (timeoutException)
+                               throw new SlcException("TIMEOUT: Query '" + query + "'"
+                                               + " with correlationId " + correlationId
+                                               + " sent to agent " + agentUuid + " timed out.");
+                       else
+                               return null;
+               }
+               if (log.isDebugEnabled())
+                       log.debug("Received response for query '" + query
+                                       + "' with correlationId " + correlationId + " from agent "
+                                       + agentUuid);
+
                try {
-                       Message responseMsg = jmsTemplate.receiveSelected(
-                                       responseDestination, "JMSCorrelationID='" + correlationId
-                                                       + "'");
-                       if (log.isDebugEnabled())
-                               log.debug("Received response with correlationId "
-                                               + correlationId);
                        return messageConverter.fromMessage(responseMsg);
                } catch (Exception e) {
-                       throw new SlcException("Could not process response from agent "
-                                       + agentUuid + " with correlationId " + correlationId, e);
+                       throw new SlcException("Could not convert response from agent "
+                                       + agentUuid + " with correlationId " + correlationId
+                                       + " (query '" + query + "')", e);
                }
        }
 
-       protected class AgentProxyMessageCreator implements MessageCreator {
+       protected class AgentMC implements MessageCreator {
                private final String query;
+               private Object body = null;
                private String correlationId;
 
-               public AgentProxyMessageCreator(String query) {
+               public AgentMC(String query) {
+                       this.query = query;
+               }
+
+               public AgentMC(String query, Object body) {
                        this.query = query;
+                       this.body = body;
                }
 
                public final Message createMessage(Session session) throws JMSException {
@@ -97,7 +165,11 @@ public class JmsAgentProxy implements SlcAgent {
                                throw new SlcException("Agent UUID not set");
                        if (correlationId == null)
                                throw new SlcException("JMSCorrelationID not set");
-                       TextMessage msg = session.createTextMessage();
+                       final Message msg;
+                       if (body == null)
+                               msg = session.createTextMessage();
+                       else
+                               msg = messageConverter.toMessage(body, session);
                        msg.setStringProperty(JmsAgent.PROPERTY_SLC_AGENT_ID, agentUuid);
                        msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
                        msg.setJMSCorrelationID(correlationId);
index 856b956a45fbdf80b88c764ab709862e534b8969..28913aea87e46807b39f60c5f3a07ee64bfcccbc 100644 (file)
@@ -1,34 +1,43 @@
 package org.argeo.slc.jms;
 
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.MessageListener;
+
+import org.argeo.slc.SlcException;
+import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.runtime.SlcAgent;
+import org.argeo.slc.runtime.SlcAgentFactory;
+import org.springframework.jms.support.converter.MessageConverter;
 
-import org.springframework.jms.listener.SessionAwareMessageListener;
+/** Temporary hack */
+public class JmsTransferNewExecution implements MessageListener {
+       private MessageConverter messageConverter;
+       private SlcAgentFactory agentFactory;
 
-/** Temporary hack*/
-public class JmsTransferNewExecution implements SessionAwareMessageListener {
-       private Destination requestDestination;
+       public void onMessage(final Message message) {
+               try {
+                       String agentId = message
+                                       .getStringProperty(JmsAgent.PROPERTY_SLC_AGENT_ID);
+                       final SlcAgent agent = agentFactory.getAgent(agentId);
+                       final SlcExecution slcExecution = (SlcExecution) messageConverter
+                                       .fromMessage(message);
+                       new Thread() {
+                               public void run() {
+                                       agent.runSlcExecution(slcExecution);
+                               }
+                       }.start();
+               } catch (Exception e) {
+                       throw new SlcException("Could not transfer new execution "
+                                       + message, e);
+               }
+       }
 
-       public void onMessage(Message message, Session session) throws JMSException {
-               TextMessage messageToSend = session
-                               .createTextMessage(((TextMessage) message).getText());
-               messageToSend
-                               .setStringProperty(JmsAgent.PROPERTY_QUERY, "newExecution");
-               messageToSend.setStringProperty(JmsAgent.PROPERTY_SLC_AGENT_ID, message
-                               .getStringProperty(JmsAgent.PROPERTY_SLC_AGENT_ID));
-               messageToSend.setJMSCorrelationID(UUID.randomUUID().toString());
-               MessageProducer producer = session.createProducer(requestDestination);
-               producer.send(messageToSend);
+       public void setMessageConverter(MessageConverter messageConverter) {
+               this.messageConverter = messageConverter;
        }
 
-       public void setRequestDestination(Destination requestDestination) {
-               this.requestDestination = requestDestination;
+       public void setAgentFactory(SlcAgentFactory agentFactory) {
+               this.agentFactory = agentFactory;
        }
 
 }
index 6fead8d8d6512c2f190b90843759133d8d538800..adb96e325c4f1fb2e3ad5f3076918b493be60555 100644 (file)
@@ -17,6 +17,19 @@ public class SlcAgentDescriptorDaoHibernate extends HibernateDaoSupport
                getHibernateTemplate().delete(slcAgentDescriptor);\r
        }\r
 \r
+       public void delete(String agentId) {\r
+               Object obj = getHibernateTemplate().get(SlcAgentDescriptor.class,\r
+                               agentId);\r
+               if (obj != null)\r
+                       getHibernateTemplate().delete(obj);\r
+       }\r
+\r
+       public SlcAgentDescriptor getAgentDescriptor(String agentId) {\r
+\r
+               return (SlcAgentDescriptor) getHibernateTemplate().get(\r
+                               SlcAgentDescriptor.class, agentId);\r
+       }\r
+\r
        public List<SlcAgentDescriptor> listSlcAgentDescriptors() {\r
                return (List<SlcAgentDescriptor>) getHibernateTemplate().loadAll(\r
                                SlcAgentDescriptor.class);\r
index 75215115e9ecd5259bf11c5bd070ba328319b351..c7a419c655cb2dc52476128ae9c8349d30471971 100644 (file)
@@ -8,7 +8,7 @@ public abstract class AbstractAgent {
 
        private ExecutionModulesManager modulesManager;
 
-       protected void runSlcExecution(final SlcExecution slcExecution) {
+       public void runSlcExecution(final SlcExecution slcExecution) {
                modulesManager.process(slcExecution);
        }
 
index e113f98bb65a95d012b87791c59d019649384b56..0ae20be70e495c6c434cbed6d7be8ed186e83260 100644 (file)
@@ -1,5 +1,9 @@
 package org.argeo.slc.msg;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.commons.io.IOUtils;
 import org.argeo.slc.SlcException;
 
 /** Answer to an execution of a remote service which performed changes. */
@@ -44,6 +48,16 @@ public class ExecutionAnswer {
                return new ExecutionAnswer(ERROR, message);
        }
 
+       public static ExecutionAnswer error(Throwable e) {
+               StringWriter writer = new StringWriter();
+               try {
+                       e.printStackTrace(new PrintWriter(writer));
+                       return new ExecutionAnswer(ERROR, writer.toString());
+               } finally {
+                       IOUtils.closeQuietly(writer);
+               }
+       }
+
        public static ExecutionAnswer ok(String message) {
                return new ExecutionAnswer(OK, message);
        }
index 08b00f5f437dd0c978e359e3611abbc8225b3923..e7fa0c18a20b982de9750c184af18b23d7865f76 100644 (file)
@@ -5,7 +5,9 @@ Import-Package: javax.jms,
  org.apache.commons.logging,
  org.apache.activemq.pool,
  org.apache.commons.pool
-Require-Bundle: org.argeo.slc.server,
+Require-Bundle: 
+ org.argeo.slc.specs,
+ org.argeo.slc.server,
  org.argeo.slc.support.activemq,
  org.springframework.core,
  org.springframework.context,
index 789db5f62b084dba6a734e73ad064fbe0072948f..f617c281b817cb32f9c71f936c9e29aaf0b73085 100644 (file)
@@ -8,4 +8,6 @@
 \r
        <service ref="jmsConnectionFactory" interface="javax.jms.ConnectionFactory" />\r
 \r
+       <service ref="agentFactory" interface="org.argeo.slc.runtime.SlcAgentFactory" />\r
+\r
 </beans:beans>
\ No newline at end of file
index ef27997d386b1f70ac5242d60467cca3412644d4..6695820832d20f1ba41784464ec15bc49da22a34 100644 (file)
        </amq:broker>
 
        <!-- Connection Factory -->
-       <!-- <bean id="jmsConnectionFactory"
+       <!--
+               <bean id="jmsConnectionFactory"
                class="org.springframework.jms.connection.SingleConnectionFactory">
-               <property name="targetConnectionFactory">
-                       <bean id="slcDefault.jms.amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-                               <property name="brokerURL">
-                                       <value>vm://localhost</value>
-                               </property>
-                       </bean>
-               </property>
-       </bean> -->
+               <property name="targetConnectionFactory"> <bean
+               id="slcDefault.jms.amqConnectionFactory"
+               class="org.apache.activemq.ActiveMQConnectionFactory"> <property
+               name="brokerURL"> <value>vm://localhost</value> </property> </bean>
+               </property> </bean>
+       -->
 
        <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
                destroy-method="stop">
                <property name="config" value="classpath:activemq-conf.xml" />
                <property name="start" value="true" /> </bean>
        -->
+
+       <bean id="agentFactory" class="org.argeo.slc.jms.JmsAgentProxyFactory">
+               <property name="jmsTemplate">
+                       <bean class="org.springframework.jms.core.JmsTemplate">
+                               <property name="connectionFactory" ref="jmsConnectionFactory" />
+                               <property name="receiveTimeout" value="20000" />
+                       </bean>
+               </property>
+               <property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
+               <property name="requestDestination" ref="slcJms.destination.agent.request" />
+               <property name="responseDestination" ref="slcJms.destination.agent.response" />
+       </bean>
+
+
 </beans>
\ No newline at end of file
index 12f9325bcdb8a2fbf60c4cd1c768586c49e8455b..391fff9ff573de471df1ce705803b7fe003f60f0 100644 (file)
@@ -1,5 +1,5 @@
 Bundle-SymbolicName: org.argeo.slc.server.jdbc
-Bundle-Version: 1.0.0
+Bundle-Version: 0.11.3.qualifier
 Fragment-Host: org.springframework.jdbc
 Import-Package: 
  org.hsqldb;resolution:=optional,
index 1abc49736f97d6e1df62498e259f8b74b154cbd2..6c41418a6d1164fc10678f4cac9aa60b5097dbff 100644 (file)
@@ -14,5 +14,5 @@
 
        <reference id="jmsConnectionFactory" interface="javax.jms.ConnectionFactory" />
 
-       <service ref="agentFactory" interface="org.argeo.slc.runtime.SlcAgentFactory" />
+       <reference id="agentFactory" interface="org.argeo.slc.runtime.SlcAgentFactory" />
 </beans:beans>
\ No newline at end of file
index b2000f6254659ed32670f16c9b8a64bdda5aa640..24cb545bb2974099e0f633b7796eae7775260607 100644 (file)
@@ -6,18 +6,6 @@
 
        <import resource="classpath:org/argeo/slc/activemq/spring.xml" />
 
-       <bean id="agentFactory" class="org.argeo.slc.jms.JmsAgentProxyFactory">
-               <property name="jmsTemplate">
-                       <bean class="org.springframework.jms.core.JmsTemplate">
-                               <property name="connectionFactory" ref="jmsConnectionFactory" />
-                               <property name="receiveTimeout" value="20000" />
-                       </bean>
-               </property>
-               <property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
-               <property name="requestDestination" ref="slcJms.destination.agent.request" />
-               <property name="responseDestination" ref="slcJms.destination.agent.response" />
-       </bean>
-
        <!-- Agent Service -->
        <bean id="agentService.jmsContainer.register" parent="listenerContainer">
                <property name="destination" ref="slcJms.destination.agent.register" />
@@ -43,7 +31,8 @@
                <property name="destination" ref="slcJms.destination.agent.newExecution" />
                <property name="messageListener">
                        <bean class="org.argeo.slc.jms.JmsTransferNewExecution">
-                               <property name="requestDestination" ref="slcJms.destination.agent.request" />
+                               <property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
+                               <property name="agentFactory" ref="agentFactory" />
                        </bean>
                </property>
        </bean>
index 04c557c55a5ab7c2bbdf890ee86e1505c1afe09d..f486d9dee08c29cea8b3274f4ea21bae20f4f3c5 100644 (file)
        <reference interface="org.argeo.slc.dao.runtime.SlcAgentDescriptorDao"
                id="slcAgentDescriptorDao" />
 
-       <!-- <reference id="transactionManager"
-               interface="org.springframework.transaction.PlatformTransactionManager" /> -->
+       <!--
+               <reference id="transactionManager"
+               interface="org.springframework.transaction.PlatformTransactionManager"
+               />
+       -->
+
+       <reference id="agentFactory" interface="org.argeo.slc.runtime.SlcAgentFactory" />
 
 </beans:beans>
\ No newline at end of file
index e93f04885e8b72d5945a6fc2a39e8fa5afa84520..6ae0a3b850ec7498483e7e44b980c00417ab2086 100644 (file)
 
        <bean id="agentService" class="org.argeo.slc.services.impl.runtime.AgentServiceImpl">
                <constructor-arg ref="slcAgentDescriptorDao" />
+               <constructor-arg ref="agentFactory" />
+               <property name="pingCycle" value="0" />
        </bean>
 
-       <bean id="slcExecutionService" class="org.argeo.slc.services.impl.process.SlcExecutionServiceImpl">
+       <bean id="slcExecutionService"
+               class="org.argeo.slc.services.impl.process.SlcExecutionServiceImpl">
                <constructor-arg ref="slcExecutionDao" />
        </bean>
 
-       <!-- Services transactions
-       <aop:config>
-               <aop:pointcut id="serviceMethods"
-                       expression="execution(* org.argeo.slc.services.test.TestManagerService.*(..))" />
-               <aop:advisor advice-ref="serviceAdvice" pointcut-ref="serviceMethods" />
-       </aop:config>
-
-       <tx:advice id="serviceAdvice" transaction-manager="transactionManager">
-               <tx:attributes>
-                       <tx:method name="*" propagation="REQUIRED" />
-               </tx:attributes>
-       </tx:advice>
- -->
+       <!--
+               Services transactions <aop:config> <aop:pointcut id="serviceMethods"
+               expression="execution(*
+               org.argeo.slc.services.test.TestManagerService.*(..))" /> <aop:advisor
+               advice-ref="serviceAdvice" pointcut-ref="serviceMethods" />
+               </aop:config> <tx:advice id="serviceAdvice"
+               transaction-manager="transactionManager"> <tx:attributes> <tx:method
+               name="*" propagation="REQUIRED" /> </tx:attributes> </tx:advice>
+       -->
 </beans>
\ No newline at end of file