git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Make JMS Agents more robust
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.UUID;
8
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageListener;
13
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.argeo.slc.SlcException;
17 import org.argeo.slc.core.runtime.AbstractAgent;
18 import org.argeo.slc.execution.ExecutionModule;
19 import org.argeo.slc.execution.ExecutionModuleDescriptor;
20 import org.argeo.slc.msg.ExecutionAnswer;
21 import org.argeo.slc.msg.ReferenceList;
22 import org.argeo.slc.process.SlcExecution;
23 import org.argeo.slc.runtime.SlcAgent;
24 import org.argeo.slc.runtime.SlcAgentDescriptor;
25 import org.springframework.beans.factory.DisposableBean;
26 import org.springframework.beans.factory.InitializingBean;
27 import org.springframework.jms.JmsException;
28 import org.springframework.jms.core.JmsTemplate;
29 import org.springframework.jms.core.MessagePostProcessor;
30
31 /** JMS based implementation of SLC Agent. */
32 public class JmsAgent extends AbstractAgent implements SlcAgent,
33 InitializingBean, DisposableBean, MessageListener {
34 public final static String PROPERTY_QUERY = "query";
35 public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
36
37 public final static String QUERY_PING_ALL = "pingAll";
38
39 private final static Log log = LogFactory.getLog(JmsAgent.class);
40
41 private final SlcAgentDescriptor agentDescriptor;
42 private JmsTemplate jmsTemplate;
43 private Destination agentRegister;
44 private Destination agentUnregister;
45
46 private Destination responseDestination;
47
48 public JmsAgent() {
49 try {
50 agentDescriptor = new SlcAgentDescriptor();
51 agentDescriptor.setUuid(UUID.randomUUID().toString());
52 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
53 } catch (UnknownHostException e) {
54 throw new SlcException("Unable to create agent descriptor.", e);
55 }
56 }
57
58 public void afterPropertiesSet() throws Exception {
59 try {
60 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
61 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
62 + agentRegister);
63 } catch (JmsException e) {
64 log
65 .warn("Could not register agent "
66 + agentDescriptor.getUuid()
67 + " to server: "
68 + e.getMessage()
69 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
70 if (log.isTraceEnabled())
71 log.debug("Original error.", e);
72 }
73 }
74
75 public void destroy() throws Exception {
76 try {
77 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
78 log.info("Agent #" + agentDescriptor.getUuid()
79 + " unregistered from " + agentUnregister);
80 } catch (JmsException e) {
81 log.warn("Could not unregister agent " + agentDescriptor.getUuid()
82 + ": " + e.getMessage());
83 if (log.isTraceEnabled())
84 log.debug("Original error.", e);
85 }
86 }
87
88 public void setAgentRegister(Destination agentRegister) {
89 this.agentRegister = agentRegister;
90 }
91
92 public void setAgentUnregister(Destination agentUnregister) {
93 this.agentUnregister = agentUnregister;
94 }
95
96 public String getMessageSelector() {
97 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
98 + "'";
99 // if (log.isDebugEnabled())
100 // log.debug("Message selector: " + messageSelector);
101 return messageSelector;
102 }
103
104 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
105 String moduleName, String version) {
106 return getModulesManager().getExecutionModuleDescriptor(moduleName,
107 version);
108 }
109
110 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
111 List<ExecutionModule> modules = getModulesManager()
112 .listExecutionModules();
113
114 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
115 for (ExecutionModule module : modules) {
116 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
117 md.setName(module.getName());
118 md.setVersion(module.getVersion());
119 descriptors.add(md);
120 }
121 return descriptors;
122 }
123
124 public boolean ping() {
125 return true;
126 }
127
128 public void onMessage(final Message message) {
129 final String query;
130 final String correlationId;
131 try {
132 query = message.getStringProperty(PROPERTY_QUERY);
133 correlationId = message.getJMSCorrelationID();
134 } catch (JMSException e1) {
135 throw new SlcException("Cannot analyze incoming message " + message);
136 }
137
138 final Object response;
139 final Destination destinationSend;
140 if (QUERY_PING_ALL.equals(query)) {
141 ReferenceList refList = (ReferenceList) convertFrom(message);
142 if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
143 response = agentDescriptor;
144 destinationSend = agentRegister;
145 log.info("Agent #" + agentDescriptor.getUuid()
146 + " registering to " + agentRegister
147 + " in reply to a " + QUERY_PING_ALL + " query");
148 } else {
149 return;
150 }
151 } else {
152 response = process(query, message);
153 destinationSend = responseDestination;
154 }
155
156 // Send response
157 jmsTemplate.convertAndSend(destinationSend, response,
158 new MessagePostProcessor() {
159 public Message postProcessMessage(Message messageToSend)
160 throws JMSException {
161 messageToSend.setStringProperty(PROPERTY_QUERY, query);
162 messageToSend.setStringProperty(PROPERTY_SLC_AGENT_ID,
163 agentDescriptor.getUuid());
164 messageToSend.setJMSCorrelationID(correlationId);
165 return messageToSend;
166 }
167 });
168 if (log.isTraceEnabled())
169 log.debug("Sent response to query '" + query
170 + "' with correlationId " + correlationId);
171 }
172
173 /** @return response */
174 public Object process(String query, Message message) {
175 try {
176 if ("getExecutionModuleDescriptor".equals(query)) {
177 String moduleName = message.getStringProperty("moduleName");
178 String version = message.getStringProperty("version");
179 return getExecutionModuleDescriptor(moduleName, version);
180 } else if ("listExecutionModuleDescriptors".equals(query)) {
181
182 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
183 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
184 agentDescriptor);
185 agentDescriptorToSend.setModuleDescriptors(lst);
186 return agentDescriptorToSend;
187 } else if ("runSlcExecution".equals(query)) {
188 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
189 new Thread() {
190 public void run() {
191 runSlcExecution(slcExecution);
192 }
193 }.start();
194 return ExecutionAnswer.ok("Execution started on agent "
195 + agentDescriptor.getUuid());
196 } else if ("ping".equals(query)) {
197 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
198 + " is alive.");
199 } else {
200 throw new SlcException("Unsupported query " + query);
201 }
202 } catch (Exception e) {
203 log.error("Processing of query " + query + " failed", e);
204 return ExecutionAnswer.error(e);
205 }
206 }
207
208 protected Object convertFrom(Message message) {
209 try {
210 return jmsTemplate.getMessageConverter().fromMessage(message);
211 } catch (JMSException e) {
212 throw new SlcException("Cannot convert message", e);
213 }
214 }
215
216 public void setResponseDestination(Destination responseDestination) {
217 this.responseDestination = responseDestination;
218 }
219
220 public void setJmsTemplate(JmsTemplate jmsTemplate) {
221 this.jmsTemplate = jmsTemplate;
222 }
223
224 }