]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Improve SystemCall
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.UUID;
8
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageListener;
13
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.argeo.slc.SlcException;
17 import org.argeo.slc.core.runtime.AbstractAgent;
18 import org.argeo.slc.execution.ExecutionModule;
19 import org.argeo.slc.execution.ExecutionModuleDescriptor;
20 import org.argeo.slc.msg.ExecutionAnswer;
21 import org.argeo.slc.msg.MsgConstants;
22 import org.argeo.slc.msg.ReferenceList;
23 import org.argeo.slc.process.SlcExecution;
24 import org.argeo.slc.runtime.SlcAgent;
25 import org.argeo.slc.runtime.SlcAgentDescriptor;
26 import org.springframework.beans.factory.DisposableBean;
27 import org.springframework.beans.factory.InitializingBean;
28 import org.springframework.jms.JmsException;
29 import org.springframework.jms.core.JmsTemplate;
30 import org.springframework.jms.core.MessagePostProcessor;
31
32 /** JMS based implementation of SLC Agent. */
33 public class JmsAgent extends AbstractAgent implements SlcAgent,
34 InitializingBean, DisposableBean, MessageListener {
35 public final static String PROPERTY_QUERY = "query";
36 public final static String QUERY_PING_ALL = "pingAll";
37
38 private final static Log log = LogFactory.getLog(JmsAgent.class);
39
40 private final SlcAgentDescriptor agentDescriptor;
41 private JmsTemplate jmsTemplate;
42 private Destination agentRegister;
43 private Destination agentUnregister;
44
45 private Destination responseDestination;
46
47 public JmsAgent() {
48 try {
49 agentDescriptor = new SlcAgentDescriptor();
50 agentDescriptor.setUuid(UUID.randomUUID().toString());
51 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
52 } catch (UnknownHostException e) {
53 throw new SlcException("Unable to create agent descriptor.", e);
54 }
55 }
56
57 public void afterPropertiesSet() throws Exception {
58 try {
59 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
60 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
61 + agentRegister);
62 } catch (JmsException e) {
63 log
64 .warn("Could not register agent "
65 + agentDescriptor.getUuid()
66 + " to server: "
67 + e.getMessage()
68 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
69 if (log.isTraceEnabled())
70 log.debug("Original error.", e);
71 }
72 }
73
74 public void destroy() throws Exception {
75 try {
76 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
77 log.info("Agent #" + agentDescriptor.getUuid()
78 + " unregistered from " + agentUnregister);
79 } catch (JmsException e) {
80 log.warn("Could not unregister agent " + agentDescriptor.getUuid()
81 + ": " + e.getMessage());
82 if (log.isTraceEnabled())
83 log.debug("Original error.", e);
84 }
85 }
86
87 public void setAgentRegister(Destination agentRegister) {
88 this.agentRegister = agentRegister;
89 }
90
91 public void setAgentUnregister(Destination agentUnregister) {
92 this.agentUnregister = agentUnregister;
93 }
94
95 public String getMessageSelector() {
96 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
97 + "'";
98 // if (log.isDebugEnabled())
99 // log.debug("Message selector: " + messageSelector);
100 return messageSelector;
101 }
102
103 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
104 String moduleName, String version) {
105 return getModulesManager().getExecutionModuleDescriptor(moduleName,
106 version);
107 }
108
109 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
110 List<ExecutionModule> modules = getModulesManager()
111 .listExecutionModules();
112
113 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
114 for (ExecutionModule module : modules) {
115 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
116 md.setName(module.getName());
117 md.setVersion(module.getVersion());
118 descriptors.add(md);
119 }
120 return descriptors;
121 }
122
123 public boolean ping() {
124 return true;
125 }
126
127 public void onMessage(final Message message) {
128 final String query;
129 final String correlationId;
130 try {
131 query = message.getStringProperty(PROPERTY_QUERY);
132 correlationId = message.getJMSCorrelationID();
133 } catch (JMSException e1) {
134 throw new SlcException("Cannot analyze incoming message " + message);
135 }
136
137 final Object response;
138 final Destination destinationSend;
139 if (QUERY_PING_ALL.equals(query)) {
140 ReferenceList refList = (ReferenceList) convertFrom(message);
141 if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
142 response = agentDescriptor;
143 destinationSend = agentRegister;
144 log.info("Agent #" + agentDescriptor.getUuid()
145 + " registering to " + agentRegister
146 + " in reply to a " + QUERY_PING_ALL + " query");
147 } else {
148 return;
149 }
150 } else {
151 response = process(query, message);
152 destinationSend = responseDestination;
153 }
154
155 // Send response
156 jmsTemplate.convertAndSend(destinationSend, response,
157 new MessagePostProcessor() {
158 public Message postProcessMessage(Message messageToSend)
159 throws JMSException {
160 messageToSend.setStringProperty(PROPERTY_QUERY, query);
161 messageToSend.setStringProperty(MsgConstants.PROPERTY_SLC_AGENT_ID,
162 agentDescriptor.getUuid());
163 messageToSend.setJMSCorrelationID(correlationId);
164 return messageToSend;
165 }
166 });
167 if (log.isTraceEnabled())
168 log.debug("Sent response to query '" + query
169 + "' with correlationId " + correlationId);
170 }
171
172 /** @return response */
173 public Object process(String query, Message message) {
174 try {
175 if ("getExecutionModuleDescriptor".equals(query)) {
176 String moduleName = message.getStringProperty("moduleName");
177 String version = message.getStringProperty("version");
178 return getExecutionModuleDescriptor(moduleName, version);
179 } else if ("listExecutionModuleDescriptors".equals(query)) {
180
181 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
182 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
183 agentDescriptor);
184 agentDescriptorToSend.setModuleDescriptors(lst);
185 return agentDescriptorToSend;
186 } else if ("runSlcExecution".equals(query)) {
187 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
188 new Thread() {
189 public void run() {
190 runSlcExecution(slcExecution);
191 }
192 }.start();
193 return ExecutionAnswer.ok("Execution started on agent "
194 + agentDescriptor.getUuid());
195 } else if ("ping".equals(query)) {
196 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
197 + " is alive.");
198 } else {
199 throw new SlcException("Unsupported query " + query);
200 }
201 } catch (Exception e) {
202 log.error("Processing of query " + query + " failed", e);
203 return ExecutionAnswer.error(e);
204 }
205 }
206
207 protected Object convertFrom(Message message) {
208 try {
209 return jmsTemplate.getMessageConverter().fromMessage(message);
210 } catch (JMSException e) {
211 throw new SlcException("Cannot convert message", e);
212 }
213 }
214
215 public void setResponseDestination(Destination responseDestination) {
216 this.responseDestination = responseDestination;
217 }
218
219 public void setJmsTemplate(JmsTemplate jmsTemplate) {
220 this.jmsTemplate = jmsTemplate;
221 }
222
223 }