1 package org
.argeo
.slc
.jms
;
3 import java
.net
.InetAddress
;
4 import java
.net
.UnknownHostException
;
5 import java
.util
.ArrayList
;
9 import javax
.jms
.Destination
;
10 import javax
.jms
.JMSException
;
11 import javax
.jms
.Message
;
12 import javax
.jms
.MessageProducer
;
13 import javax
.jms
.Session
;
14 import javax
.jms
.TextMessage
;
16 import org
.apache
.commons
.logging
.Log
;
17 import org
.apache
.commons
.logging
.LogFactory
;
18 import org
.argeo
.slc
.SlcException
;
19 import org
.argeo
.slc
.core
.runtime
.AbstractAgent
;
20 import org
.argeo
.slc
.execution
.ExecutionModule
;
21 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
22 import org
.argeo
.slc
.process
.SlcExecution
;
23 import org
.argeo
.slc
.runtime
.SlcAgent
;
24 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
25 import org
.springframework
.beans
.factory
.DisposableBean
;
26 import org
.springframework
.beans
.factory
.InitializingBean
;
27 import org
.springframework
.jms
.core
.JmsTemplate
;
28 import org
.springframework
.jms
.listener
.SessionAwareMessageListener
;
29 import org
.springframework
.jms
.support
.converter
.MessageConversionException
;
31 /** JMS based implementation of SLC Agent. */
32 public class JmsAgent
extends AbstractAgent
implements SlcAgent
,
33 InitializingBean
, DisposableBean
, SessionAwareMessageListener
{
34 public final static String PROPERTY_QUERY
= "query";
35 public final static String PROPERTY_SLC_AGENT_ID
= "slc_agentId";
37 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
39 private final SlcAgentDescriptor agentDescriptor
;
40 // private ConnectionFactory connectionFactory;
41 private JmsTemplate jmsTemplate
;
42 private Destination agentRegister
;
43 private Destination agentUnregister
;
45 // private Destination requestDestination;
46 private Destination responseDestination
;
48 // private MessageConverter messageConverter;
// NOTE(review): the opening of this block is not visible here (presumably a
// constructor or initializer that starts a try block) - confirm against the full file.
// Creates the agent descriptor, assigns it a random UUID and the local host name.
// An UnknownHostException is rethrown wrapped in an SlcException (cause preserved).
52 agentDescriptor
= new SlcAgentDescriptor();
53 agentDescriptor
.setUuid(UUID
.randomUUID().toString());
54 agentDescriptor
.setHost(InetAddress
.getLocalHost().getHostName());
55 } catch (UnknownHostException e
) {
56 throw new SlcException("Unable to create agent descriptor.", e
);
/**
 * Registers this agent: sends the agent descriptor to the agentRegister
 * destination through the JMS template, then logs the registration at info
 * level.
 * NOTE(review): the end of the log statement is not visible here.
 */
60 public void afterPropertiesSet() throws Exception
{
61 // Initialize JMS Template
62 // jmsTemplate = new JmsTemplate(connectionFactory);
63 // jmsTemplate.setMessageConverter(messageConverter);
65 jmsTemplate
.convertAndSend(agentRegister
, agentDescriptor
);
66 log
.info("Agent #" + agentDescriptor
.getUuid() + " registered to "
/**
 * Unregisters this agent: sends the agent descriptor to the agentUnregister
 * destination through the JMS template, then logs the unregistration at info
 * level.
 * NOTE(review): the end of the log statement is not visible here.
 */
70 public void destroy() throws Exception
{
71 jmsTemplate
.convertAndSend(agentUnregister
, agentDescriptor
);
72 log
.info("Agent #" + agentDescriptor
.getUuid() + " unregistered from "
76 public void setAgentRegister(Destination agentRegister
) {
77 this.agentRegister
= agentRegister
;
80 public void setAgentUnregister(Destination agentUnregister
) {
81 this.agentUnregister
= agentUnregister
;
/**
 * Builds and returns a message selector string that starts with
 * "slc_agentId='" followed by this agent's UUID.
 * NOTE(review): the end of the selector expression is not visible here
 * (presumably the closing quote) - confirm against the full file.
 */
84 public String
getMessageSelector() {
85 String messageSelector
= "slc_agentId='" + agentDescriptor
.getUuid()
87 // if (log.isDebugEnabled())
88 // log.debug("Message selector: " + messageSelector);
89 return messageSelector
;
/**
 * Handles a query message received for this agent.
 *
 * Creates a producer on the response destination, reads the query name from
 * the PROPERTY_QUERY string property and the JMS correlation id, then
 * dispatches on the query name:
 * - "getExecutionModuleDescriptor": reads the "moduleName" and "version"
 *   string properties, looks up the descriptor and converts it to a response
 *   message with the JMS template's message converter;
 * - "listExecutionModuleDescriptors": lists the module descriptors, attaches
 *   them to a new SlcAgentDescriptor and converts that to a response message;
 * - "newExecution": converts the incoming message to an SlcExecution and runs
 *   it; no response message is built in the visible code.
 * A debug message "Unsupported query" is logged for other queries
 * (NOTE(review): the branch introducing it is not visible here - presumably
 * the final else).
 *
 * When a response message was built, it is given the incoming correlation id
 * and sent through the producer.
 *
 * NOTE(review): several continuation lines of this method are not visible
 * here; verify the details against the full file.
 */
92 public void onMessage(Message message
, Session session
) throws JMSException
{
93 MessageProducer producer
= session
.createProducer(responseDestination
);
94 String query
= message
.getStringProperty(PROPERTY_QUERY
);
95 String correlationId
= message
.getJMSCorrelationID();
96 if (log
.isDebugEnabled())
97 log
.debug("Received query " + query
+ " with correlationId "
100 Message responseMsg
= null;
101 if ("getExecutionModuleDescriptor".equals(query
)) {
102 String moduleName
= message
.getStringProperty("moduleName");
103 String version
= message
.getStringProperty("version");
104 ExecutionModuleDescriptor emd
= getExecutionModuleDescriptor(
105 moduleName
, version
);
106 responseMsg
= jmsTemplate
.getMessageConverter().toMessage(emd
,
108 } else if ("listExecutionModuleDescriptors".equals(query
)) {
110 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
111 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
113 agentDescriptorToSend
.setModuleDescriptors(lst
);
114 responseMsg
= jmsTemplate
.getMessageConverter().toMessage(
115 agentDescriptorToSend
, session
);
116 } else if ("newExecution".equals(query
)) {
118 SlcExecution slcExecution
= (SlcExecution
) jmsTemplate
119 .getMessageConverter().fromMessage(message
);
120 runSlcExecution(slcExecution
);
124 // // FIXME: generalize
125 // SlcExecution slcExecution = (SlcExecution) jmsTemplate
126 // .getMessageConverter().fromMessage(message);
127 // runSlcExecution(slcExecution);
128 // } catch (MessageConversionException e) {
129 // if (log.isDebugEnabled())
130 // log.debug("Unsupported query " + query, e);
132 if (log
.isDebugEnabled())
133 log
.debug("Unsupported query " + query
);
137 if (responseMsg
!= null) {
138 responseMsg
.setJMSCorrelationID(correlationId
);
139 producer
.send(responseMsg
);
140 if (log
.isDebugEnabled())
141 log
.debug("Sent response to query " + query
142 + " with correlationId " + correlationId
+ ": "
/**
 * Returns the execution module descriptor obtained from the modules manager
 * for the given module name.
 * NOTE(review): the remaining arguments of the delegated call are not visible
 * here (presumably the version) - confirm against the full file.
 */
148 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
149 String moduleName
, String version
) {
150 return getModulesManager().getExecutionModuleDescriptor(moduleName
,
/**
 * Lists the execution modules known to the modules manager and, for each one,
 * creates an ExecutionModuleDescriptor carrying the module's name and version.
 * NOTE(review): the end of the loop and the return statement are not visible
 * here; presumably each descriptor is added to the list, which is then
 * returned - confirm against the full file.
 */
154 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
155 List
<ExecutionModule
> modules
= getModulesManager()
156 .listExecutionModules();
158 List
<ExecutionModuleDescriptor
> descriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
159 for (ExecutionModule module
: modules
) {
160 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
161 md
.setName(module
.getName());
162 md
.setVersion(module
.getVersion());
168 public void setResponseDestination(Destination responseDestination
) {
169 this.responseDestination
= responseDestination
;
172 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
173 this.jmsTemplate
= jmsTemplate
;