]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
47e218301ff08b1c7eae374398a277c140c0eb10
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.List;
6 import java.util.UUID;
7
8 import javax.jms.Destination;
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.MessageListener;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.argeo.slc.SlcException;
16 import org.argeo.slc.core.runtime.AbstractAgent;
17 import org.argeo.slc.execution.ExecutionModuleDescriptor;
18 import org.argeo.slc.msg.ExecutionAnswer;
19 import org.argeo.slc.msg.MsgConstants;
20 import org.argeo.slc.msg.ReferenceList;
21 import org.argeo.slc.process.SlcExecution;
22 import org.argeo.slc.runtime.SlcAgent;
23 import org.argeo.slc.runtime.SlcAgentDescriptor;
24 import org.springframework.beans.factory.DisposableBean;
25 import org.springframework.beans.factory.InitializingBean;
26 import org.springframework.jms.JmsException;
27 import org.springframework.jms.core.JmsTemplate;
28 import org.springframework.jms.core.MessagePostProcessor;
29
30 /** JMS based implementation of SLC Agent. */
31 public class JmsAgent extends AbstractAgent implements SlcAgent,
32 InitializingBean, DisposableBean, MessageListener {
33 public final static String PROPERTY_QUERY = "query";
34 public final static String QUERY_PING_ALL = "pingAll";
35
36 private final static Log log = LogFactory.getLog(JmsAgent.class);
37
38 private final SlcAgentDescriptor agentDescriptor;
39 private JmsTemplate jmsTemplate;
40 private Destination agentRegister;
41 private Destination agentUnregister;
42
43 private Destination responseDestination;
44
45 public JmsAgent() {
46 try {
47 agentDescriptor = new SlcAgentDescriptor();
48 agentDescriptor.setUuid(UUID.randomUUID().toString());
49 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
50 } catch (UnknownHostException e) {
51 throw new SlcException("Unable to create agent descriptor.", e);
52 }
53 }
54
55 public void afterPropertiesSet() throws Exception {
56 try {
57 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
58 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
59 + agentRegister);
60 } catch (JmsException e) {
61 log
62 .warn("Could not register agent "
63 + agentDescriptor.getUuid()
64 + " to server: "
65 + e.getMessage()
66 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
67 if (log.isTraceEnabled())
68 log.debug("Original error.", e);
69 }
70 }
71
72 public void destroy() throws Exception {
73 try {
74 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
75 log.info("Agent #" + agentDescriptor.getUuid()
76 + " unregistered from " + agentUnregister);
77 } catch (JmsException e) {
78 log.warn("Could not unregister agent " + agentDescriptor.getUuid()
79 + ": " + e.getMessage());
80 if (log.isTraceEnabled())
81 log.debug("Original error.", e);
82 }
83 }
84
85 public void setAgentRegister(Destination agentRegister) {
86 this.agentRegister = agentRegister;
87 }
88
89 public void setAgentUnregister(Destination agentUnregister) {
90 this.agentUnregister = agentUnregister;
91 }
92
93 public String getMessageSelector() {
94 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
95 + "'";
96 // if (log.isDebugEnabled())
97 // log.debug("Message selector: " + messageSelector);
98 return messageSelector;
99 }
100
101 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
102 String moduleName, String version) {
103 return getModulesManager().getExecutionModuleDescriptor(moduleName,
104 version);
105 }
106
107 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
108 return getModulesManager().listExecutionModules();
109 }
110
111 public boolean ping() {
112 return true;
113 }
114
115 public void onMessage(final Message message) {
116 final String query;
117 final String correlationId;
118 try {
119 query = message.getStringProperty(PROPERTY_QUERY);
120 correlationId = message.getJMSCorrelationID();
121 } catch (JMSException e1) {
122 throw new SlcException("Cannot analyze incoming message " + message);
123 }
124
125 final Object response;
126 final Destination destinationSend;
127 if (QUERY_PING_ALL.equals(query)) {
128 ReferenceList refList = (ReferenceList) convertFrom(message);
129 if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
130 response = agentDescriptor;
131 destinationSend = agentRegister;
132 log.info("Agent #" + agentDescriptor.getUuid()
133 + " registering to " + agentRegister
134 + " in reply to a " + QUERY_PING_ALL + " query");
135 } else {
136 return;
137 }
138 } else {
139 response = process(query, message);
140 destinationSend = responseDestination;
141 }
142
143 // Send response
144 jmsTemplate.convertAndSend(destinationSend, response,
145 new MessagePostProcessor() {
146 public Message postProcessMessage(Message messageToSend)
147 throws JMSException {
148 messageToSend.setStringProperty(PROPERTY_QUERY, query);
149 messageToSend.setStringProperty(
150 MsgConstants.PROPERTY_SLC_AGENT_ID,
151 agentDescriptor.getUuid());
152 messageToSend.setJMSCorrelationID(correlationId);
153 return messageToSend;
154 }
155 });
156 if (log.isTraceEnabled())
157 log.debug("Sent response to query '" + query
158 + "' with correlationId " + correlationId);
159 }
160
161 /** @return response */
162 public Object process(String query, Message message) {
163 try {
164 if ("getExecutionModuleDescriptor".equals(query)) {
165 String moduleName = message.getStringProperty("moduleName");
166 String version = message.getStringProperty("version");
167 return getExecutionModuleDescriptor(moduleName, version);
168 } else if ("listExecutionModuleDescriptors".equals(query)) {
169
170 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
171 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
172 agentDescriptor);
173 agentDescriptorToSend.setModuleDescriptors(lst);
174 return agentDescriptorToSend;
175 } else if ("runSlcExecution".equals(query)) {
176 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
177 new Thread() {
178 public void run() {
179 runSlcExecution(slcExecution);
180 }
181 }.start();
182 return ExecutionAnswer.ok("Execution started on agent "
183 + agentDescriptor.getUuid());
184 } else if ("ping".equals(query)) {
185 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
186 + " is alive.");
187 } else {
188 throw new SlcException("Unsupported query " + query);
189 }
190 } catch (Exception e) {
191 log.error("Processing of query " + query + " failed", e);
192 return ExecutionAnswer.error(e);
193 }
194 }
195
196 protected Object convertFrom(Message message) {
197 try {
198 return jmsTemplate.getMessageConverter().fromMessage(message);
199 } catch (JMSException e) {
200 throw new SlcException("Cannot convert message", e);
201 }
202 }
203
204 public void setResponseDestination(Destination responseDestination) {
205 this.responseDestination = responseDestination;
206 }
207
208 public void setJmsTemplate(JmsTemplate jmsTemplate) {
209 this.jmsTemplate = jmsTemplate;
210 }
211
212 }