]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
d3027c93dba717cd308fead2bd5ccf93b5901309
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
/*
 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.execution.ExecutionProcess;
33 import org.argeo.slc.msg.ExecutionAnswer;
34 import org.argeo.slc.msg.MsgConstants;
35 import org.argeo.slc.process.SlcExecution;
36 import org.argeo.slc.runtime.SlcAgent;
37 import org.argeo.slc.runtime.SlcAgentDescriptor;
38 import org.springframework.jms.core.JmsTemplate;
39 import org.springframework.jms.core.MessageCreator;
40
41 public class JmsAgentProxy implements SlcAgent {
42 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
43
44 private final String agentUuid;
45 private final Destination requestDestination;
46 private final Destination responseDestination;
47 private final JmsTemplate jmsTemplate;
48
49 public JmsAgentProxy(String agentUuid, Destination requestDestination,
50 Destination responseDestination, JmsTemplate jmsTemplate) {
51 this.agentUuid = agentUuid;
52 this.requestDestination = requestDestination;
53 this.responseDestination = responseDestination;
54 this.jmsTemplate = jmsTemplate;
55 }
56
57 public String getAgentUuid() {
58 return agentUuid;
59 }
60
61 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
62 final String moduleName, final String version) {
63 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
64 "getExecutionModuleDescriptor") {
65 public void setArguments(Message message) throws JMSException {
66 message.setStringProperty("moduleName", moduleName);
67 message.setStringProperty("version", version);
68 }
69 });
70 }
71
72 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
73 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
74 "listExecutionModuleDescriptors"))).getModuleDescriptors();
75 }
76
77 public void runSlcExecution(SlcExecution slcExecution) {
78 process(slcExecution);
79 }
80
81 public void process(ExecutionProcess executionProcess) {
82 if (!(executionProcess instanceof SlcExecution))
83 throw new SlcException("Unsupported process type "
84 + executionProcess.getClass());
85 sendReceive(new AgentMC("runSlcExecution",
86 (SlcExecution) executionProcess));
87 }
88
89 public boolean ping() {
90 Object response = sendReceive(new AgentMC("ping"), false);
91 if (response == null)
92 return false;
93 else {
94 ExecutionAnswer answer = (ExecutionAnswer) response;
95 return ExecutionAnswer.OK.equals(answer.getStatus());
96 }
97 }
98
99 protected Object sendReceive(AgentMC messageCreator) {
100 long begin = System.currentTimeMillis();
101 Object res;
102 try {
103 res = sendReceive(messageCreator, true);
104 } finally {
105 if (log.isTraceEnabled())
106 log.trace("To agent #" + agentUuid + " in "
107 + (System.currentTimeMillis() - begin) + " ms, query '"
108 + messageCreator.getQuery() + "', correlationId "
109 + messageCreator.getCorrelationId());
110 }
111 return res;
112 }
113
114 /**
115 * @param timeoutException
116 * if true throws an exception if reception timeouted, else
117 * return null
118 */
119 protected Object sendReceive(AgentMC messageCreator,
120 boolean timeoutException) {
121 String correlationId = UUID.randomUUID().toString();
122 messageCreator.setCorrelationId(correlationId);
123 send(messageCreator);
124
125 Object response = processResponse(messageCreator, timeoutException);
126
127 if (response instanceof ExecutionAnswer) {
128 ExecutionAnswer answer = (ExecutionAnswer) response;
129 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
130 throw new SlcException("Execution of '"
131 + messageCreator.getQuery() + "' failed on the agent "
132 + agentUuid + ": " + answer.getMessage()
133 + " (correlationId=" + correlationId + ")");
134 else
135 return answer;
136 } else {
137 return response;
138 }
139 }
140
141 protected void send(AgentMC messageCreator) {
142 jmsTemplate.send(requestDestination, messageCreator);
143 }
144
145 protected Object processResponse(AgentMC messageCreator,
146 boolean timeoutException) {
147 String correlationId = messageCreator.getCorrelationId();
148 String query = messageCreator.getQuery();
149 Message responseMsg = null;
150 try {
151 responseMsg = jmsTemplate.receiveSelected(responseDestination,
152 "JMSCorrelationID='" + correlationId + "'");
153 } catch (Exception e) {
154 throw new SlcException("Could not receive response from agent "
155 + agentUuid + " with correlationId " + correlationId
156 + " (query '" + query + "')", e);
157 }
158
159 if (responseMsg == null) {// timeout
160 if (timeoutException)
161 throw new SlcException("TIMEOUT: Query '" + query + "'"
162 + " with correlationId " + correlationId
163 + " sent to agent " + agentUuid + " timed out.");
164 else
165 return null;
166 }
167
168 try {
169 return fromMessage(responseMsg);
170 } catch (Exception e) {
171 throw new SlcException("Could not convert response from agent "
172 + agentUuid + " with correlationId " + correlationId
173 + " (query '" + query + "')", e);
174 }
175 }
176
177 protected Object fromMessage(Message message) throws JMSException {
178 return jmsTemplate.getMessageConverter().fromMessage(message);
179 }
180
181 protected Message toMessage(Object obj, Session session)
182 throws JMSException {
183 return jmsTemplate.getMessageConverter().toMessage(obj, session);
184 }
185
186 protected class AgentMC implements MessageCreator {
187 private final String query;
188 private Object body = null;
189 private String correlationId;
190
191 public AgentMC(String query) {
192 this.query = query;
193 }
194
195 public AgentMC(String query, Object body) {
196 this.query = query;
197 this.body = body;
198 }
199
200 public final Message createMessage(Session session) throws JMSException {
201 if (agentUuid == null)
202 throw new SlcException("Agent UUID not set");
203 if (correlationId == null)
204 throw new SlcException("JMSCorrelationID not set");
205 final Message msg;
206 if (body == null)
207 msg = session.createTextMessage();
208 else
209 msg = toMessage(body, session);
210 msg.setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID, agentUuid);
211 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
212 msg.setJMSCorrelationID(correlationId);
213 setArguments(msg);
214 if (msg instanceof TextMessage) {
215 TextMessage textMessage = (TextMessage) msg;
216 if (textMessage.getText() == null) {
217 // TODO: remove workaround when upgrading to ActiveMQ 5.3
218 // Workaround for
219 // https://issues.apache.org/activemq/browse/AMQ-2046
220 textMessage.setText("");
221 }
222 }
223 return msg;
224 }
225
226 protected void setArguments(Message message) throws JMSException {
227 }
228
229 public String getQuery() {
230 return query;
231 }
232
233 public String getCorrelationId() {
234 return correlationId;
235 }
236
237 public void setCorrelationId(String correlationId) {
238 this.correlationId = correlationId;
239 }
240
241 }
242 }