1 package org
.argeo
.slc
.jms
;
3 import java
.net
.InetAddress
;
4 import java
.net
.UnknownHostException
;
5 import java
.util
.ArrayList
;
9 import javax
.jms
.Destination
;
10 import javax
.jms
.JMSException
;
11 import javax
.jms
.Message
;
12 import javax
.jms
.MessageListener
;
13 import javax
.jms
.TextMessage
;
15 import org
.apache
.commons
.logging
.Log
;
16 import org
.apache
.commons
.logging
.LogFactory
;
17 import org
.argeo
.slc
.SlcException
;
18 import org
.argeo
.slc
.core
.runtime
.AbstractAgent
;
19 import org
.argeo
.slc
.execution
.ExecutionModule
;
20 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
21 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
22 import org
.argeo
.slc
.process
.SlcExecution
;
23 import org
.argeo
.slc
.runtime
.SlcAgent
;
24 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
25 import org
.springframework
.beans
.factory
.DisposableBean
;
26 import org
.springframework
.beans
.factory
.InitializingBean
;
27 import org
.springframework
.jms
.core
.JmsTemplate
;
28 import org
.springframework
.jms
.core
.MessagePostProcessor
;
30 /** JMS based implementation of SLC Agent. */
31 public class JmsAgent
extends AbstractAgent
implements SlcAgent
,
32 InitializingBean
, DisposableBean
, MessageListener
{
/**
 * Name of the JMS string property holding the query name. It is read from
 * incoming messages in onMessage to decide what to process.
 */
33 public final static String PROPERTY_QUERY
= "query";
/**
 * Name of the JMS string property set on outgoing responses in onMessage.
 * The same literal is used as the property name in the selector built by
 * getMessageSelector().
 */
34 public final static String PROPERTY_SLC_AGENT_ID
= "slc_agentId";
/** Commons-logging logger for this class. */
36 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
/** Descriptor of this agent; its UUID and host are filled in at construction. */
38 private final SlcAgentDescriptor agentDescriptor
;
/** Template used for all sends and for converting incoming messages. */
39 private JmsTemplate jmsTemplate
;
/** Destination the descriptor is sent to in afterPropertiesSet(). */
40 private Destination agentRegister
;
/** Destination the descriptor is sent to in destroy(). */
41 private Destination agentUnregister
;
/** Destination the responses produced by onMessage are sent to. */
43 private Destination responseDestination
;
// NOTE(review): the enclosing header and the opening of the try block are not
// visible in this excerpt. Since agentDescriptor is a final field without an
// initializer, this is presumably the constructor (or an instance initializer)
// -- confirm against the full file.
// Creates the descriptor that identifies this agent.
47 agentDescriptor
= new SlcAgentDescriptor();
// A freshly generated random UUID is used as the agent identifier.
48 agentDescriptor
.setUuid(UUID
.randomUUID().toString());
// Records the local host name; InetAddress.getLocalHost() is the call that
// can raise UnknownHostException.
49 agentDescriptor
.setHost(InetAddress
.getLocalHost().getHostName());
50 } catch (UnknownHostException e
) {
// Rethrown as SlcException, passing the original exception as the cause.
51 throw new SlcException("Unable to create agent descriptor.", e
);
/**
 * InitializingBean callback: sends the agent descriptor to the
 * agentRegister destination and logs the registration at info level.
 * Requires jmsTemplate and agentRegister to have been set beforehand.
 * NOTE(review): the tail of the log statement and the closing brace are not
 * visible in this excerpt.
 */
55 public void afterPropertiesSet() throws Exception
{
56 jmsTemplate
.convertAndSend(agentRegister
, agentDescriptor
);
57 log
.info("Agent #" + agentDescriptor
.getUuid() + " registered to "
/**
 * DisposableBean callback: sends the agent descriptor to the
 * agentUnregister destination and logs the unregistration at info level.
 * NOTE(review): the tail of the log statement and the closing brace are not
 * visible in this excerpt.
 */
61 public void destroy() throws Exception
{
62 jmsTemplate
.convertAndSend(agentUnregister
, agentDescriptor
);
63 log
.info("Agent #" + agentDescriptor
.getUuid() + " unregistered from "
/**
 * Sets the destination the agent descriptor is sent to in
 * afterPropertiesSet().
 *
 * @param agentRegister destination used for registration
 */
67 public void setAgentRegister(Destination agentRegister
) {
68 this.agentRegister
= agentRegister
;
/**
 * Sets the destination the agent descriptor is sent to in destroy().
 *
 * @param agentUnregister destination used for unregistration
 */
71 public void setAgentUnregister(Destination agentUnregister
) {
72 this.agentUnregister
= agentUnregister
;
/**
 * Builds a JMS message selector on the slc_agentId property using this
 * agent's UUID.
 * NOTE(review): the end of the concatenation is not visible in this excerpt;
 * presumably it appends the closing single quote -- confirm. The property
 * name is repeated as a literal here rather than reusing
 * PROPERTY_SLC_AGENT_ID.
 *
 * @return the selector string
 */
75 public String
getMessageSelector() {
76 String messageSelector
= "slc_agentId='" + agentDescriptor
.getUuid()
78 // if (log.isDebugEnabled())
79 // log.debug("Message selector: " + messageSelector);
80 return messageSelector
;
/**
 * Returns the descriptor of an execution module by delegating to the
 * modules manager.
 * NOTE(review): the remaining argument(s) of the delegated call are not
 * visible in this excerpt; presumably version is passed through -- confirm.
 *
 * @param moduleName name of the module to look up
 * @param version version of the module to look up
 * @return whatever the modules manager returns for that module
 */
83 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
84 String moduleName
, String version
) {
85 return getModulesManager().getExecutionModuleDescriptor(moduleName
,
/**
 * Builds one ExecutionModuleDescriptor per module known to the modules
 * manager, copying only the module's name and version.
 * NOTE(review): the end of the loop and the return statement are not visible
 * in this excerpt; presumably each descriptor is added to the list, which is
 * then returned -- confirm.
 */
89 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
90 List
<ExecutionModule
> modules
= getModulesManager()
91 .listExecutionModules();
// Result list, filled from the modules listed above.
93 List
<ExecutionModuleDescriptor
> descriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
94 for (ExecutionModule module
: modules
) {
// Fresh descriptor carrying just name and version of the module.
95 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
96 md
.setName(module
.getName());
97 md
.setVersion(module
.getVersion());
/**
 * Ping operation of the agent.
 * NOTE(review): only the signature is visible in this excerpt; the body and
 * the meaning of the returned boolean cannot be determined from here.
 */
103 public boolean ping() {
/**
 * MessageListener callback. Reads the query name and the correlation id from
 * the incoming message, has process(String, Message) compute a response, and
 * sends that response to responseDestination. A MessagePostProcessor
 * decorates the outgoing message with string properties and with the
 * correlation id of the incoming message.
 * NOTE(review): several lines of this method (the declaration of query, the
 * try header, parts of the post-processor and the closing braces) are not
 * visible in this excerpt.
 *
 * @param message the incoming JMS message
 */
107 public void onMessage(final Message message
) {
109 final String correlationId
;
// Extract the routing information carried by the message headers/properties.
111 query
= message
.getStringProperty(PROPERTY_QUERY
);
112 correlationId
= message
.getJMSCorrelationID();
// NOTE(review): the JMSException e1 is not passed to SlcException here, so
// the root cause is lost; the (String, Throwable) constructor used elsewhere
// in this class would preserve it.
113 } catch (JMSException e1
) {
114 throw new SlcException("Cannot analyze incoming message " + message
);
// process() yields the response for the visible branches, turning failures
// into an error answer.
117 final Object response
= process(query
, message
);
// Send the response, customizing the outgoing message before it is sent.
122 jmsTemplate
.convertAndSend(responseDestination
, response
,
123 new MessagePostProcessor() {
124 public Message
postProcessMessage(
125 Message messageToSend
) throws JMSException
{
130 .getStringProperty(PROPERTY_QUERY
));
131 messageToSend
.setStringProperty(
132 PROPERTY_SLC_AGENT_ID
, agentDescriptor
// Lets the requester match this response to its request.
134 messageToSend
.setJMSCorrelationID(message
135 .getJMSCorrelationID());
136 return messageToSend
;
139 if (log
.isDebugEnabled())
140 log
.debug("Sent response to query " + query
141 + " with correlationId " + correlationId
);
147 /**
 * Dispatches on the query name and computes the response object:
 * "getExecutionModuleDescriptor" reads the "moduleName" and "version" string
 * properties and returns the matching module descriptor;
 * "listExecutionModuleDescriptors" returns a new SlcAgentDescriptor carrying
 * the list of module descriptors; "runSlcExecution" converts the message
 * payload to an SlcExecution, runs it and returns an ok answer; "ping"
 * returns an ok answer; any other query raises SlcException.
 * Exceptions are logged and converted into an error answer by the catch
 * block below.
 * NOTE(review): the try header, the constructor arguments of
 * agentDescriptorToSend, the tail of the "ping" branch and the closing
 * braces are not visible in this excerpt.
 *
 * @param query name of the requested operation
 * @param message the incoming JMS message, source of properties and payload
 * @return response
 */
148 public Object
process(String query
, Message message
) {
150 if ("getExecutionModuleDescriptor".equals(query
)) {
151 String moduleName
= message
.getStringProperty("moduleName");
152 String version
= message
.getStringProperty("version");
153 return getExecutionModuleDescriptor(moduleName
, version
);
154 } else if ("listExecutionModuleDescriptors".equals(query
)) {
156 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
// A separate descriptor instance is populated and sent, not the
// agentDescriptor field itself.
157 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
159 agentDescriptorToSend
.setModuleDescriptors(lst
);
160 return agentDescriptorToSend
;
161 } else if ("runSlcExecution".equals(query
)) {
// The payload is decoded with the template's message converter.
162 SlcExecution slcExecution
= (SlcExecution
) convertFrom(message
);
163 runSlcExecution(slcExecution
);
164 return ExecutionAnswer
.ok("Execution started on agent "
165 + agentDescriptor
.getUuid());
166 } else if ("ping".equals(query
)) {
167 return ExecutionAnswer
.ok("Agent " + agentDescriptor
.getUuid()
170 throw new SlcException("Unsupported query " + query
);
// Failures are reported back to the requester instead of propagating.
172 } catch (Exception e
) {
173 log
.error("Processing of query " + query
+ " failed", e
);
174 return ExecutionAnswer
.error(e
);
/**
 * Converts an incoming JMS message to an object using the message converter
 * configured on jmsTemplate.
 *
 * @param message the message to convert
 * @return the object produced by the converter
 * @throws JMSException declared for failures while reading the message
 */
178 protected Object
convertFrom(Message message
) throws JMSException
{
179 return jmsTemplate
.getMessageConverter().fromMessage(message
);
/**
 * Sets the destination that responses built in onMessage are sent to.
 *
 * @param responseDestination destination used for responses
 */
182 public void setResponseDestination(Destination responseDestination
) {
183 this.responseDestination
= responseDestination
;
/**
 * Sets the JmsTemplate used for registration, unregistration, responses and
 * message conversion.
 *
 * @param jmsTemplate the template to use
 */
186 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
187 this.jmsTemplate
= jmsTemplate
;