]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
2ab9a38f869b893df6e6994355c33fe0cf16772e
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.msg.ExecutionAnswer;
33 import org.argeo.slc.msg.MsgConstants;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgent;
36 import org.argeo.slc.runtime.SlcAgentDescriptor;
37 import org.springframework.jms.core.JmsTemplate;
38 import org.springframework.jms.core.MessageCreator;
39
40 public class JmsAgentProxy implements SlcAgent {
41 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
42
43 private final String agentUuid;
44 private final Destination requestDestination;
45 private final Destination responseDestination;
46 private final JmsTemplate jmsTemplate;
47
48 public JmsAgentProxy(String agentUuid, Destination requestDestination,
49 Destination responseDestination, JmsTemplate jmsTemplate) {
50 this.agentUuid = agentUuid;
51 this.requestDestination = requestDestination;
52 this.responseDestination = responseDestination;
53 this.jmsTemplate = jmsTemplate;
54 }
55
56 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
57 final String moduleName, final String version) {
58 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
59 "getExecutionModuleDescriptor") {
60 public void setArguments(Message message) throws JMSException {
61 message.setStringProperty("moduleName", moduleName);
62 message.setStringProperty("version", version);
63 }
64 });
65 }
66
67 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
68 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
69 "listExecutionModuleDescriptors"))).getModuleDescriptors();
70 }
71
72 public void runSlcExecution(SlcExecution slcExecution) {
73 sendReceive(new AgentMC("runSlcExecution", slcExecution));
74 }
75
76 public boolean ping() {
77 Object response = sendReceive(new AgentMC("ping"), false);
78 if (response == null)
79 return false;
80 else {
81 ExecutionAnswer answer = (ExecutionAnswer) response;
82 return ExecutionAnswer.OK.equals(answer.getStatus());
83 }
84 }
85
86 protected Object sendReceive(AgentMC messageCreator) {
87 long begin = System.currentTimeMillis();
88 Object res;
89 try {
90 res = sendReceive(messageCreator, true);
91 } finally {
92 if (log.isTraceEnabled())
93 log.trace("To agent #" + agentUuid + " in "
94 + (System.currentTimeMillis() - begin) + " ms, query '"
95 + messageCreator.getQuery() + "', correlationId "
96 + messageCreator.getCorrelationId());
97 }
98 return res;
99 }
100
101 /**
102 * @param timeoutException
103 * if true throws an exception if reception timeouted, else
104 * return null
105 */
106 protected Object sendReceive(AgentMC messageCreator,
107 boolean timeoutException) {
108 String correlationId = UUID.randomUUID().toString();
109 messageCreator.setCorrelationId(correlationId);
110 send(messageCreator);
111
112 Object response = processResponse(messageCreator, timeoutException);
113
114 if (response instanceof ExecutionAnswer) {
115 ExecutionAnswer answer = (ExecutionAnswer) response;
116 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
117 throw new SlcException("Execution of '"
118 + messageCreator.getQuery() + "' failed on the agent "
119 + agentUuid + ": " + answer.getMessage()
120 + " (correlationId=" + correlationId + ")");
121 else
122 return answer;
123 } else {
124 return response;
125 }
126 }
127
128 protected void send(AgentMC messageCreator) {
129 jmsTemplate.send(requestDestination, messageCreator);
130 }
131
132 protected Object processResponse(AgentMC messageCreator,
133 boolean timeoutException) {
134 String correlationId = messageCreator.getCorrelationId();
135 String query = messageCreator.getQuery();
136 Message responseMsg = null;
137 try {
138 responseMsg = jmsTemplate.receiveSelected(responseDestination,
139 "JMSCorrelationID='" + correlationId + "'");
140 } catch (Exception e) {
141 throw new SlcException("Could not receive response from agent "
142 + agentUuid + " with correlationId " + correlationId
143 + " (query '" + query + "')", e);
144 }
145
146 if (responseMsg == null) {// timeout
147 if (timeoutException)
148 throw new SlcException("TIMEOUT: Query '" + query + "'"
149 + " with correlationId " + correlationId
150 + " sent to agent " + agentUuid + " timed out.");
151 else
152 return null;
153 }
154
155 try {
156 return fromMessage(responseMsg);
157 } catch (Exception e) {
158 throw new SlcException("Could not convert response from agent "
159 + agentUuid + " with correlationId " + correlationId
160 + " (query '" + query + "')", e);
161 }
162 }
163
164 protected Object fromMessage(Message message) throws JMSException {
165 return jmsTemplate.getMessageConverter().fromMessage(message);
166 }
167
168 protected Message toMessage(Object obj, Session session)
169 throws JMSException {
170 return jmsTemplate.getMessageConverter().toMessage(obj, session);
171 }
172
173 protected class AgentMC implements MessageCreator {
174 private final String query;
175 private Object body = null;
176 private String correlationId;
177
178 public AgentMC(String query) {
179 this.query = query;
180 }
181
182 public AgentMC(String query, Object body) {
183 this.query = query;
184 this.body = body;
185 }
186
187 public final Message createMessage(Session session) throws JMSException {
188 if (agentUuid == null)
189 throw new SlcException("Agent UUID not set");
190 if (correlationId == null)
191 throw new SlcException("JMSCorrelationID not set");
192 final Message msg;
193 if (body == null)
194 msg = session.createTextMessage();
195 else
196 msg = toMessage(body, session);
197 msg
198 .setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID,
199 agentUuid);
200 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
201 msg.setJMSCorrelationID(correlationId);
202 setArguments(msg);
203 if (msg instanceof TextMessage) {
204 TextMessage textMessage = (TextMessage) msg;
205 if (textMessage.getText() == null) {
206 // TODO: remove workaround when upgrading to ActiveMQ 5.3
207 // Workaround for
208 // https://issues.apache.org/activemq/browse/AMQ-2046
209 textMessage.setText("");
210 }
211 }
212 return msg;
213 }
214
215 protected void setArguments(Message message) throws JMSException {
216 }
217
218 public String getQuery() {
219 return query;
220 }
221
222 public String getCorrelationId() {
223 return correlationId;
224 }
225
226 public void setCorrelationId(String correlationId) {
227 this.correlationId = correlationId;
228 }
229
230 }
231 }