import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import javax.jms.Destination;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.core.runtime.AbstractAgent;
+import org.argeo.slc.execution.ExecutionModule;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.ReferenceList;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.support.converter.MessageConverter;
+import org.springframework.jms.core.MessagePostProcessor;
/** JMS based implementation of SLC Agent. */
public class JmsAgent extends AbstractAgent implements SlcAgent,
InitializingBean, DisposableBean, MessageListener {
+ public final static String PROPERTY_QUERY = "query";
+ public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
+
+ public final static String QUERY_PING_ALL = "pingAll";
+
private final static Log log = LogFactory.getLog(JmsAgent.class);
private final SlcAgentDescriptor agentDescriptor;
private Destination agentRegister;
private Destination agentUnregister;
- private MessageConverter messageConverter;
+ private Destination responseDestination;
public JmsAgent() {
try {
+ agentUnregister);
}
- public void setJmsTemplate(JmsTemplate jmsTemplate) {
- this.jmsTemplate = jmsTemplate;
- }
-
public void setAgentRegister(Destination agentRegister) {
this.agentRegister = agentRegister;
}
this.agentUnregister = agentUnregister;
}
- public void onMessage(Message message) {
- // FIXME: we filter the messages on the client side,
- // because of a weird problem with selector since moving to OSGi
+ public String getMessageSelector() {
+ String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
+ + "'";
+ // if (log.isDebugEnabled())
+ // log.debug("Message selector: " + messageSelector);
+ return messageSelector;
+ }
+
+ public ExecutionModuleDescriptor getExecutionModuleDescriptor(
+ String moduleName, String version) {
+ return getModulesManager().getExecutionModuleDescriptor(moduleName,
+ version);
+ }
+
+ public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
+ List<ExecutionModule> modules = getModulesManager()
+ .listExecutionModules();
+
+ List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
+ for (ExecutionModule module : modules) {
+ ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
+ md.setName(module.getName());
+ md.setVersion(module.getVersion());
+ descriptors.add(md);
+ }
+ return descriptors;
+ }
+
+ public boolean ping() {
+ return true;
+ }
+
+ public void onMessage(final Message message) {
+ final String query;
+ final String correlationId;
+ try {
+ query = message.getStringProperty(PROPERTY_QUERY);
+ correlationId = message.getJMSCorrelationID();
+ } catch (JMSException e1) {
+ throw new SlcException("Cannot analyze incoming message " + message);
+ }
+
+ final Object response;
+ final Destination destinationSend;
+ if (QUERY_PING_ALL.equals(query)) {
+ ReferenceList refList = (ReferenceList) convertFrom(message);
+ if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
+ response = agentDescriptor;
+ destinationSend = agentRegister;
+ log.info("Agent #" + agentDescriptor.getUuid()
+ + " registering to " + agentRegister
+ + " in reply to a " + QUERY_PING_ALL + " query");
+ } else {
+ return;
+ }
+ } else {
+ response = process(query, message);
+ destinationSend = responseDestination;
+ }
+
+ new Thread() {
+ public void run() {
+ // Send response
+ jmsTemplate.convertAndSend(destinationSend, response,
+ new MessagePostProcessor() {
+ public Message postProcessMessage(
+ Message messageToSend) throws JMSException {
+ messageToSend.setStringProperty(PROPERTY_QUERY,
+ query);
+ messageToSend.setStringProperty(
+ PROPERTY_SLC_AGENT_ID, agentDescriptor
+ .getUuid());
+ messageToSend
+ .setJMSCorrelationID(correlationId);
+ return messageToSend;
+ }
+ });
+ if (log.isDebugEnabled())
+ log.debug("Sent response to query '" + query
+ + "' with correlationId " + correlationId);
+ }
+ }.start();
+
+ }
+
+ /** @return response */
+ public Object process(String query, Message message) {
try {
- if (message.getStringProperty("slc-agentId").equals(
- agentDescriptor.getUuid())) {
- runSlcExecution((SlcExecution) messageConverter
- .fromMessage(message));
+ if ("getExecutionModuleDescriptor".equals(query)) {
+ String moduleName = message.getStringProperty("moduleName");
+ String version = message.getStringProperty("version");
+ return getExecutionModuleDescriptor(moduleName, version);
+ } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+ List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+ SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+ agentDescriptor);
+ agentDescriptorToSend.setModuleDescriptors(lst);
+ return agentDescriptorToSend;
+ } else if ("runSlcExecution".equals(query)) {
+ SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+ runSlcExecution(slcExecution);
+ return ExecutionAnswer.ok("Execution started on agent "
+ + agentDescriptor.getUuid());
+ } else if ("ping".equals(query)) {
+ return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
+ + " is alive.");
+ } else {
+ throw new SlcException("Unsupported query " + query);
}
- } catch (JMSException e) {
- throw new SlcException("Cannot convert message " + message, e);
+ } catch (Exception e) {
+ log.error("Processing of query " + query + " failed", e);
+ return ExecutionAnswer.error(e);
}
+ }
+ protected Object convertFrom(Message message) {
+ try {
+ return jmsTemplate.getMessageConverter().fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot convert message", e);
+ }
}
- public String getMessageSelector() {
- String messageSelector = "slc-agentId=" + agentDescriptor.getUuid()
- + "";
- // String messageSelector = "slc-agentId LIKE '%'";
- if (log.isDebugEnabled())
- log.debug("Message selector: '" + messageSelector + "'");
- return messageSelector;
+ public void setResponseDestination(Destination responseDestination) {
+ this.responseDestination = responseDestination;
}
- public void setMessageConverter(MessageConverter messageConverter) {
- this.messageConverter = messageConverter;
+ public void setJmsTemplate(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
}
-
-
+
}