]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgent.java
baddb1c604f075f4af5a5cc7667531331f48631a
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsAgent.java
1 package org.argeo.slc.jms;
2
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.UUID;
8
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageProducer;
13 import javax.jms.Session;
14 import javax.jms.TextMessage;
15
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18 import org.argeo.slc.SlcException;
19 import org.argeo.slc.core.runtime.AbstractAgent;
20 import org.argeo.slc.execution.ExecutionModule;
21 import org.argeo.slc.execution.ExecutionModuleDescriptor;
22 import org.argeo.slc.process.SlcExecution;
23 import org.argeo.slc.runtime.SlcAgent;
24 import org.argeo.slc.runtime.SlcAgentDescriptor;
25 import org.springframework.beans.factory.DisposableBean;
26 import org.springframework.beans.factory.InitializingBean;
27 import org.springframework.jms.core.JmsTemplate;
28 import org.springframework.jms.listener.SessionAwareMessageListener;
29 import org.springframework.jms.support.converter.MessageConversionException;
30
31 /** JMS based implementation of SLC Agent. */
32 public class JmsAgent extends AbstractAgent implements SlcAgent,
33 InitializingBean, DisposableBean, SessionAwareMessageListener {
34 public final static String PROPERTY_QUERY = "query";
35 public final static String PROPERTY_SLC_AGENT_ID = "slc_agentId";
36
37 private final static Log log = LogFactory.getLog(JmsAgent.class);
38
39 private final SlcAgentDescriptor agentDescriptor;
40 // private ConnectionFactory connectionFactory;
41 private JmsTemplate jmsTemplate;
42 private Destination agentRegister;
43 private Destination agentUnregister;
44
45 // private Destination requestDestination;
46 private Destination responseDestination;
47
48 // private MessageConverter messageConverter;
49
50 public JmsAgent() {
51 try {
52 agentDescriptor = new SlcAgentDescriptor();
53 agentDescriptor.setUuid(UUID.randomUUID().toString());
54 agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
55 } catch (UnknownHostException e) {
56 throw new SlcException("Unable to create agent descriptor.", e);
57 }
58 }
59
60 public void afterPropertiesSet() throws Exception {
61 // Initialize JMS Template
62 // jmsTemplate = new JmsTemplate(connectionFactory);
63 // jmsTemplate.setMessageConverter(messageConverter);
64
65 jmsTemplate.convertAndSend(agentRegister, agentDescriptor);
66 log.info("Agent #" + agentDescriptor.getUuid() + " registered to "
67 + agentRegister);
68 }
69
70 public void destroy() throws Exception {
71 jmsTemplate.convertAndSend(agentUnregister, agentDescriptor);
72 log.info("Agent #" + agentDescriptor.getUuid() + " unregistered from "
73 + agentUnregister);
74 }
75
76 public void setAgentRegister(Destination agentRegister) {
77 this.agentRegister = agentRegister;
78 }
79
80 public void setAgentUnregister(Destination agentUnregister) {
81 this.agentUnregister = agentUnregister;
82 }
83
84 public String getMessageSelector() {
85 String messageSelector = "slc_agentId='" + agentDescriptor.getUuid()
86 + "'";
87 // if (log.isDebugEnabled())
88 // log.debug("Message selector: " + messageSelector);
89 return messageSelector;
90 }
91
92 public void onMessage(Message message, Session session) throws JMSException {
93 MessageProducer producer = session.createProducer(responseDestination);
94 String query = message.getStringProperty(PROPERTY_QUERY);
95 String correlationId = message.getJMSCorrelationID();
96 if (log.isDebugEnabled())
97 log.debug("Received query " + query + " with correlationId "
98 + correlationId);
99
100 Message responseMsg = null;
101 if ("getExecutionModuleDescriptor".equals(query)) {
102 String moduleName = message.getStringProperty("moduleName");
103 String version = message.getStringProperty("version");
104 ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
105 moduleName, version);
106 responseMsg = jmsTemplate.getMessageConverter().toMessage(emd,
107 session);
108 } else if ("listExecutionModuleDescriptors".equals(query)) {
109
110 List<ExecutionModuleDescriptor> lst = listExecutionModuleDescriptors();
111 SlcAgentDescriptor agentDescriptorToSend = new SlcAgentDescriptor(
112 agentDescriptor);
113 agentDescriptorToSend.setModuleDescriptors(lst);
114 responseMsg = jmsTemplate.getMessageConverter().toMessage(
115 agentDescriptorToSend, session);
116 } else if ("newExecution".equals(query)) {
117
118 SlcExecution slcExecution = (SlcExecution) jmsTemplate
119 .getMessageConverter().fromMessage(message);
120 runSlcExecution(slcExecution);
121 return;
122 } else {
123 // try {
124 // // FIXME: generalize
125 // SlcExecution slcExecution = (SlcExecution) jmsTemplate
126 // .getMessageConverter().fromMessage(message);
127 // runSlcExecution(slcExecution);
128 // } catch (MessageConversionException e) {
129 // if (log.isDebugEnabled())
130 // log.debug("Unsupported query " + query, e);
131 // }
132 if (log.isDebugEnabled())
133 log.debug("Unsupported query " + query);
134 return;
135 }
136
137 if (responseMsg != null) {
138 responseMsg.setJMSCorrelationID(correlationId);
139 producer.send(responseMsg);
140 if (log.isDebugEnabled())
141 log.debug("Sent response to query " + query
142 + " with correlationId " + correlationId + ": "
143 + responseMsg);
144 }
145
146 }
147
148 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
149 String moduleName, String version) {
150 return getModulesManager().getExecutionModuleDescriptor(moduleName,
151 version);
152 }
153
154 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
155 List<ExecutionModule> modules = getModulesManager()
156 .listExecutionModules();
157
158 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
159 for (ExecutionModule module : modules) {
160 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
161 md.setName(module.getName());
162 md.setVersion(module.getVersion());
163 descriptors.add(md);
164 }
165 return descriptors;
166 }
167
168 public void setResponseDestination(Destination responseDestination) {
169 this.responseDestination = responseDestination;
170 }
171
172 public void setJmsTemplate(JmsTemplate jmsTemplate) {
173 this.jmsTemplate = jmsTemplate;
174 }
175
176 }