]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Flows automatically registered in JCR
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20
21 import javax.jms.Destination;
22 import javax.jms.JMSException;
23 import javax.jms.Message;
24 import javax.jms.MessageListener;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.argeo.slc.SlcException;
29 import org.argeo.slc.core.runtime.DefaultAgent;
30 import org.argeo.slc.execution.ExecutionModuleDescriptor;
31 import org.argeo.slc.msg.ExecutionAnswer;
32 import org.argeo.slc.msg.MsgConstants;
33 import org.argeo.slc.msg.ReferenceList;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgentDescriptor;
36 import org.springframework.beans.factory.DisposableBean;
37 import org.springframework.beans.factory.InitializingBean;
38 import org.springframework.jms.JmsException;
39 import org.springframework.jms.core.JmsTemplate;
40 import org.springframework.jms.core.MessagePostProcessor;
41
42 /** JMS based implementation of SLC Agent. */
43 public class JmsAgent extends DefaultAgent implements InitializingBean,
44 DisposableBean, MessageListener {
45 public final static String PROPERTY_QUERY = "query";
46 public final static String QUERY_PING_ALL = "pingAll";
47
48 private final static Log log = LogFactory.getLog(JmsAgent.class);
49
50 private JmsTemplate jmsTemplate;
51 private Destination agentRegister;
52 private Destination agentUnregister;
53
54 private Destination responseDestination;
55
56 public void afterPropertiesSet() throws Exception {
57 try {
58 jmsTemplate.convertAndSend(agentRegister, getAgentDescriptor());
59 log.info("Agent #" + getAgentUuid() + " registered to "
60 + agentRegister);
61 } catch (JmsException e) {
62 log
63 .warn("Could not register agent "
64 + getAgentDescriptor().getUuid()
65 + " to server: "
66 + e.getMessage()
67 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
68 if (log.isTraceEnabled())
69 log.debug("Original error.", e);
70 }
71 }
72
73 public void destroy() throws Exception {
74 try {
75 jmsTemplate.convertAndSend(agentUnregister, getAgentDescriptor());
76 log.info("Agent #" + getAgentUuid() + " unregistered from "
77 + agentUnregister);
78 } catch (JmsException e) {
79 log.warn("Could not unregister agent " + getAgentUuid() + ": "
80 + e.getMessage());
81 if (log.isTraceEnabled())
82 log.debug("Original error.", e);
83 }
84 }
85
86 public void setAgentRegister(Destination agentRegister) {
87 this.agentRegister = agentRegister;
88 }
89
90 public void setAgentUnregister(Destination agentUnregister) {
91 this.agentUnregister = agentUnregister;
92 }
93
94 public String getMessageSelector() {
95 String messageSelector = "slc_agentId='" + getAgentUuid() + "'";
96 // if (log.isDebugEnabled())
97 // log.debug("Message selector: " + messageSelector);
98 return messageSelector;
99 }
100
101 public void onMessage(final Message message) {
102 final String query;
103 final String correlationId;
104 try {
105 query = message.getStringProperty(PROPERTY_QUERY);
106 correlationId = message.getJMSCorrelationID();
107 } catch (JMSException e1) {
108 throw new SlcException("Cannot analyze incoming message " + message);
109 }
110
111 final Object response;
112 final Destination destinationSend;
113 if (QUERY_PING_ALL.equals(query)) {
114 ReferenceList refList = (ReferenceList) convertFrom(message);
115 if (!refList.getReferences().contains(getAgentUuid())) {
116 response = getAgentDescriptor();
117 destinationSend = agentRegister;
118 log.info("Agent #" + getAgentUuid() + " registering to "
119 + agentRegister + " in reply to a " + QUERY_PING_ALL
120 + " query");
121 } else {
122 return;
123 }
124 } else {
125 response = process(query, message);
126 destinationSend = responseDestination;
127 }
128
129 // Send response
130 if (log.isTraceEnabled())
131 log.trace("About to send response " + response.getClass());
132 jmsTemplate.convertAndSend(destinationSend, response,
133 new MessagePostProcessor() {
134 public Message postProcessMessage(Message messageToSend)
135 throws JMSException {
136 messageToSend.setStringProperty(PROPERTY_QUERY, query);
137 messageToSend.setStringProperty(
138 MsgConstants.PROPERTY_SLC_AGENT_ID,
139 getAgentUuid());
140 messageToSend.setJMSCorrelationID(correlationId);
141 return messageToSend;
142 }
143 });
144 if (log.isTraceEnabled())
145 log.debug("Sent response to query '" + query
146 + "' with correlationId " + correlationId);
147 }
148
149 /** @return response */
150 public Object process(String query, Message message) {
151 try {
152 if ("getExecutionModuleDescriptor".equals(query)) {
153 String moduleName = message.getStringProperty("moduleName");
154 String version = message.getStringProperty("version");
155 return getExecutionModuleDescriptor(moduleName, version);
156 } else if ("listExecutionModuleDescriptors".equals(query)) {
157
158 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
159 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
160 getAgentDescriptor());
161 agentDescriptorToSend.setModuleDescriptors(lst);
162 return agentDescriptorToSend;
163 } else if ("runSlcExecution".equals(query)) {
164 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
165 new Thread() {
166 public void run() {
167 runSlcExecution(slcExecution);
168 }
169 }.start();
170 return ExecutionAnswer.ok("Execution started on agent "
171 + getAgentUuid());
172 } else if ("ping".equals(query)) {
173 return ExecutionAnswer.ok("Agent " + getAgentUuid()
174 + " is alive.");
175 } else {
176 throw new SlcException("Unsupported query " + query);
177 }
178 } catch (Exception e) {
179 log.error("Processing of query " + query + " failed", e);
180 return ExecutionAnswer.error(e);
181 }
182 }
183
184 protected Object convertFrom(Message message) {
185 try {
186 return jmsTemplate.getMessageConverter().fromMessage(message);
187 } catch (JMSException e) {
188 throw new SlcException("Cannot convert message", e);
189 }
190 }
191
192 public void setResponseDestination(Destination responseDestination) {
193 this.responseDestination = responseDestination;
194 }
195
196 public void setJmsTemplate(JmsTemplate jmsTemplate) {
197 this.jmsTemplate = jmsTemplate;
198 }
199
200 }