<bean id="slcDefault.jms.listener.newExecution" parent="slcTemplate.jms.listenerContainer">
<property name="connectionFactory" ref="slcDefault.jms.connectionFactory" />
<property name="destination" ref="slcJms.destination.agent.newExecution" />
- <property name="messageListener">
- <bean parent="slcTemplate.jms.listenerAdapterCastor">
- <constructor-arg ref="modulesManager" />
- <property name="defaultListenerMethod" value="process" />
- </bean>
+ <property name="messageListener" ref="slcDefault.jms.agent">
+<!-- <bean parent="slcTemplate.jms.listenerAdapterCastor">
+ <constructor-arg ref="slcDefault.jms.agent" />
+ <property name="defaultListenerMethod" value="newExecution" />
+ </bean> -->
</property>
+ <!--
<property name="messageSelector">
<bean factory-bean="slcDefault.jms.agent" factory-method="getMessageSelector"/>
</property>
+ -->
</bean>
<bean id="slcDefault.jms.agent" class="org.argeo.slc.jms.JmsAgent">
<property name="jmsTemplate" ref="slcDefault.jms.jmsTemplateCastor" />
<property name="agentRegister" ref="slcJms.destination.agent.register" />
<property name="agentUnregister" ref="slcJms.destination.agent.unregister" />
+ <property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
+ <property name="modulesManager" ref="modulesManager" />
</bean>
<property name="executionSpec" ref="basic.spec" />\r
<property name="executables">\r
<list>\r
- <ref bean="echo1" />\r
+ <bean parent="task.echo" scope="execution">\r
+ <property name="message" value="From basic @{testedComponentId}" />\r
+ <aop:scoped-proxy />\r
+ </bean>\r
<bean parent="testRun">\r
<property name="testDefinition" ref="testDef" />\r
<property name="testData">\r
\r
<bean id="echo1" parent="task.echo" scope="execution">\r
<property name="message"\r
- value="From main! @{testKey}, slc.flows=@{slc.flows}" />\r
+ value="From main! @{testKey}" />\r
<aop:scoped-proxy />\r
</bean>\r
</beans>
\ No newline at end of file
<bean class="org.argeo.slc.core.test.BasicTestData" scope="execution">\r
<aop:scoped-proxy />\r
<property name="expected" value="tata101" />\r
- <property name="reached" value="tata@{testKey}" />\r
+ <property name="reached" value="tata@{testedComponentId}" />\r
</bean>\r
</entry>\r
</map>\r
import java.util.UUID;
import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.converter.MessageConverter;
/** JMS based implementation of SLC Agent. */
public class JmsAgent extends AbstractAgent implements SlcAgent,
- InitializingBean, DisposableBean {
+ InitializingBean, DisposableBean, MessageListener {
private final static Log log = LogFactory.getLog(JmsAgent.class);
private final SlcAgentDescriptor agentDescriptor;
private Destination agentRegister;
private Destination agentUnregister;
+ private MessageConverter messageConverter;
+
public JmsAgent() {
try {
agentDescriptor = new SlcAgentDescriptor();
+ agentUnregister);
}
- public void newExecution(SlcExecution slcExecution) {
- log.info("Execute SlcExecution :" + slcExecution);
- runSlcExecution(slcExecution);
- }
-
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
this.agentUnregister = agentUnregister;
}
+ public void onMessage(Message message) {
+ // FIXME: we filter the messages on the client side,
+ // because of a weird problem with selector since moving to OSGi
+ try {
+ if (message.getStringProperty("slc-agentId").equals(
+ agentDescriptor.getUuid())) {
+ runSlcExecution((SlcExecution) messageConverter
+ .fromMessage(message));
+ }
+ } catch (JMSException e) {
+ throw new SlcException("Cannot convert message " + message, e);
+ }
+
+ }
+
public String getMessageSelector() {
- return "slc-agentId=" + agentDescriptor.getUuid();
+ String messageSelector = "slc-agentId=" + agentDescriptor.getUuid()
+ + "";
+ // String messageSelector = "slc-agentId LIKE '%'";
+ if (log.isDebugEnabled())
+ log.debug("Message selector: '" + messageSelector + "'");
+ return messageSelector;
+ }
+
+ public void setMessageConverter(MessageConverter messageConverter) {
+ this.messageConverter = messageConverter;
}
+
+
}
package org.argeo.slc.jms;
+import java.util.Enumeration;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.xml.transform.StringSource;
public class MarshallerMessageConverter implements MessageConverter {
+ private final static Log log = LogFactory
+ .getLog(MarshallerMessageConverter.class);
+
private Marshaller marshaller;
private Unmarshaller unmarshaller;
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
+ if (log.isTraceEnabled()) {
+ Enumeration<String> names = message.getPropertyNames();
+ while (names.hasMoreElements()) {
+ String name = names.nextElement();
+ log.trace("JMS Property: " + name + "="
+ + message.getObjectProperty(name));
+ }
+ }
+
if (message instanceof TextMessage) {
+
String text = ((TextMessage) message).getText();
try {
return unmarshaller.unmarshal(new StringSource(text));
}
public void process(SlcExecution slcExecution) {
- log.info("SlcExecution " + slcExecution);
+ log.info("##\n## Process SLC Execution " + slcExecution+"\n##");
for(RealizedFlow flow : slcExecution.getRealizedFlows()) {
ExecutionModule module = getExecutionModule(flow.getModuleName(),
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcApplication;
import org.argeo.slc.runtime.SlcExecutionContext;
public abstract class AbstractAgent {
private final static Log log = LogFactory.getLog(AbstractAgent.class);
- private SlcApplication<SlcExecutionContext> slcApplication;
+// private SlcApplication<SlcExecutionContext> slcApplication;
+
+ private ExecutionModulesManager modulesManager;
protected void runSlcExecution(final SlcExecution slcExecution) {
- // TODO: in a separate process
- Thread thread = new Thread("SlcExecution " + slcExecution.getUuid()) {
- public void run() {
- Properties props = new Properties();
- Map<String, String> attributes = slcExecution.getAttributes();
- for (String key : attributes.keySet()) {
- props.setProperty(key, attributes.get(key));
- if (log.isTraceEnabled())
- log.trace(key + "=" + props.getProperty(key));
- }
- slcApplication.execute(slcExecution, props, null, null);
- log.debug("Thread for SLC execution #" + slcExecution.getUuid()
- + " finished.");
- }
- };
- thread.start();
+ modulesManager.process(slcExecution);
+// Thread thread = new Thread("SlcExecution " + slcExecution.getUuid()) {
+// public void run() {
+// Properties props = new Properties();
+// Map<String, String> attributes = slcExecution.getAttributes();
+// for (String key : attributes.keySet()) {
+// props.setProperty(key, attributes.get(key));
+// if (log.isTraceEnabled())
+// log.trace(key + "=" + props.getProperty(key));
+// }
+// slcApplication.execute(slcExecution, props, null, null);
+// log.debug("Thread for SLC execution #" + slcExecution.getUuid()
+// + " finished.");
+// }
+// };
+// thread.start();
}
- public void setSlcApplication(
- SlcApplication<SlcExecutionContext> application) {
- this.slcApplication = application;
+ public void setModulesManager(ExecutionModulesManager modulesManager) {
+ this.modulesManager = modulesManager;
}
+
+// public void setSlcApplication(
+// SlcApplication<SlcExecutionContext> application) {
+// this.slcApplication = application;
+// }
+
}
log4j.logger.org.argeo.slc.execution.ExecutionContext=DEBUG
log4j.logger.org.argeo.slc.execution.SimpleExecutionSpec=DEBUG
-log4j.logger.org.springframework.osgi.web=WARN
+log4j.logger.org.springframework=WARN
+
+log4j.logger.org.apache.activemq=WARN
+log4j.logger.org.apache.activemq.transport=WARN
+log4j.logger.org.apache.activemq.selector=WARN
log4j.logger.org.apache.catalina=INFO
log4j.logger.org.apache.coyote=INFO
<import resource="classpath:org/argeo/slc/activemq/spring.xml" />
<!--
- Create embedded broker <amq:broker useJmx="false" persistent="false">
- <amq:transportConnectors> <amq:transportConnector
- uri="tcp://localhost:61616" /> </amq:transportConnectors>
- </amq:broker>
- -->
+ Create embedded broker-->
+ <amq:broker useJmx="false" persistent="false">
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61616" />
+ </amq:transportConnectors>
+ </amq:broker>
+<!--
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:activemq-conf.xml" />
<property name="start" value="true" />
</bean>
-
+ -->
<bean id="template.jms.listenerContainer" parent="slcTemplate.jms.listenerContainer"
abstract="true">
<!-- Active MQ -->
<context-param>
<param-name>org.apache.activemq.brokerURL</param-name>
- <param-value>vm://localhost</param-value>
+ <param-value>tcp://localhost:61616</param-value>
</context-param>
<context-param>
<property name="slc.osgi.start"
value="org.argeo.dep.osgi.catalina.start,org.springframework.osgi.extender,org.springframework.osgi.web.extender,org.springframework.osgi.samples.simplewebapp,org.argeo.slc.server.activemq,org.argeo.slc.server.hibernate,org.argeo.slc.webapp,org.argeo.slc.demo.basic,org.argeo.slc.demo.manager" />
- <property name="javaCommand"
- location="${java.home}${file.separator}bin${file.separator}java -Xmx256m" />
+ <property name="javaCommand" value="java -Xmx256m" />
<ant antfile="${equinoxDir}/org.argeo.slc.osgiboot-osgiboot.xml" />
</tasks>
</dependency>
</dependencies>
-<!--
- <repositories>
- <repository>
- <id>spring-osgified-artifacts</id>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- <name>Springframework Maven OSGified Artifacts Repository</name>
- <url>http://maven.springframework.org/osgi</url>
- </repository>
- </repositories>
- -->
+ <!--
+ <repositories> <repository> <id>spring-osgified-artifacts</id>
+ <snapshots> <enabled>true</enabled> </snapshots> <name>Springframework
+ Maven OSGified Artifacts Repository</name>
+ <url>http://maven.springframework.org/osgi</url> </repository>
+ </repositories>
+ -->
</project>
\ No newline at end of file