import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.core.runtime.AbstractAgent;
-import org.argeo.slc.execution.ExecutionModule;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.MsgConstants;
import org.argeo.slc.msg.ReferenceList;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
public class JmsAgent extends AbstractAgent implements SlcAgent,
InitializingBean, DisposableBean, MessageListener {
public final static String PROPERTY_QUERY = "query";
- public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
-
public final static String QUERY_PING_ALL = "pingAll";
private final static Log log = LogFactory.getLog(JmsAgent.class);
}
public void afterPropertiesSet() throws Exception {
- jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
- log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
- + agentRegister);
+ try {
+ jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
+ log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
+ + agentRegister);
+ } catch (JmsException e) {
+ log
+ .warn("Could not register agent "
+ + agentDescriptor.getUuid()
+ + " to server: "
+ + e.getMessage()
+ + ". The agent will stay offline but will keep listening for a ping all sent by server.");
+ if (log.isTraceEnabled())
+ log.debug("Original error.", e);
+ }
}
public void destroy() throws Exception {
- jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
- log.info("Agent #" + agentDescriptor.getUuid() + " unregistered from "
- + agentUnregister);
+ try {
+ jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
+ log.info("Agent #" + agentDescriptor.getUuid()
+ + " unregistered from " + agentUnregister);
+ } catch (JmsException e) {
+ log.warn("Could not unregister agent " + agentDescriptor.getUuid()
+ + ": " + e.getMessage());
+ if (log.isTraceEnabled())
+ log.debug("Original error.", e);
+ }
}
public void setAgentRegister(Destination agentRegister) {
}
public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
- List<ExecutionModule> modules = getModulesManager()
- .listExecutionModules();
-
- List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
- for (ExecutionModule module : modules) {
- ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
- md.setName(module.getName());
- md.setVersion(module.getVersion());
- descriptors.add(md);
- }
- return descriptors;
+ return getModulesManager().listExecutionModules();
}
public boolean ping() {
destinationSend = responseDestination;
}
- new Thread() {
- public void run() {
- // Send response
- jmsTemplate.convertAndSend(destinationSend, response,
- new MessagePostProcessor() {
- public Message postProcessMessage(
- Message messageToSend) throws JMSException {
- messageToSend.setStringProperty(PROPERTY_QUERY,
- query);
- messageToSend.setStringProperty(
- PROPERTY_SLC_AGENT_ID, agentDescriptor
- .getUuid());
- messageToSend
- .setJMSCorrelationID(correlationId);
- return messageToSend;
- }
- });
- if (log.isDebugEnabled())
- log.debug("Sent response to query '" + query
- + "' with correlationId " + correlationId);
- }
- }.start();
-
+ // Send response
+ jmsTemplate.convertAndSend(destinationSend, response,
+ new MessagePostProcessor() {
+ public Message postProcessMessage(Message messageToSend)
+ throws JMSException {
+ messageToSend.setStringProperty(PROPERTY_QUERY, query);
+ messageToSend.setStringProperty(
+ MsgConstants.PROPERTY_SLC_AGENT_ID,
+ agentDescriptor.getUuid());
+ messageToSend.setJMSCorrelationID(correlationId);
+ return messageToSend;
+ }
+ });
+ if (log.isTraceEnabled())
+ log.debug("Sent response to query '" + query
+ + "' with correlationId " + correlationId);
}
/** @return response */
agentDescriptorToSend.setModuleDescriptors(lst);
return agentDescriptorToSend;
} else if ("runSlcExecution".equals(query)) {
- SlcExecution slcExecution = (SlcExecution) convertFrom(message);
- runSlcExecution(slcExecution);
+ final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+ new Thread() {
+ public void run() {
+ runSlcExecution(slcExecution);
+ }
+ }.start();
return ExecutionAnswer.ok("Execution started on agent "
+ agentDescriptor.getUuid());
} else if ("ping".equals(query)) {