]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
Start working on serialized JMS
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.msg.ExecutionAnswer;
33 import org.argeo.slc.msg.MsgConstants;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgent;
36 import org.argeo.slc.runtime.SlcAgentDescriptor;
37 import org.springframework.jms.core.JmsTemplate;
38 import org.springframework.jms.core.MessageCreator;
39
40 public class JmsAgentProxy implements SlcAgent {
41 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
42
43 private final String agentUuid;
44 private final Destination requestDestination;
45 private final Destination responseDestination;
46 private final JmsTemplate jmsTemplate;
47
48 public JmsAgentProxy(String agentUuid, Destination requestDestination,
49 Destination responseDestination, JmsTemplate jmsTemplate) {
50 this.agentUuid = agentUuid;
51 this.requestDestination = requestDestination;
52 this.responseDestination = responseDestination;
53 this.jmsTemplate = jmsTemplate;
54 }
55
56 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
57 final String moduleName, final String version) {
58 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
59 "getExecutionModuleDescriptor") {
60 public void setArguments(Message message) throws JMSException {
61 message.setStringProperty("moduleName", moduleName);
62 message.setStringProperty("version", version);
63 }
64 });
65 }
66
67 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
68 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
69 "listExecutionModuleDescriptors"))).getModuleDescriptors();
70 }
71
72 public void runSlcExecution(SlcExecution slcExecution) {
73 sendReceive(new AgentMC("runSlcExecution", slcExecution));
74 }
75
76 public boolean ping() {
77 Object response = sendReceive(new AgentMC("ping"), false);
78 if (response == null)
79 return false;
80 else {
81 ExecutionAnswer answer = (ExecutionAnswer) response;
82 return ExecutionAnswer.OK.equals(answer.getStatus());
83 }
84 }
85
86 protected Object sendReceive(AgentMC messageCreator) {
87 long begin = System.currentTimeMillis();
88 Object res;
89 try {
90 res = sendReceive(messageCreator, true);
91 } finally {
92 if (log.isTraceEnabled())
93 log.trace("Agend proxy send/receive in "
94 + (System.currentTimeMillis() - begin) + " ms");
95 }
96 return res;
97 }
98
99 /**
100 * @param timeoutException
101 * if true throws an exception if reception timeouted, else
102 * return null
103 */
104 protected Object sendReceive(AgentMC messageCreator,
105 boolean timeoutException) {
106 String correlationId = UUID.randomUUID().toString();
107 messageCreator.setCorrelationId(correlationId);
108 send(messageCreator);
109
110 Object response = processResponse(messageCreator, timeoutException);
111
112 if (response instanceof ExecutionAnswer) {
113 ExecutionAnswer answer = (ExecutionAnswer) response;
114 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
115 throw new SlcException("Execution of '"
116 + messageCreator.getQuery() + "' failed on the agent "
117 + agentUuid + ": " + answer.getMessage()
118 + " (correlationId=" + correlationId + ")");
119 else
120 return answer;
121 } else {
122 return response;
123 }
124 }
125
126 protected void send(AgentMC messageCreator) {
127 jmsTemplate.send(requestDestination, messageCreator);
128 if (log.isTraceEnabled())
129 log.debug("Sent query '" + messageCreator.getQuery()
130 + "' with correlationId "
131 + messageCreator.getCorrelationId() + " to agent "
132 + agentUuid);
133 }
134
135 protected Object processResponse(AgentMC messageCreator,
136 boolean timeoutException) {
137 String correlationId = messageCreator.getCorrelationId();
138 String query = messageCreator.getQuery();
139 Message responseMsg = null;
140 try {
141 responseMsg = jmsTemplate.receiveSelected(responseDestination,
142 "JMSCorrelationID='" + correlationId + "'");
143 } catch (Exception e) {
144 throw new SlcException("Could not receive response from agent "
145 + agentUuid + " with correlationId " + correlationId
146 + " (query '" + query + "')", e);
147 }
148
149 if (responseMsg == null) {// timeout
150 if (timeoutException)
151 throw new SlcException("TIMEOUT: Query '" + query + "'"
152 + " with correlationId " + correlationId
153 + " sent to agent " + agentUuid + " timed out.");
154 else
155 return null;
156 }
157 if (log.isTraceEnabled())
158 log.debug("Received response for query '" + query
159 + "' with correlationId " + correlationId + " from agent "
160 + agentUuid);
161
162 try {
163 return fromMessage(responseMsg);
164 } catch (Exception e) {
165 throw new SlcException("Could not convert response from agent "
166 + agentUuid + " with correlationId " + correlationId
167 + " (query '" + query + "')", e);
168 }
169 }
170
171 protected Object fromMessage(Message message) throws JMSException {
172 return jmsTemplate.getMessageConverter().fromMessage(message);
173 }
174
175 protected Message toMessage(Object obj, Session session)
176 throws JMSException {
177 return jmsTemplate.getMessageConverter().toMessage(obj, session);
178 }
179
180 protected class AgentMC implements MessageCreator {
181 private final String query;
182 private Object body = null;
183 private String correlationId;
184
185 public AgentMC(String query) {
186 this.query = query;
187 }
188
189 public AgentMC(String query, Object body) {
190 this.query = query;
191 this.body = body;
192 }
193
194 public final Message createMessage(Session session) throws JMSException {
195 if (agentUuid == null)
196 throw new SlcException("Agent UUID not set");
197 if (correlationId == null)
198 throw new SlcException("JMSCorrelationID not set");
199 final Message msg;
200 if (body == null)
201 msg = session.createTextMessage();
202 else
203 msg = toMessage(body, session);
204 msg
205 .setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID,
206 agentUuid);
207 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
208 msg.setJMSCorrelationID(correlationId);
209 setArguments(msg);
210 if (msg instanceof TextMessage) {
211 TextMessage textMessage = (TextMessage) msg;
212 if (textMessage.getText() == null) {
213 // TODO: remove workaround when upgrading to ActiveMQ 5.3
214 // Workaround for
215 // https://issues.apache.org/activemq/browse/AMQ-2046
216 textMessage.setText("");
217 }
218 }
219 return msg;
220 }
221
222 protected void setArguments(Message message) throws JMSException {
223 }
224
225 public String getQuery() {
226 return query;
227 }
228
229 public String getCorrelationId() {
230 return correlationId;
231 }
232
233 public void setCorrelationId(String correlationId) {
234 this.correlationId = correlationId;
235 }
236
237 }
238 }