]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Start working on serialized JMS
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.util.List;
22 import java.util.UUID;
23
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageListener;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.argeo.slc.SlcException;
32 import org.argeo.slc.core.runtime.DefaultAgent;
33 import org.argeo.slc.execution.ExecutionModuleDescriptor;
34 import org.argeo.slc.msg.ExecutionAnswer;
35 import org.argeo.slc.msg.MsgConstants;
36 import org.argeo.slc.msg.ReferenceList;
37 import org.argeo.slc.process.SlcExecution;
38 import org.argeo.slc.runtime.SlcAgentDescriptor;
39 import org.springframework.beans.factory.DisposableBean;
40 import org.springframework.beans.factory.InitializingBean;
41 import org.springframework.jms.JmsException;
42 import org.springframework.jms.core.JmsTemplate;
43 import org.springframework.jms.core.MessagePostProcessor;
44
45 /** JMS based implementation of SLC Agent. */
46 public class JmsAgent extends DefaultAgent implements InitializingBean,
47 DisposableBean, MessageListener {
48 public final static String PROPERTY_QUERY = "query";
49 public final static String QUERY_PING_ALL = "pingAll";
50
51 private final static Log log = LogFactory.getLog(JmsAgent.class);
52
53 private final SlcAgentDescriptor agentDescriptor;
54 private JmsTemplate jmsTemplate;
55 private Destination agentRegister;
56 private Destination agentUnregister;
57
58 private Destination responseDestination;
59
60 public JmsAgent() {
61 try {
62 agentDescriptor = new SlcAgentDescriptor();
63 agentDescriptor.setUuid(UUID.randomUUID().toString());
64 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
65 } catch (UnknownHostException e) {
66 throw new SlcException("Unable to create agent descriptor.", e);
67 }
68 }
69
70 public void afterPropertiesSet() throws Exception {
71 try {
72 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
73 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
74 + agentRegister);
75 } catch (JmsException e) {
76 log
77 .warn("Could not register agent "
78 + agentDescriptor.getUuid()
79 + " to server: "
80 + e.getMessage()
81 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
82 if (log.isTraceEnabled())
83 log.debug("Original error.", e);
84 }
85 }
86
87 public void destroy() throws Exception {
88 try {
89 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
90 log.info("Agent #" + agentDescriptor.getUuid()
91 + " unregistered from " + agentUnregister);
92 } catch (JmsException e) {
93 log.warn("Could not unregister agent " + agentDescriptor.getUuid()
94 + ": " + e.getMessage());
95 if (log.isTraceEnabled())
96 log.debug("Original error.", e);
97 }
98 }
99
100 public void setAgentRegister(Destination agentRegister) {
101 this.agentRegister = agentRegister;
102 }
103
104 public void setAgentUnregister(Destination agentUnregister) {
105 this.agentUnregister = agentUnregister;
106 }
107
108 public String getMessageSelector() {
109 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
110 + "'";
111 // if (log.isDebugEnabled())
112 // log.debug("Message selector: " + messageSelector);
113 return messageSelector;
114 }
115
116 public void onMessage(final Message message) {
117 final String query;
118 final String correlationId;
119 try {
120 query = message.getStringProperty(PROPERTY_QUERY);
121 correlationId = message.getJMSCorrelationID();
122 } catch (JMSException e1) {
123 throw new SlcException("Cannot analyze incoming message " + message);
124 }
125
126 final Object response;
127 final Destination destinationSend;
128 if (QUERY_PING_ALL.equals(query)) {
129 ReferenceList refList = (ReferenceList) convertFrom(message);
130 if (!refList.getReferences().contains(agentDescriptor.getUuid())) {
131 response = agentDescriptor;
132 destinationSend = agentRegister;
133 log.info("Agent #" + agentDescriptor.getUuid()
134 + " registering to " + agentRegister
135 + " in reply to a " + QUERY_PING_ALL + " query");
136 } else {
137 return;
138 }
139 } else {
140 response = process(query, message);
141 destinationSend = responseDestination;
142 }
143
144 // Send response
145 if (log.isTraceEnabled())
146 log.trace("About to send response " + response.getClass());
147 jmsTemplate.convertAndSend(destinationSend, response,
148 new MessagePostProcessor() {
149 public Message postProcessMessage(Message messageToSend)
150 throws JMSException {
151 messageToSend.setStringProperty(PROPERTY_QUERY, query);
152 messageToSend.setStringProperty(
153 MsgConstants.PROPERTY_SLC_AGENT_ID,
154 agentDescriptor.getUuid());
155 messageToSend.setJMSCorrelationID(correlationId);
156 return messageToSend;
157 }
158 });
159 if (log.isTraceEnabled())
160 log.debug("Sent response to query '" + query
161 + "' with correlationId " + correlationId);
162 }
163
164 /** @return response */
165 public Object process(String query, Message message) {
166 try {
167 if ("getExecutionModuleDescriptor".equals(query)) {
168 String moduleName = message.getStringProperty("moduleName");
169 String version = message.getStringProperty("version");
170 return getExecutionModuleDescriptor(moduleName, version);
171 } else if ("listExecutionModuleDescriptors".equals(query)) {
172
173 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
174 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
175 agentDescriptor);
176 agentDescriptorToSend.setModuleDescriptors(lst);
177 return agentDescriptorToSend;
178 } else if ("runSlcExecution".equals(query)) {
179 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
180 new Thread() {
181 public void run() {
182 runSlcExecution(slcExecution);
183 }
184 }.start();
185 return ExecutionAnswer.ok("Execution started on agent "
186 + agentDescriptor.getUuid());
187 } else if ("ping".equals(query)) {
188 return ExecutionAnswer.ok("Agent " + agentDescriptor.getUuid()
189 + " is alive.");
190 } else {
191 throw new SlcException("Unsupported query " + query);
192 }
193 } catch (Exception e) {
194 log.error("Processing of query " + query + " failed", e);
195 return ExecutionAnswer.error(e);
196 }
197 }
198
199 protected Object convertFrom(Message message) {
200 try {
201 return jmsTemplate.getMessageConverter().fromMessage(message);
202 } catch (JMSException e) {
203 throw new SlcException("Cannot convert message", e);
204 }
205 }
206
207 public void setResponseDestination(Destination responseDestination) {
208 this.responseDestination = responseDestination;
209 }
210
211 public void setJmsTemplate(JmsTemplate jmsTemplate) {
212 this.jmsTemplate = jmsTemplate;
213 }
214
215 }