2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.jms
;
19 import java
.util
.List
;
21 import javax
.jms
.Destination
;
22 import javax
.jms
.JMSException
;
23 import javax
.jms
.Message
;
24 import javax
.jms
.MessageListener
;
26 import org
.apache
.commons
.logging
.Log
;
27 import org
.apache
.commons
.logging
.LogFactory
;
28 import org
.argeo
.slc
.SlcException
;
29 import org
.argeo
.slc
.core
.runtime
.DefaultAgent
;
30 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
31 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
32 import org
.argeo
.slc
.msg
.MsgConstants
;
33 import org
.argeo
.slc
.msg
.ReferenceList
;
34 import org
.argeo
.slc
.process
.SlcExecution
;
35 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
36 import org
.springframework
.jms
.JmsException
;
37 import org
.springframework
.jms
.core
.JmsTemplate
;
38 import org
.springframework
.jms
.core
.MessagePostProcessor
;
40 /** JMS based implementation of an SLC Agent. */
41 public class JmsAgent
extends DefaultAgent
implements MessageListener
{
42 public final static String PROPERTY_QUERY
= "query";
43 public final static String QUERY_PING_ALL
= "pingAll";
45 private final static Log log
= LogFactory
.getLog(JmsAgent
.class);
47 private JmsTemplate jmsTemplate
;
48 private Destination agentRegister
;
49 private Destination agentUnregister
;
51 private Destination responseDestination
;
// NOTE(review): the header of the enclosing method and the opening "try" are
// not visible in this view; the comments below describe the visible statements only.
// Send this agent's descriptor to the agentRegister destination, then log it.
56 jmsTemplate
.convertAndSend(agentRegister
, getAgentDescriptor());
57 log
.info("Agent #" + getAgentUuid() + " registered to "
59 } catch (JmsException e
) {
// A JmsException during registration is reported as a warning; per the message
// below the agent stays offline but keeps listening for a "ping all" from the server.
60 log
.warn("Could not register agent "
61 + getAgentDescriptor().getUuid()
64 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
// NOTE(review): the guard tests the TRACE level while the call logs at DEBUG,
// so the stack trace is only emitted when TRACE is enabled — confirm this is intended.
65 if (log
.isTraceEnabled())
66 log
.debug("Original error.", e
);
/**
 * Sends this agent's descriptor to the agentUnregister destination and logs
 * the result. The visible JmsException handler only logs a warning.
 */
70 public void dispose() {
// Send the descriptor to the unregister destination, then log it.
72 jmsTemplate
.convertAndSend(agentUnregister
, getAgentDescriptor());
73 log
.info("Agent #" + getAgentUuid() + " unregistered from "
75 } catch (JmsException e
) {
76 log
.warn("Could not unregister agent " + getAgentUuid() + ": "
// NOTE(review): the guard tests the TRACE level while the call logs at DEBUG,
// so the stack trace is only emitted when TRACE is enabled — confirm this is intended.
78 if (log
.isTraceEnabled())
79 log
.debug("Original error.", e
);
84 public void setAgentRegister(Destination agentRegister
) {
85 this.agentRegister
= agentRegister
;
88 public void setAgentUnregister(Destination agentUnregister
) {
89 this.agentUnregister
= agentUnregister
;
92 public String
getMessageSelector() {
93 String messageSelector
= "slc_agentId='" + getAgentUuid() + "'";
94 // if (log.isDebugEnabled())
95 // log.debug("Message selector: " + messageSelector);
96 return messageSelector
;
/**
 * Handles an incoming JMS message: reads its query property and correlation
 * id, builds a response object, and sends that response through the
 * jmsTemplate. The outgoing message is stamped with the same query, an agent
 * id property and the same correlation id.
 *
 * @param message the incoming JMS message
 */
99 public void onMessage(final Message message
) {
101 final String correlationId
;
// Extract the query name and the correlation id from the incoming message.
103 query
= message
.getStringProperty(PROPERTY_QUERY
);
104 correlationId
= message
.getJMSCorrelationID();
105 } catch (JMSException e1
) {
// NOTE(review): e1 is not passed to the SlcException as its cause, so the
// original JMSException is lost; convertFrom in this class does pass the cause.
106 throw new SlcException("Cannot analyze incoming message " + message
);
109 final Object response
;
110 final Destination destinationSend
;
// "Ping all": the message body is converted to a ReferenceList.
111 if (QUERY_PING_ALL
.equals(query
)) {
112 ReferenceList refList
= (ReferenceList
) convertFrom(message
);
// If this agent's UUID is not among the references, answer with the agent
// descriptor on the agentRegister destination.
// NOTE(review): the branch taken when the UUID is already listed is not visible here.
113 if (!refList
.getReferences().contains(getAgentUuid())) {
114 response
= getAgentDescriptor();
115 destinationSend
= agentRegister
;
116 log
.info("Agent #" + getAgentUuid() + " registering to "
117 + agentRegister
+ " in reply to a " + QUERY_PING_ALL
// Presumably the branch for every other query (the "else" is not visible here):
// delegate to process(query, message) and answer on responseDestination.
123 response
= process(query
, message
);
124 destinationSend
= responseDestination
;
128 if (log
.isTraceEnabled())
129 log
.trace("About to send response " + response
.getClass());
// Send the response; the post-processor copies the query, an agent id property
// and the correlation id onto the outgoing message.
130 jmsTemplate
.convertAndSend(destinationSend
, response
,
131 new MessagePostProcessor() {
132 public Message
postProcessMessage(Message messageToSend
)
133 throws JMSException
{
134 messageToSend
.setStringProperty(PROPERTY_QUERY
, query
);
135 messageToSend
.setStringProperty(
136 MsgConstants
.PROPERTY_SLC_AGENT_ID
,
138 messageToSend
.setJMSCorrelationID(correlationId
);
139 return messageToSend
;
// NOTE(review): the guard tests the TRACE level while the call logs at DEBUG — confirm this is intended.
142 if (log
.isTraceEnabled())
143 log
.debug("Sent response to query '" + query
144 + "' with correlationId " + correlationId
);
147 /**
 * Dispatches a query by name and returns the object to send back.
 * Visible queries: "getExecutionModuleDescriptor",
 * "listExecutionModuleDescriptors", "runSlcExecution" and "ping"; any other
 * query reaches the "Unsupported query" SlcException.
 * The catch clause logs the failure and returns ExecutionAnswer.error(e).
 * NOTE(review): the opening "try" is not visible here; presumably it wraps
 * the whole dispatch — confirm.
 *
 * @param query the query name read from the incoming message
 * @param message the incoming JMS message
 * @return response
 */
148 public Object
process(String query
, Message message
) {
// Look up one module descriptor from the "moduleName" and "version" string properties.
150 if ("getExecutionModuleDescriptor".equals(query
)) {
151 String moduleName
= message
.getStringProperty("moduleName");
152 String version
= message
.getStringProperty("version");
153 return getExecutionModuleDescriptor(moduleName
, version
);
154 } else if ("listExecutionModuleDescriptors".equals(query
)) {
// Build a new SlcAgentDescriptor from the agent descriptor and attach the module list to it.
156 List
<ExecutionModuleDescriptor
> lst
= listExecutionModuleDescriptors();
157 SlcAgentDescriptor agentDescriptorToSend
= new SlcAgentDescriptor(
158 getAgentDescriptor());
159 agentDescriptorToSend
.setModuleDescriptors(lst
);
160 return agentDescriptorToSend
;
161 } else if ("runSlcExecution".equals(query
)) {
// The message body is converted to an SlcExecution and handed to process(slcExecution).
162 final SlcExecution slcExecution
= (SlcExecution
) convertFrom(message
);
165 process(slcExecution
);
168 return ExecutionAnswer
.ok("Execution started on agent "
170 } else if ("ping".equals(query
)) {
171 return ExecutionAnswer
.ok("Agent " + getAgentUuid()
174 throw new SlcException("Unsupported query " + query
);
176 } catch (Exception e
) {
// Failures are logged and reported to the caller as an error answer.
177 log
.error("Processing of query " + query
+ " failed", e
);
178 return ExecutionAnswer
.error(e
);
182 protected Object
convertFrom(Message message
) {
184 return jmsTemplate
.getMessageConverter().fromMessage(message
);
185 } catch (JMSException e
) {
186 throw new SlcException("Cannot convert message", e
);
190 public void setResponseDestination(Destination responseDestination
) {
191 this.responseDestination
= responseDestination
;
194 public void setJmsTemplate(JmsTemplate jmsTemplate
) {
195 this.jmsTemplate
= jmsTemplate
;