2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.jms
;
19 import java
.util
.List
;
20 import java
.util
.UUID
;
22 import javax
.jms
.Destination
;
23 import javax
.jms
.JMSException
;
24 import javax
.jms
.Message
;
25 import javax
.jms
.Session
;
26 import javax
.jms
.TextMessage
;
28 import org
.apache
.commons
.logging
.Log
;
29 import org
.apache
.commons
.logging
.LogFactory
;
30 import org
.argeo
.slc
.SlcException
;
31 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
32 import org
.argeo
.slc
.execution
.ExecutionProcess
;
33 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
34 import org
.argeo
.slc
.msg
.MsgConstants
;
35 import org
.argeo
.slc
.process
.SlcExecution
;
36 import org
.argeo
.slc
.runtime
.SlcAgent
;
37 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
38 import org
.springframework
.jms
.core
.JmsTemplate
;
39 import org
.springframework
.jms
.core
.MessageCreator
;
41 public class JmsAgentProxy
implements SlcAgent
{
42 private final static Log log
= LogFactory
.getLog(JmsAgentProxy
.class);
44 private final String agentUuid
;
45 private final Destination requestDestination
;
46 private final Destination responseDestination
;
47 private final JmsTemplate jmsTemplate
;
/**
 * Creates a proxy bound to one agent and one pair of JMS destinations.
 * The arguments are stored as given; no validation is performed here.
 *
 * @param agentUuid
 *            identifier of the agent the requests are addressed to
 * @param requestDestination
 *            destination used when sending requests
 * @param responseDestination
 *            destination from which responses are read
 * @param jmsTemplate
 *            template performing the JMS operations
 */
public JmsAgentProxy(String agentUuid, Destination requestDestination,
        Destination responseDestination, JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
    this.responseDestination = responseDestination;
    this.requestDestination = requestDestination;
    this.agentUuid = agentUuid;
}
57 public String
getAgentUuid() {
61 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
62 final String moduleName
, final String version
) {
63 return (ExecutionModuleDescriptor
) sendReceive(new AgentMC(
64 "getExecutionModuleDescriptor") {
65 public void setArguments(Message message
) throws JMSException
{
66 message
.setStringProperty("moduleName", moduleName
);
67 message
.setStringProperty("version", version
);
72 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
73 return ((SlcAgentDescriptor
) sendReceive(new AgentMC(
74 "listExecutionModuleDescriptors"))).getModuleDescriptors();
77 public void runSlcExecution(SlcExecution slcExecution
) {
78 process(slcExecution
);
81 public void process(ExecutionProcess executionProcess
) {
82 if (!(executionProcess
instanceof SlcExecution
))
83 throw new SlcException("Unsupported process type "
84 + executionProcess
.getClass());
85 sendReceive(new AgentMC("runSlcExecution",
86 (SlcExecution
) executionProcess
));
89 public boolean ping() {
90 Object response
= sendReceive(new AgentMC("ping"), false);
94 ExecutionAnswer answer
= (ExecutionAnswer
) response
;
95 return ExecutionAnswer
.OK
.equals(answer
.getStatus());
99 public void kill(ExecutionProcess process
) {
100 throw new UnsupportedOperationException();
103 protected Object
sendReceive(AgentMC messageCreator
) {
104 long begin
= System
.currentTimeMillis();
107 res
= sendReceive(messageCreator
, true);
109 if (log
.isTraceEnabled())
110 log
.trace("To agent #" + agentUuid
+ " in "
111 + (System
.currentTimeMillis() - begin
) + " ms, query '"
112 + messageCreator
.getQuery() + "', correlationId "
113 + messageCreator
.getCorrelationId());
119 * @param timeoutException
120 * if true throws an exception if reception timeouted, else
123 protected Object
sendReceive(AgentMC messageCreator
,
124 boolean timeoutException
) {
125 String correlationId
= UUID
.randomUUID().toString();
126 messageCreator
.setCorrelationId(correlationId
);
127 send(messageCreator
);
129 Object response
= processResponse(messageCreator
, timeoutException
);
131 if (response
instanceof ExecutionAnswer
) {
132 ExecutionAnswer answer
= (ExecutionAnswer
) response
;
133 if (ExecutionAnswer
.ERROR
.equals(answer
.getStatus()))
134 throw new SlcException("Execution of '"
135 + messageCreator
.getQuery() + "' failed on the agent "
136 + agentUuid
+ ": " + answer
.getMessage()
137 + " (correlationId=" + correlationId
+ ")");
145 protected void send(AgentMC messageCreator
) {
146 jmsTemplate
.send(requestDestination
, messageCreator
);
149 protected Object
processResponse(AgentMC messageCreator
,
150 boolean timeoutException
) {
151 String correlationId
= messageCreator
.getCorrelationId();
152 String query
= messageCreator
.getQuery();
153 Message responseMsg
= null;
155 responseMsg
= jmsTemplate
.receiveSelected(responseDestination
,
156 "JMSCorrelationID='" + correlationId
+ "'");
157 } catch (Exception e
) {
158 throw new SlcException("Could not receive response from agent "
159 + agentUuid
+ " with correlationId " + correlationId
160 + " (query '" + query
+ "')", e
);
163 if (responseMsg
== null) {// timeout
164 if (timeoutException
)
165 throw new SlcException("TIMEOUT: Query '" + query
+ "'"
166 + " with correlationId " + correlationId
167 + " sent to agent " + agentUuid
+ " timed out.");
173 return fromMessage(responseMsg
);
174 } catch (Exception e
) {
175 throw new SlcException("Could not convert response from agent "
176 + agentUuid
+ " with correlationId " + correlationId
177 + " (query '" + query
+ "')", e
);
181 protected Object
fromMessage(Message message
) throws JMSException
{
182 return jmsTemplate
.getMessageConverter().fromMessage(message
);
185 protected Message
toMessage(Object obj
, Session session
)
186 throws JMSException
{
187 return jmsTemplate
.getMessageConverter().toMessage(obj
, session
);
190 protected class AgentMC
implements MessageCreator
{
191 private final String query
;
192 private Object body
= null;
193 private String correlationId
;
195 public AgentMC(String query
) {
199 public AgentMC(String query
, Object body
) {
204 public final Message
createMessage(Session session
) throws JMSException
{
205 if (agentUuid
== null)
206 throw new SlcException("Agent UUID not set");
207 if (correlationId
== null)
208 throw new SlcException("JMSCorrelationID not set");
211 msg
= session
.createTextMessage();
213 msg
= toMessage(body
, session
);
214 msg
.setStringProperty(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentUuid
);
215 msg
.setStringProperty(JmsAgent
.PROPERTY_QUERY
, query
);
216 msg
.setJMSCorrelationID(correlationId
);
218 if (msg
instanceof TextMessage
) {
219 TextMessage textMessage
= (TextMessage
) msg
;
220 if (textMessage
.getText() == null) {
221 // TODO: remove workaround when upgrading to ActiveMQ 5.3
223 // https://issues.apache.org/activemq/browse/AMQ-2046
224 textMessage
.setText("");
230 protected void setArguments(Message message
) throws JMSException
{
233 public String
getQuery() {
237 public String
getCorrelationId() {
238 return correlationId
;
241 public void setCorrelationId(String correlationId
) {
242 this.correlationId
= correlationId
;