]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
6882ebaa45b3e16bb7be69d8cf05fac52b285dfc
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.UUID;
8
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageListener;
13 import javax.jms.TextMessage;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17 import org.argeo.slc.SlcException;
18 import org.argeo.slc.core.runtime.AbstractAgent;
19 import org.argeo.slc.execution.ExecutionModule;
20 import org.argeo.slc.execution.ExecutionModuleDescriptor;
21 import org.argeo.slc.msg.ExecutionAnswer;
22 import org.argeo.slc.process.SlcExecution;
23 import org.argeo.slc.runtime.SlcAgent;
24 import org.argeo.slc.runtime.SlcAgentDescriptor;
25 import org.springframework.beans.factory.DisposableBean;
26 import org.springframework.beans.factory.InitializingBean;
27 import org.springframework.jms.core.JmsTemplate;
28 import org.springframework.jms.core.MessagePostProcessor;
29
30 /** JMS based implementation of SLC Agent. */
31 public class JmsAgent extends AbstractAgent implements SlcAgent,
32 InitializingBean, DisposableBean, MessageListener {
33 public final static String PROPERTY_QUERY = "query";
34 public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
35
36 private final static Log log = LogFactory.getLog(JmsAgent.class);
37
38 private final SlcAgentDescriptor agentDescriptor;
39 private JmsTemplate jmsTemplate;
40 private Destination agentRegister;
41 private Destination agentUnregister;
42
43 private Destination responseDestination;
44
45 public JmsAgent() {
46 try {
47 agentDescriptor = new SlcAgentDescriptor();
48 agentDescriptor.setUuid(UUID.randomUUID().toString());
49 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
50 } catch (UnknownHostException e) {
51 throw new SlcException("Unable to create agent descriptor.", e);
52 }
53 }
54
55 public void afterPropertiesSet() throws Exception {
56 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
57 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
58 + agentRegister);
59 }
60
61 public void destroy() throws Exception {
62 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
63 log.info("Agent #" + agentDescriptor.getUuid() + " unregistered from "
64 + agentUnregister);
65 }
66
67 public void setAgentRegister(Destination agentRegister) {
68 this.agentRegister = agentRegister;
69 }
70
71 public void setAgentUnregister(Destination agentUnregister) {
72 this.agentUnregister = agentUnregister;
73 }
74
75 public String getMessageSelector() {
76 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
77 + "'";
78 // if (log.isDebugEnabled())
79 // log.debug("Message selector: " + messageSelector);
80 return messageSelector;
81 }
82
83 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
84 String moduleName, String version) {
85 return getModulesManager().getExecutionModuleDescriptor(moduleName,
86 version);
87 }
88
89 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
90 List<ExecutionModule> modules = getModulesManager()
91 .listExecutionModules();
92
93 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
94 for (ExecutionModule module : modules) {
95 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
96 md.setName(module.getName());
97 md.setVersion(module.getVersion());
98 descriptors.add(md);
99 }
100 return descriptors;
101 }
102
103 public boolean ping() {
104 return true;
105 }
106
107 public void onMessage(final Message message) {
108 final String query;
109 final String correlationId;
110 try {
111 query = message.getStringProperty(PROPERTY_QUERY);
112 correlationId = message.getJMSCorrelationID();
113 } catch (JMSException e1) {
114 throw new SlcException("Cannot analyze incoming message " + message);
115 }
116
117 final Object response = process(query, message);
118
119 new Thread() {
120 public void run() {
121 // Send response
122 jmsTemplate.convertAndSend(responseDestination, response,
123 new MessagePostProcessor() {
124 public Message postProcessMessage(
125 Message messageToSend) throws JMSException {
126 messageToSend
127 .setStringProperty(
128 PROPERTY_QUERY,
129 message
130 .getStringProperty(PROPERTY_QUERY));
131 messageToSend.setStringProperty(
132 PROPERTY_SLC_AGENT_ID, agentDescriptor
133 .getUuid());
134 messageToSend.setJMSCorrelationID(message
135 .getJMSCorrelationID());
136 return messageToSend;
137 }
138 });
139 if (log.isDebugEnabled())
140 log.debug("Sent response to query " + query
141 + " with correlationId " + correlationId);
142 }
143 }.start();
144
145 }
146
147 /** @return response */
148 public Object process(String query, Message message) {
149 try {
150 if ("getExecutionModuleDescriptor".equals(query)) {
151 String moduleName = message.getStringProperty("moduleName");
152 String version = message.getStringProperty("version");
153 return getExecutionModuleDescriptor(moduleName, version);
154 } else if ("listExecutionModuleDescriptors".equals(query)) {
155
156 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
157 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
158 agentDescriptor);
159 agentDescriptorToSend.setModuleDescriptors(lst);
160 return agentDescriptorToSend;
161 } else if ("runSlcExecution".equals(query)) {
162 SlcExecution slcExecution = (SlcExecution) convertFrom(message);
163 runSlcExecution(slcExecution);
164 return ExecutionAnswer.ok("Execution started on agent "
165 + agentDescriptor.getUuid());
166 } else if ("ping".equals(query)) {
167 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
168 + " is alive.");
169 } else {
170 throw new SlcException("Unsupported query " + query);
171 }
172 } catch (Exception e) {
173 log.error("Processing of query " + query + " failed", e);
174 return ExecutionAnswer.error(e);
175 }
176 }
177
178 protected Object convertFrom(Message message) throws JMSException {
179 return jmsTemplate.getMessageConverter().fromMessage(message);
180 }
181
182 public void setResponseDestination(Destination responseDestination) {
183 this.responseDestination = responseDestination;
184 }
185
186 public void setJmsTemplate(JmsTemplate jmsTemplate) {
187 this.jmsTemplate = jmsTemplate;
188 }
189
190 }