]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
Introduce SLC RCP
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.msg.ExecutionAnswer;
33 import org.argeo.slc.msg.MsgConstants;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgent;
36 import org.argeo.slc.runtime.SlcAgentDescriptor;
37 import org.springframework.jms.core.JmsTemplate;
38 import org.springframework.jms.core.MessageCreator;
39
40 public class JmsAgentProxy implements SlcAgent {
41 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
42
43 private final String agentUuid;
44 private final Destination requestDestination;
45 private final Destination responseDestination;
46 private final JmsTemplate jmsTemplate;
47
48 public JmsAgentProxy(String agentUuid, Destination requestDestination,
49 Destination responseDestination, JmsTemplate jmsTemplate) {
50 this.agentUuid = agentUuid;
51 this.requestDestination = requestDestination;
52 this.responseDestination = responseDestination;
53 this.jmsTemplate = jmsTemplate;
54 }
55
56 public String getAgentUuid() {
57 return agentUuid;
58 }
59
60 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
61 final String moduleName, final String version) {
62 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
63 "getExecutionModuleDescriptor") {
64 public void setArguments(Message message) throws JMSException {
65 message.setStringProperty("moduleName", moduleName);
66 message.setStringProperty("version", version);
67 }
68 });
69 }
70
71 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
72 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
73 "listExecutionModuleDescriptors"))).getModuleDescriptors();
74 }
75
76 public void runSlcExecution(SlcExecution slcExecution) {
77 sendReceive(new AgentMC("runSlcExecution", slcExecution));
78 }
79
80 public boolean ping() {
81 Object response = sendReceive(new AgentMC("ping"), false);
82 if (response == null)
83 return false;
84 else {
85 ExecutionAnswer answer = (ExecutionAnswer) response;
86 return ExecutionAnswer.OK.equals(answer.getStatus());
87 }
88 }
89
90 protected Object sendReceive(AgentMC messageCreator) {
91 long begin = System.currentTimeMillis();
92 Object res;
93 try {
94 res = sendReceive(messageCreator, true);
95 } finally {
96 if (log.isTraceEnabled())
97 log.trace("To agent #" + agentUuid + " in "
98 + (System.currentTimeMillis() - begin) + " ms, query '"
99 + messageCreator.getQuery() + "', correlationId "
100 + messageCreator.getCorrelationId());
101 }
102 return res;
103 }
104
105 /**
106 * @param timeoutException
107 * if true throws an exception if reception timeouted, else
108 * return null
109 */
110 protected Object sendReceive(AgentMC messageCreator,
111 boolean timeoutException) {
112 String correlationId = UUID.randomUUID().toString();
113 messageCreator.setCorrelationId(correlationId);
114 send(messageCreator);
115
116 Object response = processResponse(messageCreator, timeoutException);
117
118 if (response instanceof ExecutionAnswer) {
119 ExecutionAnswer answer = (ExecutionAnswer) response;
120 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
121 throw new SlcException("Execution of '"
122 + messageCreator.getQuery() + "' failed on the agent "
123 + agentUuid + ": " + answer.getMessage()
124 + " (correlationId=" + correlationId + ")");
125 else
126 return answer;
127 } else {
128 return response;
129 }
130 }
131
132 protected void send(AgentMC messageCreator) {
133 jmsTemplate.send(requestDestination, messageCreator);
134 }
135
136 protected Object processResponse(AgentMC messageCreator,
137 boolean timeoutException) {
138 String correlationId = messageCreator.getCorrelationId();
139 String query = messageCreator.getQuery();
140 Message responseMsg = null;
141 try {
142 responseMsg = jmsTemplate.receiveSelected(responseDestination,
143 "JMSCorrelationID='" + correlationId + "'");
144 } catch (Exception e) {
145 throw new SlcException("Could not receive response from agent "
146 + agentUuid + " with correlationId " + correlationId
147 + " (query '" + query + "')", e);
148 }
149
150 if (responseMsg == null) {// timeout
151 if (timeoutException)
152 throw new SlcException("TIMEOUT: Query '" + query + "'"
153 + " with correlationId " + correlationId
154 + " sent to agent " + agentUuid + " timed out.");
155 else
156 return null;
157 }
158
159 try {
160 return fromMessage(responseMsg);
161 } catch (Exception e) {
162 throw new SlcException("Could not convert response from agent "
163 + agentUuid + " with correlationId " + correlationId
164 + " (query '" + query + "')", e);
165 }
166 }
167
168 protected Object fromMessage(Message message) throws JMSException {
169 return jmsTemplate.getMessageConverter().fromMessage(message);
170 }
171
172 protected Message toMessage(Object obj, Session session)
173 throws JMSException {
174 return jmsTemplate.getMessageConverter().toMessage(obj, session);
175 }
176
177 protected class AgentMC implements MessageCreator {
178 private final String query;
179 private Object body = null;
180 private String correlationId;
181
182 public AgentMC(String query) {
183 this.query = query;
184 }
185
186 public AgentMC(String query, Object body) {
187 this.query = query;
188 this.body = body;
189 }
190
191 public final Message createMessage(Session session) throws JMSException {
192 if (agentUuid == null)
193 throw new SlcException("Agent UUID not set");
194 if (correlationId == null)
195 throw new SlcException("JMSCorrelationID not set");
196 final Message msg;
197 if (body == null)
198 msg = session.createTextMessage();
199 else
200 msg = toMessage(body, session);
201 msg
202 .setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID,
203 agentUuid);
204 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
205 msg.setJMSCorrelationID(correlationId);
206 setArguments(msg);
207 if (msg instanceof TextMessage) {
208 TextMessage textMessage = (TextMessage) msg;
209 if (textMessage.getText() == null) {
210 // TODO: remove workaround when upgrading to ActiveMQ 5.3
211 // Workaround for
212 // https://issues.apache.org/activemq/browse/AMQ-2046
213 textMessage.setText("");
214 }
215 }
216 return msg;
217 }
218
219 protected void setArguments(Message message) throws JMSException {
220 }
221
222 public String getQuery() {
223 return query;
224 }
225
226 public String getCorrelationId() {
227 return correlationId;
228 }
229
230 public void setCorrelationId(String correlationId) {
231 this.correlationId = correlationId;
232 }
233
234 }
235 }