import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.execution.ExecutionModule;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.ReferenceList;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
public final static String PROPERTY_QUERY = "query";
public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
+ public final static String QUERY_PING_ALL = "pingAll";
+
private final static Log log = LogFactory.getLog(JmsAgent.class);
private final SlcAgentDescriptor agentDescriptor;
throw new SlcException("Cannot analyze incoming message " + message);
}
- final Object response = process(query, message);
+ final Object response;
+ final Destination destinationSend;
+ if (QUERY_PING_ALL.equals(query)) {
+ ReferenceList refList = (ReferenceList) convertFrom(message);
+ if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
+ response = agentDescriptor;
+ destinationSend = agentRegister;
+ log.info("Agent #" + agentDescriptor.getUuid()
+ + " registering to " + agentRegister
+ + " in reply to a " + QUERY_PING_ALL + " query");
+ } else {
+ return;
+ }
+ } else {
+ response = process(query, message);
+ destinationSend = responseDestination;
+ }
new Thread() {
public void run() {
// Send response
- jmsTemplate.convertAndSend(responseDestination, response,
+ jmsTemplate.convertAndSend(destinationSend, response,
new MessagePostProcessor() {
public Message postProcessMessage(
Message messageToSend) throws JMSException {
- messageToSend
- .setStringProperty(
- PROPERTY_QUERY,
- message
- .getStringProperty(PROPERTY_QUERY));
+ messageToSend.setStringProperty(PROPERTY_QUERY,
+ query);
messageToSend.setStringProperty(
PROPERTY_SLC_AGENT_ID, agentDescriptor
.getUuid());
- messageToSend.setJMSCorrelationID(message
- .getJMSCorrelationID());
+ messageToSend
+ .setJMSCorrelationID(correlationId);
return messageToSend;
}
});
if (log.isDebugEnabled())
- log.debug("Sent response to query " + query
- + " with correlationId " + correlationId);
+ log.debug("Sent response to query '" + query
+ + "' with correlationId " + correlationId);
}
}.start();
}
}
- protected Object convertFrom(Message message) throws JMSException {
- return jmsTemplate.getMessageConverter().fromMessage(message);
+ protected Object convertFrom(Message message) {
+ try {
+ return jmsTemplate.getMessageConverter().fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot convert message", e);
+ }
}
public void setResponseDestination(Destination responseDestination) {