]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
Add license headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
index 0c702ab620e14fecd906f6978034c5ab95761216..07045edfa2c39047f3d0a81ef5c459a60469d28e 100644 (file)
@@ -1,17 +1,29 @@
+/*
+ * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.argeo.slc.jms;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.UUID;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
@@ -22,8 +34,7 @@ import org.argeo.slc.SlcException;
 import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.msg.event.SlcEventListener;
 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
-import org.argeo.slc.msg.event.SlcEventListenerRegister;
-import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.connection.ConnectionFactoryUtils;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 
@@ -34,14 +45,24 @@ public class JmsSlcEventListener implements SlcEventListener {
        private ConnectionFactory jmsConnectionFactory;
        private MessageConverter messageConverter;
 
-       private Map<String, ListeningClient> clients = Collections
-                       .synchronizedMap(new HashMap<String, ListeningClient>());
+       private Connection connection = null;
+       private String connectionClientId = getClass() + "#"
+                       + UUID.randomUUID().toString();
+
+       private List<String> subscriberIds = new ArrayList<String>();
+
+       private Boolean isClosed = false;
+
+       // private Map<String, ListeningClient> clients = Collections
+       // .synchronizedMap(new HashMap<String, ListeningClient>());
 
-       public SlcEvent listen(String clientId,
+       public SlcEvent listen(String subscriberId,
                        List<SlcEventListenerDescriptor> descriptors, Long timeout) {
                if (descriptors.size() == 0) {
                        // No listeners, just waiting
                        try {
+                               if(log.isTraceEnabled())
+                                       log.trace("No event listener registered, sleeping...");
                                Thread.sleep(timeout);
                        } catch (InterruptedException e) {
                                // silent
@@ -49,34 +70,44 @@ public class JmsSlcEventListener implements SlcEventListener {
                        return null;
                } else {
                        String selector = createSelector(descriptors);
-
                        if (log.isTraceEnabled())
                                log.debug("Selector: " + selector);
 
                        Object obj = null;
-                       Session session = null;
-                       TopicSubscriber topicSubscriber = null;
-                       // MessageConsumer messageConsumer = null;
-                       try {
-                               // Connection connection = getClient(clientId).getConnection();
-                               // session = connection.createSession(false,
-                               // Session.AUTO_ACKNOWLEDGE);
-                               session = getClient(clientId).getSession();
-                               topicSubscriber = session.createDurableSubscriber(
-                                               eventsDestination, clientId,
-                                               createSelector(descriptors), true);
-                               Message message = topicSubscriber.receive(timeout);
-                               // messageConsumer = session.createConsumer(eventsDestination,
-                               // createSelector(descriptors));
-                               // Message message = messageConsumer.receive(timeout);
-                               obj = messageConverter.fromMessage(message);
-                       } catch (JMSException e) {
-                               throw new SlcException("Cannot poll events for client "
-                                               + clientId, e);
-                       } finally {
-                               // JmsUtils.closeMessageConsumer(messageConsumer);
-                               JmsUtils.closeMessageConsumer(topicSubscriber);
-                               // JmsUtils.closeSession(session);
+                       synchronized (subscriberIds) {
+                               while (subscriberIds.contains(subscriberId)) {
+                                       try {
+                                               subscriberIds.wait(500);
+                                               if (isClosed)
+                                                       return null;
+                                       } catch (InterruptedException e) {
+                                               // silent
+                                       }
+                               }
+
+                               subscriberIds.add(subscriberId);
+                               Session session = null;
+                               TopicSubscriber topicSubscriber = null;
+                               try {
+                                       // ListeningClient client = (ListeningClient)
+                                       // getClient(clientId);
+                                       session = connection.createSession(false,
+                                                       Session.AUTO_ACKNOWLEDGE);
+                                       topicSubscriber = session.createDurableSubscriber(
+                                                       eventsDestination, subscriberId,
+                                                       createSelector(descriptors), true);
+                                       Message message = topicSubscriber.receive(timeout);
+                                       obj = messageConverter.fromMessage(message);
+                               } catch (JMSException e) {
+                                       throw new SlcException("Cannot poll events for subscriber "
+                                                       + subscriberId, e);
+                               } finally {
+                                       JmsUtils.closeMessageConsumer(topicSubscriber);
+                                       JmsUtils.closeSession(session);
+                                       subscriberIds.remove(subscriberId);
+                                       subscriberIds.notifyAll();
+                               }
+
                        }
 
                        if (obj == null)
@@ -86,28 +117,6 @@ public class JmsSlcEventListener implements SlcEventListener {
                }
        }
 
-       /*
-        * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
-        * 
-        * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
-        * jmsTemplate.setMessageConverter(messageConverter);
-        * jmsTemplate.setReceiveTimeout(timeout);
-        * 
-        * List<SlcEventListenerDescriptor> descriptors = register
-        * .getDescriptorsCopy();
-        * 
-        * if (descriptors.size() == 0) { // No listeners, just waiting try {
-        * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
-        * return null; } else { String selector = createSelector(descriptors);
-        * 
-        * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
-        * 
-        * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
-        * selector);
-        * 
-        * if (obj == null) return null; else return (SlcEvent) obj; } }
-        */
-
        /** Returns null if no filter */
        protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
                if (descriptors.size() == 0)
@@ -145,80 +154,80 @@ public class JmsSlcEventListener implements SlcEventListener {
                this.messageConverter = messageConverter;
        }
 
-       public ListeningClient init(String clientId) {
-               Connection connection = null;
+       public void init() {
                try {
                        connection = jmsConnectionFactory.createConnection();
-                       connection.setClientID(clientId);
+                       connection.setClientID(connectionClientId);
                        connection.start();
-                       ListeningClient client = new ListeningClient(connection);
-                       return client;
                } catch (JMSException e) {
-                       throw new SlcException("Could not init listening client "
-                                       + clientId, e);
-               } finally {
+                       throw new SlcException("Could not init connection", e);
                }
        }
 
-       public void close(String clientId) {
-               Session session = null;
-               ListeningClient client = getClient(clientId);
-               Connection connection = client.getConnection();
-               try {
-                       session = client.getSession();
-                       session.unsubscribe(clientId);
-               } catch (JMSException e) {
-                       log.warn("Could not unsubscribe client " + clientId, e);
-               } finally {
-                       JmsUtils.closeSession(session);
-               }
-
-               // JmsUtils.closeSession(client.getSession());
-               clients.remove(clientId);
-
-               try {
-                       connection.stop();
-                       connection.close();
-               } catch (JMSException e) {
-                       throw new SlcException("Could not close JMS connection for client "
-                                       + clientId, e);
-               } finally {
-                       clients.remove(clientId);
+       public void close() {
+               ConnectionFactoryUtils.releaseConnection(connection,
+                               jmsConnectionFactory, true);
+               isClosed = true;
+               synchronized (subscriberIds) {
+                       subscriberIds.notifyAll();
                }
        }
 
-       protected ListeningClient getClient(String clientId) {
-               ListeningClient client = clients.get(clientId);
-               if (client == null) {
-                       // Lazy init
-                       client = init(clientId);
-                       clients.put(clientId, client);
-               }
-               return client;
+       public boolean isClosed() {
+               return isClosed;
        }
 
-       protected class ListeningClient {
-               private final Connection connection;
-               private final Session session;
-
-               public ListeningClient(Connection connection) {
-                       super();
-                       this.connection = connection;
-                       try {
-                               session = connection.createSession(false,
-                                               Session.AUTO_ACKNOWLEDGE);
-                       } catch (JMSException e) {
-                               throw new SlcException("Cannot create session");
-                       }
-               }
-
-               public Connection getConnection() {
-                       return connection;
-               }
-
-               public Session getSession() {
-                       return session;
-               }
-
-       }
+       // public void close(String clientId) {
+       // // Session session = null;
+       // // // ListeningClient client = getClient(clientId);
+       // // // Connection connection = client.getConnection();
+       // // try {
+       // // session = client.getSession();
+       // // session.unsubscribe(clientId);
+       // // } catch (JMSException e) {
+       // // log.warn("Could not unsubscribe client " + clientId, e);
+       // // } finally {
+       // // JmsUtils.closeSession(session);
+       // // }
+       // //
+       // // // synchronized (client) {
+       // // // clients.remove(clientId);
+       // // // client.notify();
+       // // // }
+       // }
+
+       // protected ListeningClient getClient(String clientId) {
+       // ListeningClient client = clients.get(clientId);
+       // if (client == null) {
+       // // Lazy init
+       // client = new ListeningClient(connection);
+       // clients.put(clientId, client);
+       // }
+       // return client;
+       // }
+
+       // protected class ListeningClient {
+       // private final Connection connection;
+       // private final Session session;
+       //
+       // public ListeningClient(Connection connection) {
+       // super();
+       // this.connection = connection;
+       // try {
+       // session = connection.createSession(false,
+       // Session.AUTO_ACKNOWLEDGE);
+       // } catch (JMSException e) {
+       // throw new SlcException("Cannot create session");
+       // }
+       // }
+       //
+       // public Connection getConnection() {
+       // return connection;
+       // }
+       //
+       // public Session getSession() {
+       // return session;
+       // }
+       //
+       // }
 }