import java.util.List;
import java.util.UUID;
-import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.argeo.slc.core.runtime.AbstractAgent;
import org.argeo.slc.execution.ExecutionModule;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SessionAwareMessageListener;
-import org.springframework.jms.support.converter.MessageConverter;
+import org.springframework.jms.support.converter.MessageConversionException;
/** JMS based implementation of SLC Agent. */
public class JmsAgent extends AbstractAgent implements SlcAgent,
InitializingBean, DisposableBean, SessionAwareMessageListener {
+ public final static String PROPERTY_QUERY = "query";
+ public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
+
private final static Log log = LogFactory.getLog(JmsAgent.class);
private final SlcAgentDescriptor agentDescriptor;
- private ConnectionFactory connectionFactory;
+ // private ConnectionFactory connectionFactory;
private JmsTemplate jmsTemplate;
private Destination agentRegister;
private Destination agentUnregister;
// private Destination requestDestination;
private Destination responseDestination;
- private MessageConverter messageConverter;
+ // private MessageConverter messageConverter;
public JmsAgent() {
try {
public void afterPropertiesSet() throws Exception {
// Initialize JMS Template
- jmsTemplate = new JmsTemplate(connectionFactory);
- jmsTemplate.setMessageConverter(messageConverter);
+ // jmsTemplate = new JmsTemplate(connectionFactory);
+ // jmsTemplate.setMessageConverter(messageConverter);
jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
this.agentUnregister = agentUnregister;
}
- /*
- * public void onMessage(Message message) { // FIXME: we filter the messages
- * on the client side, // because of a weird problem with selector since
- * moving to OSGi try { if (message.getStringProperty("slc-agentId").equals(
- * agentDescriptor.getUuid())) { runSlcExecution((SlcExecution)
- * messageConverter .fromMessage(message)); } else { if
- * (log.isDebugEnabled()) log.debug("Filtered out message " + message); } }
- * catch (JMSException e) { throw new SlcException("Cannot convert message "
- * + message, e); }
- *
- * }
- */
-
public String getMessageSelector() {
String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
+ "'";
public void onMessage(Message message, Session session) throws JMSException {
MessageProducer producer = session.createProducer(responseDestination);
- String query = message.getStringProperty("query");
+ String query = message.getStringProperty(PROPERTY_QUERY);
String correlationId = message.getJMSCorrelationID();
if (log.isDebugEnabled())
log.debug("Received query " + query + " with correlationId "
String version = message.getStringProperty("version");
ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
moduleName, version);
- responseMsg = messageConverter.toMessage(emd, session);
+ responseMsg = jmsTemplate.getMessageConverter().toMessage(emd,
+ session);
} else if ("listExecutionModuleDescriptors".equals(query)) {
List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
agentDescriptor);
agentDescriptorToSend.setModuleDescriptors(lst);
- responseMsg = messageConverter.toMessage(agentDescriptorToSend,
- session);
+ responseMsg = jmsTemplate.getMessageConverter().toMessage(
+ agentDescriptorToSend, session);
+ } else if ("newExecution".equals(query)) {
+
+ SlcExecution slcExecution = (SlcExecution) jmsTemplate
+ .getMessageConverter().fromMessage(message);
+ runSlcExecution(slcExecution);
+ return;
} else {
- throw new SlcException("Unsupported query " + query);
+ // try {
+ // // FIXME: generalize
+ // SlcExecution slcExecution = (SlcExecution) jmsTemplate
+ // .getMessageConverter().fromMessage(message);
+ // runSlcExecution(slcExecution);
+ // } catch (MessageConversionException e) {
+ // if (log.isDebugEnabled())
+ // log.debug("Unsupported query " + query, e);
+ // }
+ if (log.isDebugEnabled())
+ log.debug("Unsupported query " + query);
+ return;
}
if (responseMsg != null) {
}
- public void setMessageConverter(MessageConverter messageConverter) {
- this.messageConverter = messageConverter;
- }
-
- public void setConnectionFactory(ConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
- }
-
public ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version) {
return getModulesManager().getExecutionModuleDescriptor(moduleName,
this.responseDestination = responseDestination;
}
+ public void setJmsTemplate(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
+ }
+
}