import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.jms.core.JmsTemplate;
public ExecutionModuleDescriptor getExecutionModuleDescriptor(
final String moduleName, final String version) {
- return (ExecutionModuleDescriptor) sendReceive(new AgentProxyMessageCreator(
+ return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
"getExecutionModuleDescriptor") {
public void setArguments(Message message) throws JMSException {
message.setStringProperty("moduleName", moduleName);
}
public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
- return ((SlcAgentDescriptor) sendReceive(new AgentProxyMessageCreator(
+ return ((SlcAgentDescriptor) sendReceive(new AgentMC(
"listExecutionModuleDescriptors"))).getModuleDescriptors();
}
- protected Object sendReceive(AgentProxyMessageCreator messageCreator) {
+ public void runSlcExecution(SlcExecution slcExecution) {
+ sendReceive(new AgentMC("runSlcExecution", slcExecution));
+ }
+
+ public boolean ping() {
+ Object response = sendReceive(new AgentMC("ping"), false);
+ if (response == null)
+ return false;
+ else {
+ ExecutionAnswer answer = (ExecutionAnswer) response;
+ return ExecutionAnswer.OK.equals(answer.getStatus());
+ }
+ }
+
+ protected Object sendReceive(AgentMC messageCreator) {
+ return sendReceive(messageCreator, true);
+ }
+
+ /**
+ * @param timeoutException
+ * if true throws an exception if reception timeouted, else
+ * return null
+ */
+ protected Object sendReceive(AgentMC messageCreator,
+ boolean timeoutException) {
String correlationId = UUID.randomUUID().toString();
messageCreator.setCorrelationId(correlationId);
send(messageCreator);
- return processResponse(correlationId);
+
+ Object response = processResponse(messageCreator, timeoutException);
+
+ if (response instanceof ExecutionAnswer) {
+ ExecutionAnswer answer = (ExecutionAnswer) response;
+ if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
+ throw new SlcException("Execution of '"
+ + messageCreator.getQuery() + "' failed on the agent "
+ + agentUuid + ": " + answer.getMessage()
+ + " (correlationId=" + correlationId + ")");
+ else
+ return answer;
+ } else {
+ return response;
+ }
}
- protected void send(AgentProxyMessageCreator messageCreator) {
+ protected void send(AgentMC messageCreator) {
jmsTemplate.send(requestDestination, messageCreator);
if (log.isDebugEnabled())
- log.debug("Sent request" + messageCreator.getQuery() + " to agent "
- + agentUuid + " with correlationId "
- + messageCreator.getCorrelationId());
+ log.debug("Sent query '" + messageCreator.getQuery()
+ + "' with correlationId "
+ + messageCreator.getCorrelationId() + " to agent "
+ + agentUuid);
}
- protected Object processResponse(String correlationId) {
+ protected Object processResponse(AgentMC messageCreator,
+ boolean timeoutException) {
+ String correlationId = messageCreator.getCorrelationId();
+ String query = messageCreator.getQuery();
+ Message responseMsg = null;
+ try {
+ responseMsg = jmsTemplate.receiveSelected(responseDestination,
+ "JMSCorrelationID='" + correlationId + "'");
+ } catch (Exception e) {
+ throw new SlcException("Could not receive response from agent "
+ + agentUuid + " with correlationId " + correlationId
+ + " (query '" + query + "')", e);
+ }
+
+ if (responseMsg == null) {// timeout
+ if (timeoutException)
+ throw new SlcException("TIMEOUT: Query '" + query + "'"
+ + " with correlationId " + correlationId
+ + " sent to agent " + agentUuid + " timed out.");
+ else
+ return null;
+ }
+ if (log.isDebugEnabled())
+ log.debug("Received response for query '" + query
+ + "' with correlationId " + correlationId + " from agent "
+ + agentUuid);
+
try {
- Message responseMsg = jmsTemplate.receiveSelected(
- responseDestination, "JMSCorrelationID='" + correlationId
- + "'");
- if (log.isDebugEnabled())
- log.debug("Received response with correlationId "
- + correlationId);
return messageConverter.fromMessage(responseMsg);
} catch (Exception e) {
- throw new SlcException("Could not process response from agent "
- + agentUuid + " with correlationId " + correlationId, e);
+ throw new SlcException("Could not convert response from agent "
+ + agentUuid + " with correlationId " + correlationId
+ + " (query '" + query + "')", e);
}
}
- protected class AgentProxyMessageCreator implements MessageCreator {
+ protected class AgentMC implements MessageCreator {
private final String query;
+ private Object body = null;
private String correlationId;
- public AgentProxyMessageCreator(String query) {
+ public AgentMC(String query) {
+ this.query = query;
+ }
+
+ public AgentMC(String query, Object body) {
this.query = query;
+ this.body = body;
}
public final Message createMessage(Session session) throws JMSException {
throw new SlcException("Agent UUID not set");
if (correlationId == null)
throw new SlcException("JMSCorrelationID not set");
- TextMessage msg = session.createTextMessage();
+ final Message msg;
+ if (body == null)
+ msg = session.createTextMessage();
+ else
+ msg = messageConverter.toMessage(body, session);
msg.setStringProperty(JmsAgent.PROPERTY_SLC_AGENT_ID, agentUuid);
msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
msg.setJMSCorrelationID(correlationId);