]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
Restructure OSGi launching
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgentProxy.java
1 package org.argeo.slc.jms;
2
3 import java.util.List;
4 import java.util.UUID;
5
6 import javax.jms.Destination;
7 import javax.jms.JMSException;
8 import javax.jms.Message;
9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14 import org.argeo.slc.SlcException;
15 import org.argeo.slc.execution.ExecutionModuleDescriptor;
16 import org.argeo.slc.msg.ExecutionAnswer;
17 import org.argeo.slc.msg.MsgConstants;
18 import org.argeo.slc.process.SlcExecution;
19 import org.argeo.slc.runtime.SlcAgent;
20 import org.argeo.slc.runtime.SlcAgentDescriptor;
21 import org.springframework.jms.core.JmsTemplate;
22 import org.springframework.jms.core.MessageCreator;
23
24 public class JmsAgentProxy implements SlcAgent {
25 private final static Log log = LogFactory.getLog(JmsAgentProxy.class);
26
27 private final String agentUuid;
28 private final Destination requestDestination;
29 private final Destination responseDestination;
30 private final JmsTemplate jmsTemplate;
31
32 public JmsAgentProxy(String agentUuid, Destination requestDestination,
33 Destination responseDestination, JmsTemplate jmsTemplate) {
34 this.agentUuid = agentUuid;
35 this.requestDestination = requestDestination;
36 this.responseDestination = responseDestination;
37 this.jmsTemplate = jmsTemplate;
38 }
39
40 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
41 final String moduleName, final String version) {
42 return (ExecutionModuleDescriptor) sendReceive(new AgentMC(
43 "getExecutionModuleDescriptor") {
44 public void setArguments(Message message) throws JMSException {
45 message.setStringProperty("moduleName", moduleName);
46 message.setStringProperty("version", version);
47 }
48 });
49 }
50
51 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
52 return ((SlcAgentDescriptor) sendReceive(new AgentMC(
53 "listExecutionModuleDescriptors"))).getModuleDescriptors();
54 }
55
56 public void runSlcExecution(SlcExecution slcExecution) {
57 sendReceive(new AgentMC("runSlcExecution", slcExecution));
58 }
59
60 public boolean ping() {
61 Object response = sendReceive(new AgentMC("ping"), false);
62 if (response == null)
63 return false;
64 else {
65 ExecutionAnswer answer = (ExecutionAnswer) response;
66 return ExecutionAnswer.OK.equals(answer.getStatus());
67 }
68 }
69
70 protected Object sendReceive(AgentMC messageCreator) {
71 return sendReceive(messageCreator, true);
72 }
73
74 /**
75 * @param timeoutException
76 * if true throws an exception if reception timeouted, else
77 * return null
78 */
79 protected Object sendReceive(AgentMC messageCreator,
80 boolean timeoutException) {
81 String correlationId = UUID.randomUUID().toString();
82 messageCreator.setCorrelationId(correlationId);
83 send(messageCreator);
84
85 Object response = processResponse(messageCreator, timeoutException);
86
87 if (response instanceof ExecutionAnswer) {
88 ExecutionAnswer answer = (ExecutionAnswer) response;
89 if (ExecutionAnswer.ERROR.equals(answer.getStatus()))
90 throw new SlcException("Execution of '"
91 + messageCreator.getQuery() + "' failed on the agent "
92 + agentUuid + ": " + answer.getMessage()
93 + " (correlationId=" + correlationId + ")");
94 else
95 return answer;
96 } else {
97 return response;
98 }
99 }
100
101 protected void send(AgentMC messageCreator) {
102 jmsTemplate.send(requestDestination, messageCreator);
103 if (log.isTraceEnabled())
104 log.debug("Sent query '" + messageCreator.getQuery()
105 + "' with correlationId "
106 + messageCreator.getCorrelationId() + " to agent "
107 + agentUuid);
108 }
109
110 protected Object processResponse(AgentMC messageCreator,
111 boolean timeoutException) {
112 String correlationId = messageCreator.getCorrelationId();
113 String query = messageCreator.getQuery();
114 Message responseMsg = null;
115 try {
116 responseMsg = jmsTemplate.receiveSelected(responseDestination,
117 "JMSCorrelationID='" + correlationId + "'");
118 } catch (Exception e) {
119 throw new SlcException("Could not receive response from agent "
120 + agentUuid + " with correlationId " + correlationId
121 + " (query '" + query + "')", e);
122 }
123
124 if (responseMsg == null) {// timeout
125 if (timeoutException)
126 throw new SlcException("TIMEOUT: Query '" + query + "'"
127 + " with correlationId " + correlationId
128 + " sent to agent " + agentUuid + " timed out.");
129 else
130 return null;
131 }
132 if (log.isTraceEnabled())
133 log.debug("Received response for query '" + query
134 + "' with correlationId " + correlationId + " from agent "
135 + agentUuid);
136
137 try {
138 return fromMessage(responseMsg);
139 } catch (Exception e) {
140 throw new SlcException("Could not convert response from agent "
141 + agentUuid + " with correlationId " + correlationId
142 + " (query '" + query + "')", e);
143 }
144 }
145
146 protected Object fromMessage(Message message) throws JMSException {
147 return jmsTemplate.getMessageConverter().fromMessage(message);
148 }
149
150 protected Message toMessage(Object obj, Session session)
151 throws JMSException {
152 return jmsTemplate.getMessageConverter().toMessage(obj, session);
153 }
154
155 protected class AgentMC implements MessageCreator {
156 private final String query;
157 private Object body = null;
158 private String correlationId;
159
160 public AgentMC(String query) {
161 this.query = query;
162 }
163
164 public AgentMC(String query, Object body) {
165 this.query = query;
166 this.body = body;
167 }
168
169 public final Message createMessage(Session session) throws JMSException {
170 if (agentUuid == null)
171 throw new SlcException("Agent UUID not set");
172 if (correlationId == null)
173 throw new SlcException("JMSCorrelationID not set");
174 final Message msg;
175 if (body == null)
176 msg = session.createTextMessage();
177 else
178 msg = toMessage(body, session);
179 msg.setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID, agentUuid);
180 msg.setStringProperty(JmsAgent.PROPERTY_QUERY, query);
181 msg.setJMSCorrelationID(correlationId);
182 setArguments(msg);
183 if (msg instanceof TextMessage) {
184 TextMessage textMessage = (TextMessage) msg;
185 if (textMessage.getText() == null) {
186 // TODO: remove workaround when upgrading to ActiveMQ 5.3
187 // Workaround for
188 // https://issues.apache.org/activemq/browse/AMQ-2046
189 textMessage.setText("");
190 }
191 }
192 return msg;
193 }
194
195 protected void setArguments(Message message) throws JMSException {
196 }
197
198 public String getQuery() {
199 return query;
200 }
201
202 public String getCorrelationId() {
203 return correlationId;
204 }
205
206 public void setCorrelationId(String correlationId) {
207 this.correlationId = correlationId;
208 }
209
210 }
211 }