]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
Introduce JCR agent. Client agent not needed anymore.
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.List;
20
21 import javax.jms.Destination;
22 import javax.jms.JMSException;
23 import javax.jms.Message;
24 import javax.jms.MessageListener;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.argeo.slc.SlcException;
29 import org.argeo.slc.core.runtime.DefaultAgent;
30 import org.argeo.slc.execution.ExecutionModuleDescriptor;
31 import org.argeo.slc.msg.ExecutionAnswer;
32 import org.argeo.slc.msg.MsgConstants;
33 import org.argeo.slc.msg.ReferenceList;
34 import org.argeo.slc.process.SlcExecution;
35 import org.argeo.slc.runtime.SlcAgentDescriptor;
36 import org.springframework.jms.JmsException;
37 import org.springframework.jms.core.JmsTemplate;
38 import org.springframework.jms.core.MessagePostProcessor;
39
40 /** JMS based implementation of an SLC Agent. */
41 public class JmsAgent extends DefaultAgent implements MessageListener {
42 public final static String PROPERTY_QUERY = "query";
43 public final static String QUERY_PING_ALL = "pingAll";
44
45 private final static Log log = LogFactory.getLog(JmsAgent.class);
46
47 private JmsTemplate jmsTemplate;
48 private Destination agentRegister;
49 private Destination agentUnregister;
50
51 private Destination responseDestination;
52
53 public void init() {
54 super.init();
55 try {
56 jmsTemplate.convertAndSend(agentRegister, getAgentDescriptor());
57 log.info("Agent #" + getAgentUuid() + " registered to "
58 + agentRegister);
59 } catch (JmsException e) {
60 log.warn("Could not register agent "
61 + getAgentDescriptor().getUuid()
62 + " to server: "
63 + e.getMessage()
64 + ". The agent will stay offline but will keep listening for a ping all sent by server.");
65 if (log.isTraceEnabled())
66 log.debug("Original error.", e);
67 }
68 }
69
70 public void dispose() {
71 try {
72 jmsTemplate.convertAndSend(agentUnregister, getAgentDescriptor());
73 log.info("Agent #" + getAgentUuid() + " unregistered from "
74 + agentUnregister);
75 } catch (JmsException e) {
76 log.warn("Could not unregister agent " + getAgentUuid() + ": "
77 + e.getMessage());
78 if (log.isTraceEnabled())
79 log.debug("Original error.", e);
80 }
81 super.dispose();
82 }
83
84 public void setAgentRegister(Destination agentRegister) {
85 this.agentRegister = agentRegister;
86 }
87
88 public void setAgentUnregister(Destination agentUnregister) {
89 this.agentUnregister = agentUnregister;
90 }
91
92 public String getMessageSelector() {
93 String messageSelector = "slc_agentId='" + getAgentUuid() + "'";
94 // if (log.isDebugEnabled())
95 // log.debug("Message selector: " + messageSelector);
96 return messageSelector;
97 }
98
99 public void onMessage(final Message message) {
100 final String query;
101 final String correlationId;
102 try {
103 query = message.getStringProperty(PROPERTY_QUERY);
104 correlationId = message.getJMSCorrelationID();
105 } catch (JMSException e1) {
106 throw new SlcException("Cannot analyze incoming message " + message);
107 }
108
109 final Object response;
110 final Destination destinationSend;
111 if (QUERY_PING_ALL.equals(query)) {
112 ReferenceList refList = (ReferenceList) convertFrom(message);
113 if (!refList.getReferences().contains(getAgentUuid())) {
114 response = getAgentDescriptor();
115 destinationSend = agentRegister;
116 log.info("Agent #" + getAgentUuid() + " registering to "
117 + agentRegister + " in reply to a " + QUERY_PING_ALL
118 + " query");
119 } else {
120 return;
121 }
122 } else {
123 response = process(query, message);
124 destinationSend = responseDestination;
125 }
126
127 // Send response
128 if (log.isTraceEnabled())
129 log.trace("About to send response " + response.getClass());
130 jmsTemplate.convertAndSend(destinationSend, response,
131 new MessagePostProcessor() {
132 public Message postProcessMessage(Message messageToSend)
133 throws JMSException {
134 messageToSend.setStringProperty(PROPERTY_QUERY, query);
135 messageToSend.setStringProperty(
136 MsgConstants.PROPERTY_SLC_AGENT_ID,
137 getAgentUuid());
138 messageToSend.setJMSCorrelationID(correlationId);
139 return messageToSend;
140 }
141 });
142 if (log.isTraceEnabled())
143 log.debug("Sent response to query '" + query
144 + "' with correlationId " + correlationId);
145 }
146
147 /** @return response */
148 public Object process(String query, Message message) {
149 try {
150 if ("getExecutionModuleDescriptor".equals(query)) {
151 String moduleName = message.getStringProperty("moduleName");
152 String version = message.getStringProperty("version");
153 return getExecutionModuleDescriptor(moduleName, version);
154 } else if ("listExecutionModuleDescriptors".equals(query)) {
155
156 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
157 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
158 getAgentDescriptor());
159 agentDescriptorToSend.setModuleDescriptors(lst);
160 return agentDescriptorToSend;
161 } else if ("runSlcExecution".equals(query)) {
162 final SlcExecution slcExecution = (SlcExecution) convertFrom(message);
163 new Thread() {
164 public void run() {
165 runSlcExecution(slcExecution);
166 }
167 }.start();
168 return ExecutionAnswer.ok("Execution started on agent "
169 + getAgentUuid());
170 } else if ("ping".equals(query)) {
171 return ExecutionAnswer.ok("Agent " + getAgentUuid()
172 + " is alive.");
173 } else {
174 throw new SlcException("Unsupported query " + query);
175 }
176 } catch (Exception e) {
177 log.error("Processing of query " + query + " failed", e);
178 return ExecutionAnswer.error(e);
179 }
180 }
181
182 protected Object convertFrom(Message message) {
183 try {
184 return jmsTemplate.getMessageConverter().fromMessage(message);
185 } catch (JMSException e) {
186 throw new SlcException("Cannot convert message", e);
187 }
188 }
189
190 public void setResponseDestination(Destination responseDestination) {
191 this.responseDestination = responseDestination;
192 }
193
194 public void setJmsTemplate(JmsTemplate jmsTemplate) {
195 this.jmsTemplate = jmsTemplate;
196 }
197
198 }