git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
Add license headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20 import java.util.UUID;
21
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.Session;
26 import javax.jms.TextMessage;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.argeo.slc.SlcException;
31 import org.argeo.slc.execution.ExecutionModuleDescriptor;
32 import org.argeo.slc.msg.ExecutionAnswer;
33 import org.argeo.slc.msg.MsgConstants;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgent;
36 import org.argeo.slc.runtime.SlcAgentDescriptor;
37 import org.springframework.jms.core.JmsTemplate;
38 import org.springframework.jms.core.MessageCreator;
39
40 public class JmsAgentProxy implements SlcAgent {
41 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
42
43 private final String agentUuid;
44 private final Destination requestDestination;
45 private final Destination responseDestination;
46 private final JmsTemplate jmsTemplate;
47
48 public JmsAgentProxy(String agentUuid, Destination requestDestination,
49 Destination responseDestination, JmsTemplate jmsTemplate) {
50 this.agentUuid = agentUuid;
51 this.requestDestination = requestDestination;
52 this.responseDestination = responseDestination;
53 this.jmsTemplate = jmsTemplate;
54 }
55
56 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
57 final String moduleName, final String version) {
58 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
59 "getExecutionModuleDescriptor") {
60 public void setArguments(Message message) throws JMSException {
61 message.setStringProperty("moduleName", moduleName);
62 message.setStringProperty("version", version);
63 }
64 });
65 }
66
67 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
68 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
69 "listExecutionModuleDescriptors"))).getModuleDescriptors();
70 }
71
72 public void runSlcExecution(SlcExecution slcExecution) {
73 sendReceive(new AgentMC("runSlcExecution", slcExecution));
74 }
75
76 public boolean ping() {
77 Object response = sendReceive(new AgentMC("ping"), false);
78 if (response == null)
79 return false;
80 else {
81 ExecutionAnswer answer = (ExecutionAnswer) response;
82 return ExecutionAnswer.OK.equals(answer.getStatus());
83 }
84 }
85
86 protected Object sendReceive(AgentMC messageCreator) {
87 return sendReceive(messageCreator, true);
88 }
89
90 /**
91 * @param timeoutException
92 * if true throws an exception if reception timeouted, else
93 * return null
94 */
95 protected Object sendReceive(AgentMC messageCreator,
96 boolean timeoutException) {
97 String correlationId = UUID.randomUUID().toString();
98 messageCreator.setCorrelationId(correlationId);
99 send(messageCreator);
100
101 Object response = processResponse(messageCreator, timeoutException);
102
103 if (response instanceof ExecutionAnswer) {
104 ExecutionAnswer answer = (ExecutionAnswer) response;
105 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
106 throw new SlcException("Execution of '"
107 + messageCreator.getQuery() + "' failed on the agent "
108 + agentUuid + ": " + answer.getMessage()
109 + " (correlationId=" + correlationId + ")");
110 else
111 return answer;
112 } else {
113 return response;
114 }
115 }
116
117 protected void send(AgentMC messageCreator) {
118 jmsTemplate.send(requestDestination, messageCreator);
119 if (log.isTraceEnabled())
120 log.debug("Sent query '" + messageCreator.getQuery()
121 + "' with correlationId "
122 + messageCreator.getCorrelationId() + " to agent "
123 + agentUuid);
124 }
125
126 protected Object processResponse(AgentMC messageCreator,
127 boolean timeoutException) {
128 String correlationId = messageCreator.getCorrelationId();
129 String query = messageCreator.getQuery();
130 Message responseMsg = null;
131 try {
132 responseMsg = jmsTemplate.receiveSelected(responseDestination,
133 "JMSCorrelationID='" + correlationId + "'");
134 } catch (Exception e) {
135 throw new SlcException("Could not receive response from agent "
136 + agentUuid + " with correlationId " + correlationId
137 + " (query '" + query + "')", e);
138 }
139
140 if (responseMsg == null) {// timeout
141 if (timeoutException)
142 throw new SlcException("TIMEOUT: Query '" + query + "'"
143 + " with correlationId " + correlationId
144 + " sent to agent " + agentUuid + " timed out.");
145 else
146 return null;
147 }
148 if (log.isTraceEnabled())
149 log.debug("Received response for query '" + query
150 + "' with correlationId " + correlationId + " from agent "
151 + agentUuid);
152
153 try {
154 return fromMessage(responseMsg);
155 } catch (Exception e) {
156 throw new SlcException("Could not convert response from agent "
157 + agentUuid + " with correlationId " + correlationId
158 + " (query '" + query + "')", e);
159 }
160 }
161
162 protected Object fromMessage(Message message) throws JMSException {
163 return jmsTemplate.getMessageConverter().fromMessage(message);
164 }
165
166 protected Message toMessage(Object obj, Session session)
167 throws JMSException {
168 return jmsTemplate.getMessageConverter().toMessage(obj, session);
169 }
170
171 protected class AgentMC implements MessageCreator {
172 private final String query;
173 private Object body = null;
174 private String correlationId;
175
176 public AgentMC(String query) {
177 this.query = query;
178 }
179
180 public AgentMC(String query, Object body) {
181 this.query = query;
182 this.body = body;
183 }
184
185 public final Message createMessage(Session session) throws JMSException {
186 if (agentUuid == null)
187 throw new SlcException("Agent UUID not set");
188 if (correlationId == null)
189 throw new SlcException("JMSCorrelationID not set");
190 final Message msg;
191 if (body == null)
192 msg = session.createTextMessage();
193 else
194 msg = toMessage(body, session);
195 msg.setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID, agentUuid);
196 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
197 msg.setJMSCorrelationID(correlationId);
198 setArguments(msg);
199 if (msg instanceof TextMessage) {
200 TextMessage textMessage = (TextMessage) msg;
201 if (textMessage.getText() == null) {
202 // TODO: remove workaround when upgrading to ActiveMQ 5.3
203 // Workaround for
204 // https://issues.apache.org/activemq/browse/AMQ-2046
205 textMessage.setText("");
206 }
207 }
208 return msg;
209 }
210
211 protected void setArguments(Message message) throws JMSException {
212 }
213
214 public String getQuery() {
215 return query;
216 }
217
218 public String getCorrelationId() {
219 return correlationId;
220 }
221
222 public void setCorrelationId(String correlationId) {
223 this.correlationId = correlationId;
224 }
225
226 }
227 }