2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.jms
;
19 import java
.net
.InetAddress
;
20 import java
.net
.UnknownHostException
;
21 import java
.util
.List
;
22 import java
.util
.UUID
;
24 import javax
.jms
.Destination
;
25 import javax
.jms
.JMSException
;
26 import javax
.jms
.Message
;
27 import javax
.jms
.MessageListener
;
29 import org
.apache
.commons
.logging
.Log
;
30 import org
.apache
.commons
.logging
.LogFactory
;
31 import org
.argeo
.slc
.SlcException
;
32 import org
.argeo
.slc
.core
.runtime
.DefaultAgent
;
33 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
34 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
35 import org
.argeo
.slc
.msg
.MsgConstants
;
36 import org
.argeo
.slc
.msg
.ReferenceList
;
37 import org
.argeo
.slc
.process
.SlcExecution
;
38 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
39 import org
.springframework
.beans
.factory
.DisposableBean
;
40 import org
.springframework
.beans
.factory
.InitializingBean
;
41 import org
.springframework
.jms
.JmsException
;
42 import org
.springframework
.jms
.core
.JmsTemplate
;
43 import org
.springframework
.jms
.core
.MessagePostProcessor
;
45 /** JMS based implementation of SLC Agent. */
46 public class JmsAgent
extends DefaultAgent
implements InitializingBean
,
47 DisposableBean
, MessageListener
{
48 public final static String PROPERTY_QUERY
= "query";
49 public final static String QUERY_PING_ALL
= "pingAll";
51 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
53 private final SlcAgentDescriptor agentDescriptor
;
54 private JmsTemplate jmsTemplate
;
55 private Destination agentRegister
;
56 private Destination agentUnregister
;
58 private Destination responseDestination
;
62 agentDescriptor
= new SlcAgentDescriptor();
63 agentDescriptor
.setUuid(UUID
.randomUUID().toString());
64 agentDescriptor
.setHost(InetAddress
.getLocalHost().getHostName());
65 } catch (UnknownHostException e
) {
66 throw new SlcException("Unable to create agent descriptor.", e
);
70 public void afterPropertiesSet() throws Exception
{
72 jmsTemplate
.convertAndSend(agentRegister
, agentDescriptor
);
73 log
.info("Agent #" + agentDescriptor
.getUuid() + " registered to "
75 } catch (JmsException e
) {
77 .warn("Could not register agent "
78 + agentDescriptor
.getUuid()
81 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
82 if (log
.isTraceEnabled())
83 log
.debug("Original error.", e
);
87 public void destroy() throws Exception
{
89 jmsTemplate
.convertAndSend(agentUnregister
, agentDescriptor
);
90 log
.info("Agent #" + agentDescriptor
.getUuid()
91 + " unregistered from " + agentUnregister
);
92 } catch (JmsException e
) {
93 log
.warn("Could not unregister agent " + agentDescriptor
.getUuid()
94 + ": " + e
.getMessage());
95 if (log
.isTraceEnabled())
96 log
.debug("Original error.", e
);
100 public void setAgentRegister(Destination agentRegister
) {
101 this.agentRegister
= agentRegister
;
104 public void setAgentUnregister(Destination agentUnregister
) {
105 this.agentUnregister
= agentUnregister
;
108 public String
getMessageSelector() {
109 String messageSelector
= "slc_agentId='" + agentDescriptor
.getUuid()
111 // if (log.isDebugEnabled())
112 // log.debug("Message selector: " + messageSelector);
113 return messageSelector
;
116 public void onMessage(final Message message
) {
118 final String correlationId
;
120 query
= message
.getStringProperty(PROPERTY_QUERY
);
121 correlationId
= message
.getJMSCorrelationID();
122 } catch (JMSException e1
) {
123 throw new SlcException("Cannot analyze incoming message " + message
);
126 final Object response
;
127 final Destination destinationSend
;
128 if (QUERY_PING_ALL
.equals(query
)) {
129 ReferenceList refList
= (ReferenceList
) convertFrom(message
);
130 if (!refList
.getReferences().contains(agentDescriptor
.getUuid())) {
131 response
= agentDescriptor
;
132 destinationSend
= agentRegister
;
133 log
.info("Agent #" + agentDescriptor
.getUuid()
134 + " registering to " + agentRegister
135 + " in reply to a " + QUERY_PING_ALL
+ " query");
140 response
= process(query
, message
);
141 destinationSend
= responseDestination
;
145 jmsTemplate
.convertAndSend(destinationSend
, response
,
146 new MessagePostProcessor() {
147 public Message
postProcessMessage(Message messageToSend
)
148 throws JMSException
{
149 messageToSend
.setStringProperty(PROPERTY_QUERY
, query
);
150 messageToSend
.setStringProperty(
151 MsgConstants
.PROPERTY_SLC_AGENT_ID
,
152 agentDescriptor
.getUuid());
153 messageToSend
.setJMSCorrelationID(correlationId
);
154 return messageToSend
;
157 if (log
.isTraceEnabled())
158 log
.debug("Sent response to query '" + query
159 + "' with correlationId " + correlationId
);
162 /** @return response */
163 public Object
process(String query
, Message message
) {
165 if ("getExecutionModuleDescriptor".equals(query
)) {
166 String moduleName
= message
.getStringProperty("moduleName");
167 String version
= message
.getStringProperty("version");
168 return getExecutionModuleDescriptor(moduleName
, version
);
169 } else if ("listExecutionModuleDescriptors".equals(query
)) {
171 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
172 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
174 agentDescriptorToSend
.setModuleDescriptors(lst
);
175 return agentDescriptorToSend
;
176 } else if ("runSlcExecution".equals(query
)) {
177 final SlcExecution slcExecution
= (SlcExecution
) convertFrom(message
);
180 runSlcExecution(slcExecution
);
183 return ExecutionAnswer
.ok("Execution started on agent "
184 + agentDescriptor
.getUuid());
185 } else if ("ping".equals(query
)) {
186 return ExecutionAnswer
.ok("Agent " + agentDescriptor
.getUuid()
189 throw new SlcException("Unsupported query " + query
);
191 } catch (Exception e
) {
192 log
.error("Processing of query " + query
+ " failed", e
);
193 return ExecutionAnswer
.error(e
);
197 protected Object
convertFrom(Message message
) {
199 return jmsTemplate
.getMessageConverter().fromMessage(message
);
200 } catch (JMSException e
) {
201 throw new SlcException("Cannot convert message", e
);
205 public void setResponseDestination(Destination responseDestination
) {
206 this.responseDestination
= responseDestination
;
209 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
210 this.jmsTemplate
= jmsTemplate
;