Use durable subscribers
authorMathieu Baudier <mbaudier@argeo.org>
Thu, 7 May 2009 08:39:19 +0000 (08:39 +0000)
committerMathieu Baudier <mbaudier@argeo.org>
Thu, 7 May 2009 08:39:19 +0000 (08:39 +0000)
git-svn-id: https://svn.argeo.org/slc/trunk@2413 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java [new file with mode: 0644]
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/PollEventController.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventPublisher.java
runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListener.java
runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/msg/event/SlcEventListenerRegister.java
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.activemq/META-INF/spring/activemq.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.server.jms/META-INF/spring/jms.xml
server/org.argeo.slc.siteserver/bundles/org.argeo.slc.webapp.war/WEB-INF/slc-service-servlet.xml

diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/WebSlcEventListenerRegister.java
new file mode 100644 (file)
index 0000000..7cc6e8b
--- /dev/null
@@ -0,0 +1,86 @@
+package org.argeo.slc.web.mvc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.msg.event.SlcEvent;
+import org.argeo.slc.msg.event.SlcEventListener;
+import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
+import org.argeo.slc.msg.event.SlcEventListenerRegister;
+import org.springframework.web.context.request.RequestContextHolder;
+
+public class WebSlcEventListenerRegister implements SlcEventListenerRegister,
+               Serializable {
+       private final static Log log = LogFactory
+                       .getLog(WebSlcEventListenerRegister.class);
+       public final static String ATTR_EVENT_LISTENER = "slcEventListener";
+
+       static final long serialVersionUID = 1l;
+
+       transient private SlcEventListener eventListener = null;
+
+       private String clientId = null;
+
+       /** Synchronized */
+       private List<SlcEventListenerDescriptor> descriptors = new Vector<SlcEventListenerDescriptor>();
+
+       public synchronized void addEventListenerDescriptor(
+                       SlcEventListenerDescriptor eventListenerDescriptor) {
+               if (descriptors.contains(eventListenerDescriptor))
+                       descriptors.remove(eventListenerDescriptor);
+               descriptors.add(eventListenerDescriptor);
+       }
+
+       public synchronized void removeEventListenerDescriptor(
+                       SlcEventListenerDescriptor eventListenerDescriptor) {
+               descriptors.remove(eventListenerDescriptor);
+       }
+
+       // public synchronized List<SlcEventListenerDescriptor> getDescriptorsCopy()
+       // {
+       // return new ArrayList<SlcEventListenerDescriptor>(descriptors);
+       // }
+
+       public SlcEvent listen(SlcEventListener eventListener, Long timeout) {
+               checkClientId();
+               this.eventListener = eventListener;
+               return this.eventListener.listen(clientId, descriptors, timeout);
+       }
+
+       public void init() {
+               clientId = getSessionId();
+               checkClientId();
+
+               if (log.isDebugEnabled())
+                       log.debug("Initialized web event listener " + clientId);
+       }
+
+       public void close() {
+               checkClientId();
+               if (eventListener != null)
+                       eventListener.close(clientId);
+
+               if (log.isDebugEnabled())
+                       log.debug("Closed web event listener " + clientId);
+       }
+
+       protected void checkClientId() {
+               String sessionId = getSessionId();
+               if (clientId == null || !clientId.equals(sessionId))
+                       throw new SlcException("Client id " + clientId
+                                       + " not consistent with web session id " + sessionId);
+       }
+
+       protected String getSessionId() {
+               return RequestContextHolder.currentRequestAttributes().getSessionId();
+       }
+
+       public String getClientId() {
+               return clientId;
+       }
+
+}
index 0ebb2f6469b202a53ee87f29b585adc0840304e0..daa38314a8cb0200ace709e1ec31a087570dc862 100644 (file)
@@ -14,7 +14,8 @@ import org.springframework.web.servlet.ModelAndView;
 public class PollEventController extends AbstractServiceController {
        private final static Log log = LogFactory.getLog(PollEventController.class);
 
-       private SlcEventListener eventListener;
+       private SlcEventListener eventListener = null;
+
        private SlcEventListenerRegister eventListenerRegister;
        private Long defaultTimeout = 10000l;
 
@@ -30,7 +31,7 @@ public class PollEventController extends AbstractServiceController {
                else
                        timeout = defaultTimeout;
 
-               SlcEvent event = eventListener.listen(eventListenerRegister, timeout);
+               SlcEvent event = eventListenerRegister.listen(eventListener, timeout);
                if (event != null) {
                        modelAndView.addObject("event", event);
 
@@ -40,10 +41,6 @@ public class PollEventController extends AbstractServiceController {
                }
        }
 
-       public void setEventListener(SlcEventListener slcEventListener) {
-               this.eventListener = slcEventListener;
-       }
-
        public void setEventListenerRegister(
                        SlcEventListenerRegister eventListenerRegister) {
                this.eventListenerRegister = eventListenerRegister;
@@ -53,4 +50,8 @@ public class PollEventController extends AbstractServiceController {
                this.defaultTimeout = defaultTimeout;
        }
 
+       public void setEventListener(SlcEventListener eventListener) {
+               this.eventListener = eventListener;
+       }
+
 }
index a05a367b4886f83025e48cc4f2b900910be5835c..0c702ab620e14fecd906f6978034c5ab95761216 100644 (file)
@@ -1,9 +1,20 @@
 package org.argeo.slc.jms;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -13,23 +24,21 @@ import org.argeo.slc.msg.event.SlcEventListener;
 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
 import org.argeo.slc.msg.event.SlcEventListenerRegister;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 
 public class JmsSlcEventListener implements SlcEventListener {
        private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
 
-       private Destination eventsDestination;
+       private Topic eventsDestination;
        private ConnectionFactory jmsConnectionFactory;
        private MessageConverter messageConverter;
 
-       public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
-               JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
-               jmsTemplate.setMessageConverter(messageConverter);
-               jmsTemplate.setReceiveTimeout(timeout);
-
-               List<SlcEventListenerDescriptor> descriptors = register
-                               .getDescriptorsCopy();
+       private Map<String, ListeningClient> clients = Collections
+                       .synchronizedMap(new HashMap<String, ListeningClient>());
 
+       public SlcEvent listen(String clientId,
+                       List<SlcEventListenerDescriptor> descriptors, Long timeout) {
                if (descriptors.size() == 0) {
                        // No listeners, just waiting
                        try {
@@ -44,8 +53,31 @@ public class JmsSlcEventListener implements SlcEventListener {
                        if (log.isTraceEnabled())
                                log.debug("Selector: " + selector);
 
-                       Object obj = jmsTemplate.receiveSelectedAndConvert(
-                                       eventsDestination, selector);
+                       Object obj = null;
+                       Session session = null;
+                       TopicSubscriber topicSubscriber = null;
+                       // MessageConsumer messageConsumer = null;
+                       try {
+                               // Connection connection = getClient(clientId).getConnection();
+                               // session = connection.createSession(false,
+                               // Session.AUTO_ACKNOWLEDGE);
+                               session = getClient(clientId).getSession();
+                               topicSubscriber = session.createDurableSubscriber(
+                                               eventsDestination, clientId,
+                                               createSelector(descriptors), true);
+                               Message message = topicSubscriber.receive(timeout);
+                               // messageConsumer = session.createConsumer(eventsDestination,
+                               // createSelector(descriptors));
+                               // Message message = messageConsumer.receive(timeout);
+                               obj = messageConverter.fromMessage(message);
+                       } catch (JMSException e) {
+                               throw new SlcException("Cannot poll events for client "
+                                               + clientId, e);
+                       } finally {
+                               // JmsUtils.closeMessageConsumer(messageConsumer);
+                               JmsUtils.closeMessageConsumer(topicSubscriber);
+                               // JmsUtils.closeSession(session);
+                       }
 
                        if (obj == null)
                                return null;
@@ -54,6 +86,28 @@ public class JmsSlcEventListener implements SlcEventListener {
                }
        }
 
+       /*
+        * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
+        * 
+        * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
+        * jmsTemplate.setMessageConverter(messageConverter);
+        * jmsTemplate.setReceiveTimeout(timeout);
+        * 
+        * List<SlcEventListenerDescriptor> descriptors = register
+        * .getDescriptorsCopy();
+        * 
+        * if (descriptors.size() == 0) { // No listeners, just waiting try {
+        * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
+        * return null; } else { String selector = createSelector(descriptors);
+        * 
+        * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
+        * 
+        * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
+        * selector);
+        * 
+        * if (obj == null) return null; else return (SlcEvent) obj; } }
+        */
+
        /** Returns null if no filter */
        protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
                if (descriptors.size() == 0)
@@ -79,7 +133,7 @@ public class JmsSlcEventListener implements SlcEventListener {
                return buf.toString();
        }
 
-       public void setEventsDestination(Destination eventsDestination) {
+       public void setEventsDestination(Topic eventsDestination) {
                this.eventsDestination = eventsDestination;
        }
 
@@ -91,4 +145,80 @@ public class JmsSlcEventListener implements SlcEventListener {
                this.messageConverter = messageConverter;
        }
 
+       public ListeningClient init(String clientId) {
+               Connection connection = null;
+               try {
+                       connection = jmsConnectionFactory.createConnection();
+                       connection.setClientID(clientId);
+                       connection.start();
+                       ListeningClient client = new ListeningClient(connection);
+                       return client;
+               } catch (JMSException e) {
+                       throw new SlcException("Could not init listening client "
+                                       + clientId, e);
+               } finally {
+               }
+       }
+
+       public void close(String clientId) {
+               Session session = null;
+               ListeningClient client = getClient(clientId);
+               Connection connection = client.getConnection();
+               try {
+                       session = client.getSession();
+                       session.unsubscribe(clientId);
+               } catch (JMSException e) {
+                       log.warn("Could not unsubscribe client " + clientId, e);
+               } finally {
+                       JmsUtils.closeSession(session);
+               }
+
+               // JmsUtils.closeSession(client.getSession());
+               clients.remove(clientId);
+
+               try {
+                       connection.stop();
+                       connection.close();
+               } catch (JMSException e) {
+                       throw new SlcException("Could not close JMS connection for client "
+                                       + clientId, e);
+               } finally {
+                       clients.remove(clientId);
+               }
+       }
+
+       protected ListeningClient getClient(String clientId) {
+               ListeningClient client = clients.get(clientId);
+               if (client == null) {
+                       // Lazy init
+                       client = init(clientId);
+                       clients.put(clientId, client);
+               }
+               return client;
+       }
+
+       protected class ListeningClient {
+               private final Connection connection;
+               private final Session session;
+
+               public ListeningClient(Connection connection) {
+                       super();
+                       this.connection = connection;
+                       try {
+                               session = connection.createSession(false,
+                                               Session.AUTO_ACKNOWLEDGE);
+                       } catch (JMSException e) {
+                               throw new SlcException("Cannot create session");
+                       }
+               }
+
+               public Connection getConnection() {
+                       return connection;
+               }
+
+               public Session getSession() {
+                       return session;
+               }
+
+       }
 }
index bcf13325cf666f28e1b436ce79adab00240baa5e..8d206faff49e03e020016e10427ad6463344bfc4 100644 (file)
@@ -2,10 +2,12 @@ package org.argeo.slc.jms;
 
 import java.util.Map;
 
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.argeo.slc.SlcException;
 import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.msg.event.SlcEventPublisher;
 import org.springframework.jms.core.JmsTemplate;
@@ -16,6 +18,10 @@ public class JmsSlcEventPublisher implements SlcEventPublisher {
        private JmsTemplate jmsTemplate;
 
        public void publish(final SlcEvent event) {
+               if (jmsTemplate.getDeliveryMode() != DeliveryMode.PERSISTENT)
+                       throw new SlcException(
+                                       "Delivery mode has to be persistent in order to have durable subscription");
+
                jmsTemplate.convertAndSend(eventsDestination, event,
                                new MessagePostProcessor() {
 
@@ -28,17 +34,6 @@ public class JmsSlcEventPublisher implements SlcEventPublisher {
                                                return message;
                                        }
                                });
-               // jmsTemplate.send(eventsDestination, new MessageCreator() {
-               // public Message createMessage(Session session) throws JMSException {
-               // TextMessage msg = session.createTextMessage();
-               // // TODO: remove workaround when upgrading to ActiveMQ 5.3
-               // // Workaround for
-               // // https://issues.apache.org/activemq/browse/AMQ-2046
-               // msg.setText("");
-               //
-               // return msg;
-               // }
-               // });
        }
 
        public void setEventsDestination(Destination eventsDestination) {
index 75048f78e98e4932ada217962aa2490d764d2ad0..e1f84383b2606a63892d6d9147f60929ac925a70 100644 (file)
@@ -1,5 +1,7 @@
 package org.argeo.slc.msg.event;
 
+import java.util.List;
+
 public interface SlcEventListener {
        /**
         * Blocks until an event is received or timeout is reached
@@ -7,5 +9,8 @@ public interface SlcEventListener {
         * @return the event received or null if timeout was reached before
         *         receiving one
         */
-       public SlcEvent listen(SlcEventListenerRegister register, Long timeout);
+       public SlcEvent listen(String clientId,
+                       List<SlcEventListenerDescriptor> descriptors, Long timeout);
+
+       public void close(String clientId);
 }
index 52aa8c3b02f19b8468c90b6af9c4c99e0f415342..85e5cbd646fe9eb9d555088a5a44f48ec1da9f1e 100644 (file)
@@ -1,29 +1,12 @@
 package org.argeo.slc.msg.event;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
 
-public class SlcEventListenerRegister implements Serializable {
-       static final long serialVersionUID = 1l;
+public interface SlcEventListenerRegister {
+       public void addEventListenerDescriptor(
+                       SlcEventListenerDescriptor eventListenerDescriptor);
 
-       /** Synchronized */
-       private List<SlcEventListenerDescriptor> descriptors = new Vector<SlcEventListenerDescriptor>();
+       public void removeEventListenerDescriptor(
+                       SlcEventListenerDescriptor eventListenerDescriptor);
 
-       public synchronized void addEventListenerDescriptor(
-                       SlcEventListenerDescriptor eventListenerDescriptor) {
-               if (descriptors.contains(eventListenerDescriptor))
-                       descriptors.remove(eventListenerDescriptor);
-               descriptors.add(eventListenerDescriptor);
-       }
-
-       public synchronized void removeEventListenerDescriptor(
-                       SlcEventListenerDescriptor eventListenerDescriptor) {
-               descriptors.remove(eventListenerDescriptor);
-       }
-
-       public synchronized List<SlcEventListenerDescriptor> getDescriptorsCopy() {
-               return new ArrayList<SlcEventListenerDescriptor>(descriptors);
-       }
+       public SlcEvent listen(SlcEventListener eventListener, Long timeout);
 }
index bb8384bf458342e7ebc7e24df1cf4e69fc4ac784..22a9533993af09e213cf6b42f0cd5cc40492299d 100644 (file)
        </amq:broker>
 
        <!-- Connection Factory -->
-       <!--  -->
-       <bean id="jmsConnectionFactory"
+       <!--
+               <bean id="jmsConnectionFactory"
                class="org.springframework.jms.connection.CachingConnectionFactory">
-               <property name="targetConnectionFactory">
+               <property name="targetConnectionFactory"> <bean
+               class="org.apache.activemq.ActiveMQConnectionFactory"> <property
+               name="brokerURL"> <value>vm://localhost</value> </property> </bean>
+               </property> </bean>
+       -->
+
+       <!---->
+       <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
+               destroy-method="stop" depends-on="broker">
+               <property name="connectionFactory">
                        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                                <property name="brokerURL">
                                        <value>vm://localhost</value>
                </property>
        </bean>
 
-       <!--
-               <bean id="jmsConnectionFactory"
-               class="org.apache.activemq.pool.PooledConnectionFactory"
-               destroy-method="stop" depends-on="broker"> <property
-               name="connectionFactory"> <bean
-               class="org.apache.activemq.ActiveMQConnectionFactory"> <property
-               name="brokerURL"> <value>vm://localhost</value> </property> </bean>
-               </property> </bean>
-       -->
 
        <bean id="jmsTransactionManager"
                class="org.springframework.jms.connection.JmsTransactionManager">
index d141b7f8799e7a1e9fa29808f97fe18de083fc5f..0edb121ac361734e0ba329e9e1cb8a6c8d3b5e92 100644 (file)
@@ -8,7 +8,17 @@
 
        <!-- Events -->
        <bean id="jmsEventListener" class="org.argeo.slc.jms.JmsSlcEventListener">
-               <property name="jmsConnectionFactory" ref="jmsConnectionFactory" />
+               <property name="jmsConnectionFactory">
+                       <!--
+                               Need its own connection factory in order to set client ids (not
+                               possible on pool)
+                       -->
+                       <bean class="org.apache.activemq.ActiveMQConnectionFactory">
+                               <property name="brokerURL">
+                                       <value>vm://localhost</value>
+                               </property>
+                       </bean>
+               </property>
                <property name="eventsDestination" ref="slcJms.destination.events" />
                <property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
        </bean>
index 2de1cd8d58d644220f850d7ed25495aeae439547..e87262affc6a034fcbee7d8fadbf5de877a261d3 100644 (file)
@@ -87,8 +87,8 @@
                <property name="eventListener" ref="eventListener" />
        </bean>
 
-       <bean name="eventListenerRegister" class="org.argeo.slc.msg.event.SlcEventListenerRegister"
-               scope="session">
+       <bean name="eventListenerRegister" class="org.argeo.slc.web.mvc.WebSlcEventListenerRegister"
+               scope="session" init-method="init" destroy-method="close">
                <aop:scoped-proxy />
        </bean>