From 304125ea23a3570c78149816d76951bbb258707d Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Thu, 7 May 2009 08:39:19 +0000 Subject: [PATCH] Use durable subscribers git-svn-id: https://svn.argeo.org/slc/trunk@2413 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- .../web/mvc/WebSlcEventListenerRegister.java | 86 ++++++++++ .../web/mvc/event/PollEventController.java | 13 +- .../argeo/slc/jms/JmsSlcEventListener.java | 152 ++++++++++++++++-- .../argeo/slc/jms/JmsSlcEventPublisher.java | 17 +- .../argeo/slc/msg/event/SlcEventListener.java | 7 +- .../msg/event/SlcEventListenerRegister.java | 29 +--- .../META-INF/spring/activemq.xml | 24 +-- .../META-INF/spring/jms.xml | 12 +- .../WEB-INF/slc-service-servlet.xml | 4 +- 9 files changed, 277 insertions(+), 67 deletions(-) create mode 100644 runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java new file mode 100644 index 000000000..7cc6e8b9d --- /dev/null +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java @@ -0,0 +1,86 @@ +package org.argeo.slc.web.mvc; + +import java.io.Serializable; +import java.util.List; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.SlcException; +import org.argeo.slc.msg.event.SlcEvent; +import org.argeo.slc.msg.event.SlcEventListener; +import org.argeo.slc.msg.event.SlcEventListenerDescriptor; +import org.argeo.slc.msg.event.SlcEventListenerRegister; +import org.springframework.web.context.request.RequestContextHolder; + +public class WebSlcEventListenerRegister implements SlcEventListenerRegister, + Serializable { + private final static Log log = LogFactory + .getLog(WebSlcEventListenerRegister.class); + public final static String ATTR_EVENT_LISTENER = "slcEventListener"; + + static final long serialVersionUID = 1l; + + transient private SlcEventListener eventListener = null; + + private String clientId = null; + + /** Synchronized */ + private List descriptors = new Vector(); + + public synchronized void addEventListenerDescriptor( + SlcEventListenerDescriptor eventListenerDescriptor) { + if (descriptors.contains(eventListenerDescriptor)) + descriptors.remove(eventListenerDescriptor); + descriptors.add(eventListenerDescriptor); + } + + public synchronized void removeEventListenerDescriptor( + SlcEventListenerDescriptor eventListenerDescriptor) { + descriptors.remove(eventListenerDescriptor); + } + + // public synchronized List getDescriptorsCopy() + // { + // return new ArrayList(descriptors); + // } + + public SlcEvent listen(SlcEventListener eventListener, Long timeout) { + checkClientId(); + this.eventListener = eventListener; + return this.eventListener.listen(clientId, descriptors, timeout); + } + + public void init() { + clientId = getSessionId(); + checkClientId(); + + if (log.isDebugEnabled()) + log.debug("Initialized web event listener " + clientId); + } + + public void close() { + checkClientId(); + if (eventListener != null) + eventListener.close(clientId); + + if (log.isDebugEnabled()) + log.debug("Closed web event listener " + clientId); + } + + protected void checkClientId() { + String sessionId = getSessionId(); + if (clientId == null || !clientId.equals(sessionId)) + throw new SlcException("Client id " + clientId + + " not consistent with web session id " + sessionId); + } + + protected String getSessionId() { + return RequestContextHolder.currentRequestAttributes().getSessionId(); + } + + public String getClientId() { + return clientId; + } + +} diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/PollEventController.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/PollEventController.java index 0ebb2f646..daa38314a 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/PollEventController.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/PollEventController.java @@ -14,7 +14,8 @@ import org.springframework.web.servlet.ModelAndView; public class PollEventController extends AbstractServiceController { private final static Log log = LogFactory.getLog(PollEventController.class); - private SlcEventListener eventListener; + private SlcEventListener eventListener = null; + private SlcEventListenerRegister eventListenerRegister; private Long defaultTimeout = 10000l; @@ -30,7 +31,7 @@ public class PollEventController extends AbstractServiceController { else timeout = defaultTimeout; - SlcEvent event = eventListener.listen(eventListenerRegister, timeout); + SlcEvent event = eventListenerRegister.listen(eventListener, timeout); if (event != null) { modelAndView.addObject("event", event); @@ -40,10 +41,6 @@ public class PollEventController extends AbstractServiceController { } } - public void setEventListener(SlcEventListener slcEventListener) { - this.eventListener = slcEventListener; - } - public void setEventListenerRegister( SlcEventListenerRegister eventListenerRegister) { this.eventListenerRegister = eventListenerRegister; @@ -53,4 +50,8 @@ public class PollEventController extends AbstractServiceController { this.defaultTimeout = defaultTimeout; } + public void setEventListener(SlcEventListener eventListener) { + this.eventListener = eventListener; + } + } diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java index a05a367b4..0c702ab62 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java @@ -1,9 +1,20 @@ package org.argeo.slc.jms; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -13,23 +24,21 @@ import org.argeo.slc.msg.event.SlcEventListener; import org.argeo.slc.msg.event.SlcEventListenerDescriptor; import org.argeo.slc.msg.event.SlcEventListenerRegister; import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.converter.MessageConverter; public class JmsSlcEventListener implements SlcEventListener { private final static Log log = LogFactory.getLog(JmsSlcEventListener.class); - private Destination eventsDestination; + private Topic eventsDestination; private ConnectionFactory jmsConnectionFactory; private MessageConverter messageConverter; - public SlcEvent listen(SlcEventListenerRegister register, Long timeout) { - JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory); - jmsTemplate.setMessageConverter(messageConverter); - jmsTemplate.setReceiveTimeout(timeout); - - List descriptors = register - .getDescriptorsCopy(); + private Map clients = Collections + .synchronizedMap(new HashMap()); + public SlcEvent listen(String clientId, + List descriptors, Long timeout) { if (descriptors.size() == 0) { // No listeners, just waiting try { @@ -44,8 +53,31 @@ public class JmsSlcEventListener implements SlcEventListener { if (log.isTraceEnabled()) log.debug("Selector: " + selector); - Object obj = jmsTemplate.receiveSelectedAndConvert( - eventsDestination, selector); + Object obj = null; + Session session = null; + TopicSubscriber topicSubscriber = null; + // MessageConsumer messageConsumer = null; + try { + // Connection connection = getClient(clientId).getConnection(); + // session = connection.createSession(false, + // Session.AUTO_ACKNOWLEDGE); + session = getClient(clientId).getSession(); + topicSubscriber = session.createDurableSubscriber( + eventsDestination, clientId, + createSelector(descriptors), true); + Message message = topicSubscriber.receive(timeout); + // messageConsumer = session.createConsumer(eventsDestination, + // createSelector(descriptors)); + // Message message = messageConsumer.receive(timeout); + obj = messageConverter.fromMessage(message); + } catch (JMSException e) { + throw new SlcException("Cannot poll events for client " + + clientId, e); + } finally { + // JmsUtils.closeMessageConsumer(messageConsumer); + JmsUtils.closeMessageConsumer(topicSubscriber); + // JmsUtils.closeSession(session); + } if (obj == null) return null; @@ -54,6 +86,28 @@ public class JmsSlcEventListener implements SlcEventListener { } } + /* + * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) { + * + * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory); + * jmsTemplate.setMessageConverter(messageConverter); + * jmsTemplate.setReceiveTimeout(timeout); + * + * List descriptors = register + * .getDescriptorsCopy(); + * + * if (descriptors.size() == 0) { // No listeners, just waiting try { + * Thread.sleep(timeout); } catch (InterruptedException e) { // silent } + * return null; } else { String selector = createSelector(descriptors); + * + * if (log.isTraceEnabled()) log.debug("Selector: " + selector); + * + * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination, + * selector); + * + * if (obj == null) return null; else return (SlcEvent) obj; } } + */ + /** Returns null if no filter */ protected String createSelector(List descriptors) { if (descriptors.size() == 0) @@ -79,7 +133,7 @@ public class JmsSlcEventListener implements SlcEventListener { return buf.toString(); } - public void setEventsDestination(Destination eventsDestination) { + public void setEventsDestination(Topic eventsDestination) { this.eventsDestination = eventsDestination; } @@ -91,4 +145,80 @@ public class JmsSlcEventListener implements SlcEventListener { this.messageConverter = messageConverter; } + public ListeningClient init(String clientId) { + Connection connection = null; + try { + connection = jmsConnectionFactory.createConnection(); + connection.setClientID(clientId); + connection.start(); + ListeningClient client = new ListeningClient(connection); + return client; + } catch (JMSException e) { + throw new SlcException("Could not init listening client " + + clientId, e); + } finally { + } + } + + public void close(String clientId) { + Session session = null; + ListeningClient client = getClient(clientId); + Connection connection = client.getConnection(); + try { + session = client.getSession(); + session.unsubscribe(clientId); + } catch (JMSException e) { + log.warn("Could not unsubscribe client " + clientId, e); + } finally { + JmsUtils.closeSession(session); + } + + // JmsUtils.closeSession(client.getSession()); + clients.remove(clientId); + + try { + connection.stop(); + connection.close(); + } catch (JMSException e) { + throw new SlcException("Could not close JMS connection for client " + + clientId, e); + } finally { + clients.remove(clientId); + } + } + + protected ListeningClient getClient(String clientId) { + ListeningClient client = clients.get(clientId); + if (client == null) { + // Lazy init + client = init(clientId); + clients.put(clientId, client); + } + return client; + } + + protected class ListeningClient { + private final Connection connection; + private final Session session; + + public ListeningClient(Connection connection) { + super(); + this.connection = connection; + try { + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + } catch (JMSException e) { + throw new SlcException("Cannot create session"); + } + } + + public Connection getConnection() { + return connection; + } + + public Session getSession() { + return session; + } + + } } diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventPublisher.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventPublisher.java index bcf13325c..8d206faff 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventPublisher.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventPublisher.java @@ -2,10 +2,12 @@ package org.argeo.slc.jms; import java.util.Map; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import org.argeo.slc.SlcException; import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.msg.event.SlcEventPublisher; import org.springframework.jms.core.JmsTemplate; @@ -16,6 +18,10 @@ public class JmsSlcEventPublisher implements SlcEventPublisher { private JmsTemplate jmsTemplate; public void publish(final SlcEvent event) { + if (jmsTemplate.getDeliveryMode() != DeliveryMode.PERSISTENT) + throw new SlcException( + "Delivery mode has to be persistent in order to have durable subscription"); + jmsTemplate.convertAndSend(eventsDestination, event, new MessagePostProcessor() { @@ -28,17 +34,6 @@ public class JmsSlcEventPublisher implements SlcEventPublisher { return message; } }); - // jmsTemplate.send(eventsDestination, new MessageCreator() { - // public Message createMessage(Session session) throws JMSException { - // TextMessage msg = session.createTextMessage(); - // // TODO: remove workaround when upgrading to ActiveMQ 5.3 - // // Workaround for - // // https://issues.apache.org/activemq/browse/AMQ-2046 - // msg.setText(""); - // - // return msg; - // } - // }); } public void setEventsDestination(Destination eventsDestination) { diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListener.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListener.java index 75048f78e..e1f84383b 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListener.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListener.java @@ -1,5 +1,7 @@ package org.argeo.slc.msg.event; +import java.util.List; + public interface SlcEventListener { /** * Blocks until an event is received or timeout is reached @@ -7,5 +9,8 @@ public interface SlcEventListener { * @return the event received or null if timeout was reached before * receiving one */ - public SlcEvent listen(SlcEventListenerRegister register, Long timeout); + public SlcEvent listen(String clientId, + List descriptors, Long timeout); + + public void close(String clientId); } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListenerRegister.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListenerRegister.java index 52aa8c3b0..85e5cbd64 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListenerRegister.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListenerRegister.java @@ -1,29 +1,12 @@ package org.argeo.slc.msg.event; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Vector; -public class SlcEventListenerRegister implements Serializable { - static final long serialVersionUID = 1l; +public interface SlcEventListenerRegister { + public void addEventListenerDescriptor( + SlcEventListenerDescriptor eventListenerDescriptor); - /** Synchronized */ - private List descriptors = new Vector(); + public void removeEventListenerDescriptor( + SlcEventListenerDescriptor eventListenerDescriptor); - public synchronized void addEventListenerDescriptor( - SlcEventListenerDescriptor eventListenerDescriptor) { - if (descriptors.contains(eventListenerDescriptor)) - descriptors.remove(eventListenerDescriptor); - descriptors.add(eventListenerDescriptor); - } - - public synchronized void removeEventListenerDescriptor( - SlcEventListenerDescriptor eventListenerDescriptor) { - descriptors.remove(eventListenerDescriptor); - } - - public synchronized List getDescriptorsCopy() { - return new ArrayList(descriptors); - } + public SlcEvent listen(SlcEventListener eventListener, Long timeout); } diff --git a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml index bb8384bf4..22a953399 100644 --- a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml +++ b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml @@ -24,10 +24,19 @@ - - - + vm://localhost + + --> + + + + vm://localhost @@ -36,15 +45,6 @@ - diff --git a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml index d141b7f87..0edb121ac 100644 --- a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml +++ b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml @@ -8,7 +8,17 @@ - + + + + + vm://localhost + + + diff --git a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.webapp.war/WEB-INF/slc-service-servlet.xml b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.webapp.war/WEB-INF/slc-service-servlet.xml index 2de1cd8d5..e87262aff 100644 --- a/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.webapp.war/WEB-INF/slc-service-servlet.xml +++ b/server/org.argeo.slc.siteserver/bundles/org.argeo.slc.webapp.war/WEB-INF/slc-service-servlet.xml @@ -87,8 +87,8 @@ - + -- 2.39.2