package org.argeo.slc.jms;

import java.util.List;
import java.util.UUID;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.msg.ExecutionAnswer;
import org.argeo.slc.msg.MsgConstants;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

24 public class JmsAgentProxy
implements SlcAgent
{
25 private final static Log log
= LogFactory
.getLog(JmsAgentProxy
.class);
27 private final String agentUuid
;
28 private final Destination requestDestination
;
29 private final Destination responseDestination
;
30 private final JmsTemplate jmsTemplate
;
32 public JmsAgentProxy(String agentUuid
, Destination requestDestination
,
33 Destination responseDestination
, JmsTemplate jmsTemplate
) {
34 this.agentUuid
= agentUuid
;
35 this.requestDestination
= requestDestination
;
36 this.responseDestination
= responseDestination
;
37 this.jmsTemplate
= jmsTemplate
;
40 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
41 final String moduleName
, final String version
) {
42 return (ExecutionModuleDescriptor
) sendReceive(new AgentMC(
43 "getExecutionModuleDescriptor") {
44 public void setArguments(Message message
) throws JMSException
{
45 message
.setStringProperty("moduleName", moduleName
);
46 message
.setStringProperty("version", version
);
51 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
52 return ((SlcAgentDescriptor
) sendReceive(new AgentMC(
53 "listExecutionModuleDescriptors"))).getModuleDescriptors();
56 public void runSlcExecution(SlcExecution slcExecution
) {
57 sendReceive(new AgentMC("runSlcExecution", slcExecution
));
60 public boolean ping() {
61 Object response
= sendReceive(new AgentMC("ping"), false);
65 ExecutionAnswer answer
= (ExecutionAnswer
) response
;
66 return ExecutionAnswer
.OK
.equals(answer
.getStatus());
70 protected Object
sendReceive(AgentMC messageCreator
) {
71 return sendReceive(messageCreator
, true);
75 * @param timeoutException
76 * if true throws an exception if reception timeouted, else
79 protected Object
sendReceive(AgentMC messageCreator
,
80 boolean timeoutException
) {
81 String correlationId
= UUID
.randomUUID().toString();
82 messageCreator
.setCorrelationId(correlationId
);
85 Object response
= processResponse(messageCreator
, timeoutException
);
87 if (response
instanceof ExecutionAnswer
) {
88 ExecutionAnswer answer
= (ExecutionAnswer
) response
;
89 if (ExecutionAnswer
.ERROR
.equals(answer
.getStatus()))
90 throw new SlcException("Execution of '"
91 + messageCreator
.getQuery() + "' failed on the agent "
92 + agentUuid
+ ": " + answer
.getMessage()
93 + " (correlationId=" + correlationId
+ ")");
101 protected void send(AgentMC messageCreator
) {
102 jmsTemplate
.send(requestDestination
, messageCreator
);
103 if (log
.isTraceEnabled())
104 log
.debug("Sent query '" + messageCreator
.getQuery()
105 + "' with correlationId "
106 + messageCreator
.getCorrelationId() + " to agent "
110 protected Object
processResponse(AgentMC messageCreator
,
111 boolean timeoutException
) {
112 String correlationId
= messageCreator
.getCorrelationId();
113 String query
= messageCreator
.getQuery();
114 Message responseMsg
= null;
116 responseMsg
= jmsTemplate
.receiveSelected(responseDestination
,
117 "JMSCorrelationID='" + correlationId
+ "'");
118 } catch (Exception e
) {
119 throw new SlcException("Could not receive response from agent "
120 + agentUuid
+ " with correlationId " + correlationId
121 + " (query '" + query
+ "')", e
);
124 if (responseMsg
== null) {// timeout
125 if (timeoutException
)
126 throw new SlcException("TIMEOUT: Query '" + query
+ "'"
127 + " with correlationId " + correlationId
128 + " sent to agent " + agentUuid
+ " timed out.");
132 if (log
.isTraceEnabled())
133 log
.debug("Received response for query '" + query
134 + "' with correlationId " + correlationId
+ " from agent "
138 return fromMessage(responseMsg
);
139 } catch (Exception e
) {
140 throw new SlcException("Could not convert response from agent "
141 + agentUuid
+ " with correlationId " + correlationId
142 + " (query '" + query
+ "')", e
);
146 protected Object
fromMessage(Message message
) throws JMSException
{
147 return jmsTemplate
.getMessageConverter().fromMessage(message
);
150 protected Message
toMessage(Object obj
, Session session
)
151 throws JMSException
{
152 return jmsTemplate
.getMessageConverter().toMessage(obj
, session
);
155 protected class AgentMC
implements MessageCreator
{
156 private final String query
;
157 private Object body
= null;
158 private String correlationId
;
160 public AgentMC(String query
) {
164 public AgentMC(String query
, Object body
) {
169 public final Message
createMessage(Session session
) throws JMSException
{
170 if (agentUuid
== null)
171 throw new SlcException("Agent UUID not set");
172 if (correlationId
== null)
173 throw new SlcException("JMSCorrelationID not set");
176 msg
= session
.createTextMessage();
178 msg
= toMessage(body
, session
);
179 msg
.setStringProperty(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentUuid
);
180 msg
.setStringProperty(JmsAgent
.PROPERTY_QUERY
, query
);
181 msg
.setJMSCorrelationID(correlationId
);
183 if (msg
instanceof TextMessage
) {
184 TextMessage textMessage
= (TextMessage
) msg
;
185 if (textMessage
.getText() == null) {
186 // TODO: remove workaround when upgrading to ActiveMQ 5.3
188 // https://issues.apache.org/activemq/browse/AMQ-2046
189 textMessage
.setText("");
195 protected void setArguments(Message message
) throws JMSException
{
198 public String
getQuery() {
202 public String
getCorrelationId() {
203 return correlationId
;
206 public void setCorrelationId(String correlationId
) {
207 this.correlationId
= correlationId
;