]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Improve agent naming
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.List;
6 import java.util.UUID;
7
8 import javax.jms.Destination;
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.MessageListener;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.argeo.slc.SlcException;
16 import org.argeo.slc.core.runtime.DefaultAgent;
17 import org.argeo.slc.execution.ExecutionModuleDescriptor;
18 import org.argeo.slc.msg.ExecutionAnswer;
19 import org.argeo.slc.msg.MsgConstants;
20 import org.argeo.slc.msg.ReferenceList;
21 import org.argeo.slc.process.SlcExecution;
22 import org.argeo.slc.runtime.SlcAgentDescriptor;
23 import org.springframework.beans.factory.DisposableBean;
24 import org.springframework.beans.factory.InitializingBean;
25 import org.springframework.jms.JmsException;
26 import org.springframework.jms.core.JmsTemplate;
27 import org.springframework.jms.core.MessagePostProcessor;
28
29 /** JMS based implementation of SLC Agent. */
30 public class JmsAgent extends DefaultAgent implements InitializingBean,
31 DisposableBean, MessageListener {
32 public final static String PROPERTY_QUERY = "query";
33 public final static String QUERY_PING_ALL = "pingAll";
34
35 private final static Log log = LogFactory.getLog(JmsAgent.class);
36
37 private final SlcAgentDescriptor agentDescriptor;
38 private JmsTemplate jmsTemplate;
39 private Destination agentRegister;
40 private Destination agentUnregister;
41
42 private Destination responseDestination;
43
44 public JmsAgent() {
45 try {
46 agentDescriptor = new SlcAgentDescriptor();
47 agentDescriptor.setUuid(UUID.randomUUID().toString());
48 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
49 } catch (UnknownHostException e) {
50 throw new SlcException("Unable to create agent descriptor.", e);
51 }
52 }
53
54 public void afterPropertiesSet() throws Exception {
55 try {
56 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
57 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
58 + agentRegister);
59 } catch (JmsException e) {
60 log
61 .warn("Could not register agent "
62 + agentDescriptor.getUuid()
63 + " to server: "
64 + e.getMessage()
65 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
66 if (log.isTraceEnabled())
67 log.debug("Original error.", e);
68 }
69 }
70
71 public void destroy() throws Exception {
72 try {
73 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
74 log.info("Agent #" + agentDescriptor.getUuid()
75 + " unregistered from " + agentUnregister);
76 } catch (JmsException e) {
77 log.warn("Could not unregister agent " + agentDescriptor.getUuid()
78 + ": " + e.getMessage());
79 if (log.isTraceEnabled())
80 log.debug("Original error.", e);
81 }
82 }
83
84 public void setAgentRegister(Destination agentRegister) {
85 this.agentRegister = agentRegister;
86 }
87
88 public void setAgentUnregister(Destination agentUnregister) {
89 this.agentUnregister = agentUnregister;
90 }
91
92 public String getMessageSelector() {
93 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
94 + "'";
95 // if (log.isDebugEnabled())
96 // log.debug("Message selector: " + messageSelector);
97 return messageSelector;
98 }
99
100 public void onMessage(final Message message) {
101 final String query;
102 final String correlationId;
103 try {
104 query = message.getStringProperty(PROPERTY_QUERY);
105 correlationId = message.getJMSCorrelationID();
106 } catch (JMSException e1) {
107 throw new SlcException("Cannot analyze incoming message " + message);
108 }
109
110 final Object response;
111 final Destination destinationSend;
112 if (QUERY_PING_ALL.equals(query)) {
113 ReferenceList refList = (ReferenceList) convertFrom(message);
114 if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
115 response = agentDescriptor;
116 destinationSend = agentRegister;
117 log.info("Agent #" + agentDescriptor.getUuid()
118 + " registering to " + agentRegister
119 + " in reply to a " + QUERY_PING_ALL + " query");
120 } else {
121 return;
122 }
123 } else {
124 response = process(query, message);
125 destinationSend = responseDestination;
126 }
127
128 // Send response
129 jmsTemplate.convertAndSend(destinationSend, response,
130 new MessagePostProcessor() {
131 public Message postProcessMessage(Message messageToSend)
132 throws JMSException {
133 messageToSend.setStringProperty(PROPERTY_QUERY, query);
134 messageToSend.setStringProperty(
135 MsgConstants.PROPERTY_SLC_AGENT_ID,
136 agentDescriptor.getUuid());
137 messageToSend.setJMSCorrelationID(correlationId);
138 return messageToSend;
139 }
140 });
141 if (log.isTraceEnabled())
142 log.debug("Sent response to query '" + query
143 + "' with correlationId " + correlationId);
144 }
145
146 /** @return response */
147 public Object process(String query, Message message) {
148 try {
149 if ("getExecutionModuleDescriptor".equals(query)) {
150 String moduleName = message.getStringProperty("moduleName");
151 String version = message.getStringProperty("version");
152 return getExecutionModuleDescriptor(moduleName, version);
153 } else if ("listExecutionModuleDescriptors".equals(query)) {
154
155 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
156 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
157 agentDescriptor);
158 agentDescriptorToSend.setModuleDescriptors(lst);
159 return agentDescriptorToSend;
160 } else if ("runSlcExecution".equals(query)) {
161 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
162 new Thread() {
163 public void run() {
164 runSlcExecution(slcExecution);
165 }
166 }.start();
167 return ExecutionAnswer.ok("Execution started on agent "
168 + agentDescriptor.getUuid());
169 } else if ("ping".equals(query)) {
170 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
171 + " is alive.");
172 } else {
173 throw new SlcException("Unsupported query " + query);
174 }
175 } catch (Exception e) {
176 log.error("Processing of query " + query + " failed", e);
177 return ExecutionAnswer.error(e);
178 }
179 }
180
181 protected Object convertFrom(Message message) {
182 try {
183 return jmsTemplate.getMessageConverter().fromMessage(message);
184 } catch (JMSException e) {
185 throw new SlcException("Cannot convert message", e);
186 }
187 }
188
189 public void setResponseDestination(Destination responseDestination) {
190 this.responseDestination = responseDestination;
191 }
192
193 public void setJmsTemplate(JmsTemplate jmsTemplate) {
194 this.jmsTemplate = jmsTemplate;
195 }
196
197 }