+ public void onMessage(final Message message) {
+ final String query;
+ final String correlationId;
+ try {
+ query = message.getStringProperty(PROPERTY_QUERY);
+ correlationId = message.getJMSCorrelationID();
+ } catch (JMSException e1) {
+ throw new SlcException("Cannot analyze incoming message " + message);
+ }
+
+ final Object response;
+ final Destination destinationSend;
+ if (QUERY_PING_ALL.equals(query)) {
+ ReferenceList refList = (ReferenceList) convertFrom(message);
+ if (!refList.getReferences().contains(getAgentUuid())) {
+ response = getAgentDescriptor();
+ destinationSend = agentRegister;
+ log.info("Agent #" + getAgentUuid() + " registering to "
+ + agentRegister + " in reply to a " + QUERY_PING_ALL
+ + " query");
+ } else {
+ return;
+ }
+ } else {
+ response = process(query, message);
+ destinationSend = responseDestination;
+ }
+
+ // Send response
+ if (log.isTraceEnabled())
+ log.trace("About to send response " + response.getClass());
+ jmsTemplate.convertAndSend(destinationSend, response,
+ new MessagePostProcessor() {
+ public Message postProcessMessage(Message messageToSend)
+ throws JMSException {
+ messageToSend.setStringProperty(PROPERTY_QUERY, query);
+ messageToSend.setStringProperty(
+ MsgConstants.PROPERTY_SLC_AGENT_ID,
+ getAgentUuid());
+ messageToSend.setJMSCorrelationID(correlationId);
+ return messageToSend;
+ }
+ });
+ if (log.isTraceEnabled())
+ log.debug("Sent response to query '" + query
+ + "' with correlationId " + correlationId);
+ }
+
+ /** @return response */
+ public Object process(String query, Message message) {
+ try {
+ if ("getExecutionModuleDescriptor".equals(query)) {
+ String moduleName = message.getStringProperty("moduleName");
+ String version = message.getStringProperty("version");
+ return getExecutionModuleDescriptor(moduleName, version);
+ } else if ("listExecutionModuleDescriptors".equals(query)) {
+
+ List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
+ SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
+ getAgentDescriptor());
+ agentDescriptorToSend.setModuleDescriptors(lst);
+ return agentDescriptorToSend;
+ } else if ("runSlcExecution".equals(query)) {
+ final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
+ new Thread() {
+ public void run() {
+ runSlcExecution(slcExecution);
+ }
+ }.start();
+ return ExecutionAnswer.ok("Execution started on agent "
+ + getAgentUuid());
+ } else if ("ping".equals(query)) {
+ return ExecutionAnswer.ok("Agent " + getAgentUuid()
+ + " is alive.");
+ } else {
+ throw new SlcException("Unsupported query " + query);
+ }
+ } catch (Exception e) {
+ log.error("Processing of query " + query + " failed", e);
+ return ExecutionAnswer.error(e);
+ }
+ }
+
+ protected Object convertFrom(Message message) {
+ try {
+ return jmsTemplate.getMessageConverter().fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot convert message", e);
+ }
+ }
+
+ public void setResponseDestination(Destination responseDestination) {
+ this.responseDestination = responseDestination;