]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
Use durable subscribers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
index a05a367b4886f83025e48cc4f2b900910be5835c..0c702ab620e14fecd906f6978034c5ab95761216 100644 (file)
@@ -1,9 +1,20 @@
 package org.argeo.slc.jms;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -13,23 +24,21 @@ import org.argeo.slc.msg.event.SlcEventListener;
 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
 import org.argeo.slc.msg.event.SlcEventListenerRegister;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 
 public class JmsSlcEventListener implements SlcEventListener {
        private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
 
-       private Destination eventsDestination;
+       private Topic eventsDestination;
        private ConnectionFactory jmsConnectionFactory;
        private MessageConverter messageConverter;
 
-       public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
-               JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
-               jmsTemplate.setMessageConverter(messageConverter);
-               jmsTemplate.setReceiveTimeout(timeout);
-
-               List<SlcEventListenerDescriptor> descriptors = register
-                               .getDescriptorsCopy();
+       private Map<String, ListeningClient> clients = Collections
+                       .synchronizedMap(new HashMap<String, ListeningClient>());
 
+       public SlcEvent listen(String clientId,
+                       List<SlcEventListenerDescriptor> descriptors, Long timeout) {
                if (descriptors.size() == 0) {
                        // No listeners, just waiting
                        try {
@@ -44,8 +53,31 @@ public class JmsSlcEventListener implements SlcEventListener {
                        if (log.isTraceEnabled())
                                log.debug("Selector: " + selector);
 
-                       Object obj = jmsTemplate.receiveSelectedAndConvert(
-                                       eventsDestination, selector);
+                       Object obj = null;
+                       Session session = null;
+                       TopicSubscriber topicSubscriber = null;
+                       // MessageConsumer messageConsumer = null;
+                       try {
+                               // Connection connection = getClient(clientId).getConnection();
+                               // session = connection.createSession(false,
+                               // Session.AUTO_ACKNOWLEDGE);
+                               session = getClient(clientId).getSession();
+                               topicSubscriber = session.createDurableSubscriber(
+                                               eventsDestination, clientId,
+                                               createSelector(descriptors), true);
+                               Message message = topicSubscriber.receive(timeout);
+                               // messageConsumer = session.createConsumer(eventsDestination,
+                               // createSelector(descriptors));
+                               // Message message = messageConsumer.receive(timeout);
+                               obj = messageConverter.fromMessage(message);
+                       } catch (JMSException e) {
+                               throw new SlcException("Cannot poll events for client "
+                                               + clientId, e);
+                       } finally {
+                               // JmsUtils.closeMessageConsumer(messageConsumer);
+                               JmsUtils.closeMessageConsumer(topicSubscriber);
+                               // JmsUtils.closeSession(session);
+                       }
 
                        if (obj == null)
                                return null;
@@ -54,6 +86,28 @@ public class JmsSlcEventListener implements SlcEventListener {
                }
        }
 
+       /*
+        * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
+        * 
+        * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
+        * jmsTemplate.setMessageConverter(messageConverter);
+        * jmsTemplate.setReceiveTimeout(timeout);
+        * 
+        * List<SlcEventListenerDescriptor> descriptors = register
+        * .getDescriptorsCopy();
+        * 
+        * if (descriptors.size() == 0) { // No listeners, just waiting try {
+        * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
+        * return null; } else { String selector = createSelector(descriptors);
+        * 
+        * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
+        * 
+        * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
+        * selector);
+        * 
+        * if (obj == null) return null; else return (SlcEvent) obj; } }
+        */
+
        /** Returns null if no filter */
        protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
                if (descriptors.size() == 0)
@@ -79,7 +133,7 @@ public class JmsSlcEventListener implements SlcEventListener {
                return buf.toString();
        }
 
-       public void setEventsDestination(Destination eventsDestination) {
+       public void setEventsDestination(Topic eventsDestination) {
                this.eventsDestination = eventsDestination;
        }
 
@@ -91,4 +145,80 @@ public class JmsSlcEventListener implements SlcEventListener {
                this.messageConverter = messageConverter;
        }
 
+       public ListeningClient init(String clientId) {
+               Connection connection = null;
+               try {
+                       connection = jmsConnectionFactory.createConnection();
+                       connection.setClientID(clientId);
+                       connection.start();
+                       ListeningClient client = new ListeningClient(connection);
+                       return client;
+               } catch (JMSException e) {
+                       throw new SlcException("Could not init listening client "
+                                       + clientId, e);
+               } finally {
+               }
+       }
+
+       public void close(String clientId) {
+               Session session = null;
+               ListeningClient client = getClient(clientId);
+               Connection connection = client.getConnection();
+               try {
+                       session = client.getSession();
+                       session.unsubscribe(clientId);
+               } catch (JMSException e) {
+                       log.warn("Could not unsubscribe client " + clientId, e);
+               } finally {
+                       JmsUtils.closeSession(session);
+               }
+
+               // JmsUtils.closeSession(client.getSession());
+               clients.remove(clientId);
+
+               try {
+                       connection.stop();
+                       connection.close();
+               } catch (JMSException e) {
+                       throw new SlcException("Could not close JMS connection for client "
+                                       + clientId, e);
+               } finally {
+                       clients.remove(clientId);
+               }
+       }
+
+       protected ListeningClient getClient(String clientId) {
+               ListeningClient client = clients.get(clientId);
+               if (client == null) {
+                       // Lazy init
+                       client = init(clientId);
+                       clients.put(clientId, client);
+               }
+               return client;
+       }
+
+       protected class ListeningClient {
+               private final Connection connection;
+               private final Session session;
+
+               public ListeningClient(Connection connection) {
+                       super();
+                       this.connection = connection;
+                       try {
+                               session = connection.createSession(false,
+                                               Session.AUTO_ACKNOWLEDGE);
+                       } catch (JMSException e) {
+                               throw new SlcException("Cannot create session");
+                       }
+               }
+
+               public Connection getConnection() {
+                       return connection;
+               }
+
+               public Session getSession() {
+                       return session;
+               }
+
+       }
 }