1 package org
.argeo
.slc
.jms
;
3 import java
.net
.InetAddress
;
4 import java
.net
.UnknownHostException
;
5 import java
.util
.ArrayList
;
9 import javax
.jms
.Destination
;
10 import javax
.jms
.JMSException
;
11 import javax
.jms
.Message
;
12 import javax
.jms
.MessageListener
;
14 import org
.apache
.commons
.logging
.Log
;
15 import org
.apache
.commons
.logging
.LogFactory
;
16 import org
.argeo
.slc
.SlcException
;
17 import org
.argeo
.slc
.core
.runtime
.AbstractAgent
;
18 import org
.argeo
.slc
.execution
.ExecutionModule
;
19 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
20 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
21 import org
.argeo
.slc
.msg
.MsgConstants
;
22 import org
.argeo
.slc
.msg
.ReferenceList
;
23 import org
.argeo
.slc
.process
.SlcExecution
;
24 import org
.argeo
.slc
.runtime
.SlcAgent
;
25 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
26 import org
.springframework
.beans
.factory
.DisposableBean
;
27 import org
.springframework
.beans
.factory
.InitializingBean
;
28 import org
.springframework
.jms
.JmsException
;
29 import org
.springframework
.jms
.core
.JmsTemplate
;
30 import org
.springframework
.jms
.core
.MessagePostProcessor
;
32 /** JMS based implementation of SLC Agent. */
33 public class JmsAgent
extends AbstractAgent
implements SlcAgent
,
34 InitializingBean
, DisposableBean
, MessageListener
{
35 public final static String PROPERTY_QUERY
= "query";
36 public final static String QUERY_PING_ALL
= "pingAll";
38 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
40 private final SlcAgentDescriptor agentDescriptor
;
41 private JmsTemplate jmsTemplate
;
42 private Destination agentRegister
;
43 private Destination agentUnregister
;
45 private Destination responseDestination
;
49 agentDescriptor
= new SlcAgentDescriptor();
50 agentDescriptor
.setUuid(UUID
.randomUUID().toString());
51 agentDescriptor
.setHost(InetAddress
.getLocalHost().getHostName());
52 } catch (UnknownHostException e
) {
53 throw new SlcException("Unable to create agent descriptor.", e
);
57 public void afterPropertiesSet() throws Exception
{
59 jmsTemplate
.convertAndSend(agentRegister
, agentDescriptor
);
60 log
.info("Agent #" + agentDescriptor
.getUuid() + " registered to "
62 } catch (JmsException e
) {
64 .warn("Could not register agent "
65 + agentDescriptor
.getUuid()
68 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
69 if (log
.isTraceEnabled())
70 log
.debug("Original error.", e
);
74 public void destroy() throws Exception
{
76 jmsTemplate
.convertAndSend(agentUnregister
, agentDescriptor
);
77 log
.info("Agent #" + agentDescriptor
.getUuid()
78 + " unregistered from " + agentUnregister
);
79 } catch (JmsException e
) {
80 log
.warn("Could not unregister agent " + agentDescriptor
.getUuid()
81 + ": " + e
.getMessage());
82 if (log
.isTraceEnabled())
83 log
.debug("Original error.", e
);
87 public void setAgentRegister(Destination agentRegister
) {
88 this.agentRegister
= agentRegister
;
91 public void setAgentUnregister(Destination agentUnregister
) {
92 this.agentUnregister
= agentUnregister
;
95 public String
getMessageSelector() {
96 String messageSelector
= "slc_agentId='" + agentDescriptor
.getUuid()
98 // if (log.isDebugEnabled())
99 // log.debug("Message selector: " + messageSelector);
100 return messageSelector
;
103 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
104 String moduleName
, String version
) {
105 return getModulesManager().getExecutionModuleDescriptor(moduleName
,
109 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
110 List
<ExecutionModule
> modules
= getModulesManager()
111 .listExecutionModules();
113 List
<ExecutionModuleDescriptor
> descriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
114 for (ExecutionModule module
: modules
) {
115 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
116 md
.setName(module
.getName());
117 md
.setVersion(module
.getVersion());
123 public boolean ping() {
127 public void onMessage(final Message message
) {
129 final String correlationId
;
131 query
= message
.getStringProperty(PROPERTY_QUERY
);
132 correlationId
= message
.getJMSCorrelationID();
133 } catch (JMSException e1
) {
134 throw new SlcException("Cannot analyze incoming message " + message
);
137 final Object response
;
138 final Destination destinationSend
;
139 if (QUERY_PING_ALL
.equals(query
)) {
140 ReferenceList refList
= (ReferenceList
) convertFrom(message
);
141 if (!refList
.getReferences().contains(agentDescriptor
.getUuid())) {
142 response
= agentDescriptor
;
143 destinationSend
= agentRegister
;
144 log
.info("Agent #" + agentDescriptor
.getUuid()
145 + " registering to " + agentRegister
146 + " in reply to a " + QUERY_PING_ALL
+ " query");
151 response
= process(query
, message
);
152 destinationSend
= responseDestination
;
156 jmsTemplate
.convertAndSend(destinationSend
, response
,
157 new MessagePostProcessor() {
158 public Message
postProcessMessage(Message messageToSend
)
159 throws JMSException
{
160 messageToSend
.setStringProperty(PROPERTY_QUERY
, query
);
161 messageToSend
.setStringProperty(MsgConstants
.PROPERTY_SLC_AGENT_ID
,
162 agentDescriptor
.getUuid());
163 messageToSend
.setJMSCorrelationID(correlationId
);
164 return messageToSend
;
167 if (log
.isTraceEnabled())
168 log
.debug("Sent response to query '" + query
169 + "' with correlationId " + correlationId
);
172 /** @return response */
173 public Object
process(String query
, Message message
) {
175 if ("getExecutionModuleDescriptor".equals(query
)) {
176 String moduleName
= message
.getStringProperty("moduleName");
177 String version
= message
.getStringProperty("version");
178 return getExecutionModuleDescriptor(moduleName
, version
);
179 } else if ("listExecutionModuleDescriptors".equals(query
)) {
181 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
182 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
184 agentDescriptorToSend
.setModuleDescriptors(lst
);
185 return agentDescriptorToSend
;
186 } else if ("runSlcExecution".equals(query
)) {
187 final SlcExecution slcExecution
= (SlcExecution
) convertFrom(message
);
190 runSlcExecution(slcExecution
);
193 return ExecutionAnswer
.ok("Execution started on agent "
194 + agentDescriptor
.getUuid());
195 } else if ("ping".equals(query
)) {
196 return ExecutionAnswer
.ok("Agent " + agentDescriptor
.getUuid()
199 throw new SlcException("Unsupported query " + query
);
201 } catch (Exception e
) {
202 log
.error("Processing of query " + query
+ " failed", e
);
203 return ExecutionAnswer
.error(e
);
207 protected Object
convertFrom(Message message
) {
209 return jmsTemplate
.getMessageConverter().fromMessage(message
);
210 } catch (JMSException e
) {
211 throw new SlcException("Cannot convert message", e
);
215 public void setResponseDestination(Destination responseDestination
) {
216 this.responseDestination
= responseDestination
;
219 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
220 this.jmsTemplate
= jmsTemplate
;