1 package org
.argeo
.slc
.jms
;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.core.runtime.AbstractAgent;
import org.argeo.slc.execution.ExecutionModule;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.msg.ExecutionAnswer;
import org.argeo.slc.msg.ReferenceList;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;

31 /** JMS based implementation of SLC Agent. */
32 public class JmsAgent
extends AbstractAgent
implements SlcAgent
,
33 InitializingBean
, DisposableBean
, MessageListener
{
34 public final static String PROPERTY_QUERY
= "query";
35 public final static String PROPERTY_SLC_AGENT_ID
= "slc_agentId";
37 public final static String QUERY_PING_ALL
= "pingAll";
39 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
41 private final SlcAgentDescriptor agentDescriptor
;
42 private JmsTemplate jmsTemplate
;
43 private Destination agentRegister
;
44 private Destination agentUnregister
;
46 private Destination responseDestination
;
/**
 * Creates the agent together with its descriptor, which is given a random
 * UUID and the name of the local host.
 *
 * NOTE(review): the constructor header was reconstructed as a public no-arg
 * constructor - confirm visibility and parameters.
 *
 * @throws SlcException
 *             if the local host cannot be resolved
 */
public JmsAgent() {
    try {
        agentDescriptor = new SlcAgentDescriptor();
        agentDescriptor.setUuid(UUID.randomUUID().toString());
        agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
        throw new SlcException("Unable to create agent descriptor.", e);
    }
}
58 public void afterPropertiesSet() throws Exception
{
60 jmsTemplate
.convertAndSend(agentRegister
, agentDescriptor
);
61 log
.info("Agent #" + agentDescriptor
.getUuid() + " registered to "
63 } catch (JmsException e
) {
65 .warn("Could not register agent "
66 + agentDescriptor
.getUuid()
69 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
70 if (log
.isTraceEnabled())
71 log
.debug("Original error.", e
);
75 public void destroy() throws Exception
{
77 jmsTemplate
.convertAndSend(agentUnregister
, agentDescriptor
);
78 log
.info("Agent #" + agentDescriptor
.getUuid()
79 + " unregistered from " + agentUnregister
);
80 } catch (JmsException e
) {
81 log
.warn("Could not unregister agent " + agentDescriptor
.getUuid()
82 + ": " + e
.getMessage());
83 if (log
.isTraceEnabled())
84 log
.debug("Original error.", e
);
88 public void setAgentRegister(Destination agentRegister
) {
89 this.agentRegister
= agentRegister
;
92 public void setAgentUnregister(Destination agentUnregister
) {
93 this.agentUnregister
= agentUnregister
;
96 public String
getMessageSelector() {
97 String messageSelector
= "slc_agentId='" + agentDescriptor
.getUuid()
99 // if (log.isDebugEnabled())
100 // log.debug("Message selector: " + messageSelector);
101 return messageSelector
;
104 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
105 String moduleName
, String version
) {
106 return getModulesManager().getExecutionModuleDescriptor(moduleName
,
110 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
111 List
<ExecutionModule
> modules
= getModulesManager()
112 .listExecutionModules();
114 List
<ExecutionModuleDescriptor
> descriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
115 for (ExecutionModule module
: modules
) {
116 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
117 md
.setName(module
.getName());
118 md
.setVersion(module
.getVersion());
124 public boolean ping() {
128 public void onMessage(final Message message
) {
130 final String correlationId
;
132 query
= message
.getStringProperty(PROPERTY_QUERY
);
133 correlationId
= message
.getJMSCorrelationID();
134 } catch (JMSException e1
) {
135 throw new SlcException("Cannot analyze incoming message " + message
);
138 final Object response
;
139 final Destination destinationSend
;
140 if (QUERY_PING_ALL
.equals(query
)) {
141 ReferenceList refList
= (ReferenceList
) convertFrom(message
);
142 if (!refList
.getReferences().contains(agentDescriptor
.getUuid())) {
143 response
= agentDescriptor
;
144 destinationSend
= agentRegister
;
145 log
.info("Agent #" + agentDescriptor
.getUuid()
146 + " registering to " + agentRegister
147 + " in reply to a " + QUERY_PING_ALL
+ " query");
152 response
= process(query
, message
);
153 destinationSend
= responseDestination
;
157 jmsTemplate
.convertAndSend(destinationSend
, response
,
158 new MessagePostProcessor() {
159 public Message
postProcessMessage(Message messageToSend
)
160 throws JMSException
{
161 messageToSend
.setStringProperty(PROPERTY_QUERY
, query
);
162 messageToSend
.setStringProperty(PROPERTY_SLC_AGENT_ID
,
163 agentDescriptor
.getUuid());
164 messageToSend
.setJMSCorrelationID(correlationId
);
165 return messageToSend
;
168 if (log
.isTraceEnabled())
169 log
.debug("Sent response to query '" + query
170 + "' with correlationId " + correlationId
);
173 /**
 * Computes the answer to a query, dispatching on the query name.
 * <ul>
 * <li>"getExecutionModuleDescriptor": reads the "moduleName" and "version"
 * string properties of the message and returns the matching module
 * descriptor.</li>
 * <li>"listExecutionModuleDescriptors": returns a new agent descriptor on
 * which the list of module descriptors has been set.</li>
 * <li>"runSlcExecution": converts the message to an SlcExecution, passes it
 * to runSlcExecution and returns an OK answer.</li>
 * <li>"ping": returns an OK answer.</li>
 * <li>any other query: an SlcException ("Unsupported query ...") is
 * raised.</li>
 * </ul>
 * The catch block logs any Exception as an error and returns
 * ExecutionAnswer.error(e) instead of propagating it.
 *
 * NOTE(review): presumably the try block wraps the whole dispatch, so that
 * unsupported queries also end up as error answers - confirm.
 *
 * @param query
 *            name of the query to process
 * @param message
 *            incoming message, used for its string properties or payload
 * @return response
 */
174 public Object
process(String query
, Message message
) {
// Dispatch on the query name.
176 if ("getExecutionModuleDescriptor".equals(query
)) {
177 String moduleName
= message
.getStringProperty("moduleName");
178 String version
= message
.getStringProperty("version");
179 return getExecutionModuleDescriptor(moduleName
, version
);
180 } else if ("listExecutionModuleDescriptors".equals(query
)) {
182 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
183 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
185 agentDescriptorToSend
.setModuleDescriptors(lst
);
186 return agentDescriptorToSend
;
187 } else if ("runSlcExecution".equals(query
)) {
188 final SlcExecution slcExecution
= (SlcExecution
) convertFrom(message
);
191 runSlcExecution(slcExecution
);
194 return ExecutionAnswer
.ok("Execution started on agent "
195 + agentDescriptor
.getUuid());
196 } else if ("ping".equals(query
)) {
197 return ExecutionAnswer
.ok("Agent " + agentDescriptor
.getUuid()
200 throw new SlcException("Unsupported query " + query
);
// Failures are reported to the caller as an error answer.
202 } catch (Exception e
) {
203 log
.error("Processing of query " + query
+ " failed", e
);
204 return ExecutionAnswer
.error(e
);
208 protected Object
convertFrom(Message message
) {
210 return jmsTemplate
.getMessageConverter().fromMessage(message
);
211 } catch (JMSException e
) {
212 throw new SlcException("Cannot convert message", e
);
216 public void setResponseDestination(Destination responseDestination
) {
217 this.responseDestination
= responseDestination
;
220 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
221 this.jmsTemplate
= jmsTemplate
;