package org.argeo.slc.jms;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.List;
-import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.argeo.slc.msg.ReferenceList;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgentDescriptor;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
-/** JMS based implementation of SLC Agent. */
-public class JmsAgent extends DefaultAgent implements InitializingBean,
- DisposableBean, MessageListener {
+/** JMS based implementation of an SLC Agent. */
+public class JmsAgent extends DefaultAgent implements MessageListener {
public final static String PROPERTY_QUERY = "query";
public final static String QUERY_PING_ALL = "pingAll";
private final static Log log = LogFactory.getLog(JmsAgent.class);
- private final SlcAgentDescriptor agentDescriptor;
private JmsTemplate jmsTemplate;
private Destination agentRegister;
private Destination agentUnregister;
private Destination responseDestination;
- public JmsAgent() {
+ public void init() {
+ super.init();
try {
- agentDescriptor = new SlcAgentDescriptor();
- agentDescriptor.setUuid(UUID.randomUUID().toString());
- agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException e) {
- throw new SlcException("Unable to create agent descriptor.", e);
- }
- }
-
- public void afterPropertiesSet() throws Exception {
- try {
- jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
- log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
+ jmsTemplate.convertAndSend(agentRegister, getAgentDescriptor());
+ log.info("Agent #" + getAgentUuid() + " registered to "
+ agentRegister);
} catch (JmsException e) {
- log
- .warn("Could not register agent "
- + agentDescriptor.getUuid()
- + " to server: "
- + e.getMessage()
- + ". The agent will stay offline but will keep listening for a ping all sent by server.");
+ log.warn("Could not register agent "
+ + getAgentDescriptor().getUuid()
+ + " to server: "
+ + e.getMessage()
+ + ". The agent will stay offline but will keep listening for a ping all sent by server.");
if (log.isTraceEnabled())
log.debug("Original error.", e);
}
}
- public void destroy() throws Exception {
+ public void dispose() {
try {
- jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
- log.info("Agent #" + agentDescriptor.getUuid()
- + " unregistered from " + agentUnregister);
+ jmsTemplate.convertAndSend(agentUnregister, getAgentDescriptor());
+ log.info("Agent #" + getAgentUuid() + " unregistered from "
+ + agentUnregister);
} catch (JmsException e) {
- log.warn("Could not unregister agent " + agentDescriptor.getUuid()
- + ": " + e.getMessage());
+ log.warn("Could not unregister agent " + getAgentUuid() + ": "
+ + e.getMessage());
if (log.isTraceEnabled())
log.debug("Original error.", e);
}
+ super.dispose();
}
public void setAgentRegister(Destination agentRegister) {
}
public String getMessageSelector() {
- String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
- + "'";
+ String messageSelector = "slc_agentId='" + getAgentUuid() + "'";
// if (log.isDebugEnabled())
// log.debug("Message selector: " + messageSelector);
return messageSelector;
final Destination destinationSend;
if (QUERY_PING_ALL.equals(query)) {
ReferenceList refList = (ReferenceList) convertFrom(message);
- if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
- response = agentDescriptor;
+ if (!refList.getReferences().contains(getAgentUuid())) {
+ response = getAgentDescriptor();
destinationSend = agentRegister;
- log.info("Agent #" + agentDescriptor.getUuid()
- + " registering to " + agentRegister
- + " in reply to a " + QUERY_PING_ALL + " query");
+ log.info("Agent #" + getAgentUuid() + " registering to "
+ + agentRegister + " in reply to a " + QUERY_PING_ALL
+ + " query");
} else {
return;
}
}
// Send response
+ if (log.isTraceEnabled())
+ log.trace("About to send response " + response.getClass());
jmsTemplate.convertAndSend(destinationSend, response,
new MessagePostProcessor() {
public Message postProcessMessage(Message messageToSend)
messageToSend.setStringProperty(PROPERTY_QUERY, query);
messageToSend.setStringProperty(
MsgConstants.PROPERTY_SLC_AGENT_ID,
- agentDescriptor.getUuid());
+ getAgentUuid());
messageToSend.setJMSCorrelationID(correlationId);
return messageToSend;
}
List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
- agentDescriptor);
+ getAgentDescriptor());
agentDescriptorToSend.setModuleDescriptors(lst);
return agentDescriptorToSend;
} else if ("runSlcExecution".equals(query)) {
}
}.start();
return ExecutionAnswer.ok("Execution started on agent "
- + agentDescriptor.getUuid());
+ + getAgentUuid());
} else if ("ping".equals(query)) {
- return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
+ return ExecutionAnswer.ok("Agent " + getAgentUuid()
+ " is alive.");
} else {
throw new SlcException("Unsupported query " + query);