]> git.argeo.org Git - gpl/argeo-slc.git/blob - JmsAgentProxy.java
8c08519371afdfe06c246c96775b3fbbaf892ef7
[gpl/argeo-slc.git] / JmsAgentProxy.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.execution.ExecutionProcess;
33 import org.argeo.slc.msg.ExecutionAnswer;
34 import org.argeo.slc.msg.MsgConstants;
35 import org.argeo.slc.process.SlcExecution;
36 import org.argeo.slc.runtime.SlcAgent;
37 import org.argeo.slc.runtime.SlcAgentDescriptor;
38 import org.springframework.jms.core.JmsTemplate;
39 import org.springframework.jms.core.MessageCreator;
40
41 public class JmsAgentProxy implements SlcAgent {
42 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
43
44 private final String agentUuid;
45 private final Destination requestDestination;
46 private final Destination responseDestination;
47 private final JmsTemplate jmsTemplate;
48
49 public JmsAgentProxy(String agentUuid, Destination requestDestination,
50 Destination responseDestination, JmsTemplate jmsTemplate) {
51 this.agentUuid = agentUuid;
52 this.requestDestination = requestDestination;
53 this.responseDestination = responseDestination;
54 this.jmsTemplate = jmsTemplate;
55 }
56
57 public String getAgentUuid() {
58 return agentUuid;
59 }
60
61 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
62 final String moduleName, final String version) {
63 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
64 "getExecutionModuleDescriptor") {
65 public void setArguments(Message message) throws JMSException {
66 message.setStringProperty("moduleName", moduleName);
67 message.setStringProperty("version", version);
68 }
69 });
70 }
71
72 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
73 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
74 "listExecutionModuleDescriptors"))).getModuleDescriptors();
75 }
76
77 public void runSlcExecution(SlcExecution slcExecution) {
78 process(slcExecution);
79 }
80
81 public void process(ExecutionProcess executionProcess) {
82 if (!(executionProcess instanceof SlcExecution))
83 throw new SlcException("Unsupported process type "
84 + executionProcess.getClass());
85 sendReceive(new AgentMC("runSlcExecution",
86 (SlcExecution) executionProcess));
87 }
88
89 public boolean ping() {
90 Object response = sendReceive(new AgentMC("ping"), false);
91 if (response == null)
92 return false;
93 else {
94 ExecutionAnswer answer = (ExecutionAnswer) response;
95 return ExecutionAnswer.OK.equals(answer.getStatus());
96 }
97 }
98
99 public void kill(ExecutionProcess process) {
100 throw new UnsupportedOperationException();
101 }
102
103 protected Object sendReceive(AgentMC messageCreator) {
104 long begin = System.currentTimeMillis();
105 Object res;
106 try {
107 res = sendReceive(messageCreator, true);
108 } finally {
109 if (log.isTraceEnabled())
110 log.trace("To agent #" + agentUuid + " in "
111 + (System.currentTimeMillis() - begin) + " ms, query '"
112 + messageCreator.getQuery() + "', correlationId "
113 + messageCreator.getCorrelationId());
114 }
115 return res;
116 }
117
118 /**
119 * @param timeoutException
120 * if true throws an exception if reception timeouted, else
121 * return null
122 */
123 protected Object sendReceive(AgentMC messageCreator,
124 boolean timeoutException) {
125 String correlationId = UUID.randomUUID().toString();
126 messageCreator.setCorrelationId(correlationId);
127 send(messageCreator);
128
129 Object response = processResponse(messageCreator, timeoutException);
130
131 if (response instanceof ExecutionAnswer) {
132 ExecutionAnswer answer = (ExecutionAnswer) response;
133 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
134 throw new SlcException("Execution of '"
135 + messageCreator.getQuery() + "' failed on the agent "
136 + agentUuid + ": " + answer.getMessage()
137 + " (correlationId=" + correlationId + ")");
138 else
139 return answer;
140 } else {
141 return response;
142 }
143 }
144
145 protected void send(AgentMC messageCreator) {
146 jmsTemplate.send(requestDestination, messageCreator);
147 }
148
149 protected Object processResponse(AgentMC messageCreator,
150 boolean timeoutException) {
151 String correlationId = messageCreator.getCorrelationId();
152 String query = messageCreator.getQuery();
153 Message responseMsg = null;
154 try {
155 responseMsg = jmsTemplate.receiveSelected(responseDestination,
156 "JMSCorrelationID='" + correlationId + "'");
157 } catch (Exception e) {
158 throw new SlcException("Could not receive response from agent "
159 + agentUuid + " with correlationId " + correlationId
160 + " (query '" + query + "')", e);
161 }
162
163 if (responseMsg == null) {// timeout
164 if (timeoutException)
165 throw new SlcException("TIMEOUT: Query '" + query + "'"
166 + " with correlationId " + correlationId
167 + " sent to agent " + agentUuid + " timed out.");
168 else
169 return null;
170 }
171
172 try {
173 return fromMessage(responseMsg);
174 } catch (Exception e) {
175 throw new SlcException("Could not convert response from agent "
176 + agentUuid + " with correlationId " + correlationId
177 + " (query '" + query + "')", e);
178 }
179 }
180
181 protected Object fromMessage(Message message) throws JMSException {
182 return jmsTemplate.getMessageConverter().fromMessage(message);
183 }
184
185 protected Message toMessage(Object obj, Session session)
186 throws JMSException {
187 return jmsTemplate.getMessageConverter().toMessage(obj, session);
188 }
189
190 protected class AgentMC implements MessageCreator {
191 private final String query;
192 private Object body = null;
193 private String correlationId;
194
195 public AgentMC(String query) {
196 this.query = query;
197 }
198
199 public AgentMC(String query, Object body) {
200 this.query = query;
201 this.body = body;
202 }
203
204 public final Message createMessage(Session session) throws JMSException {
205 if (agentUuid == null)
206 throw new SlcException("Agent UUID not set");
207 if (correlationId == null)
208 throw new SlcException("JMSCorrelationID not set");
209 final Message msg;
210 if (body == null)
211 msg = session.createTextMessage();
212 else
213 msg = toMessage(body, session);
214 msg.setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID, agentUuid);
215 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
216 msg.setJMSCorrelationID(correlationId);
217 setArguments(msg);
218 if (msg instanceof TextMessage) {
219 TextMessage textMessage = (TextMessage) msg;
220 if (textMessage.getText() == null) {
221 // TODO: remove workaround when upgrading to ActiveMQ 5.3
222 // Workaround for
223 // https://issues.apache.org/activemq/browse/AMQ-2046
224 textMessage.setText("");
225 }
226 }
227 return msg;
228 }
229
230 protected void setArguments(Message message) throws JMSException {
231 }
232
233 public String getQuery() {
234 return query;
235 }
236
237 public String getCorrelationId() {
238 return correlationId;
239 }
240
241 public void setCorrelationId(String correlationId) {
242 this.correlationId = correlationId;
243 }
244
245 }
246 }