]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
Stabilize attachments and events
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
index 5cea81d5841b88ca469052e6b5965aa22a5458b2..eb0fa7b6083429f5e656a42dcaa87942c2078889 100644 (file)
@@ -1,9 +1,7 @@
 package org.argeo.slc.jms;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.jms.Connection;
@@ -35,10 +33,14 @@ public class JmsSlcEventListener implements SlcEventListener {
        private String connectionClientId = getClass() + "#"
                        + UUID.randomUUID().toString();
 
-       private Map<String, ListeningClient> clients = Collections
-                       .synchronizedMap(new HashMap<String, ListeningClient>());
+       private List<String> subscriberIds = new ArrayList<String>();
 
-       public SlcEvent listen(String clientId,
+       private Boolean isClosed = false;
+
+       // private Map<String, ListeningClient> clients = Collections
+       // .synchronizedMap(new HashMap<String, ListeningClient>());
+
+       public SlcEvent listen(String subscriberId,
                        List<SlcEventListenerDescriptor> descriptors, Long timeout) {
                if (descriptors.size() == 0) {
                        // No listeners, just waiting
@@ -50,25 +52,44 @@ public class JmsSlcEventListener implements SlcEventListener {
                        return null;
                } else {
                        String selector = createSelector(descriptors);
-
                        if (log.isTraceEnabled())
                                log.debug("Selector: " + selector);
 
                        Object obj = null;
-                       Session session = null;
-                       TopicSubscriber topicSubscriber = null;
-                       try {
-                               session = getClient(clientId).getSession();
-                               topicSubscriber = session.createDurableSubscriber(
-                                               eventsDestination, clientId,
-                                               createSelector(descriptors), true);
-                               Message message = topicSubscriber.receive(timeout);
-                               obj = messageConverter.fromMessage(message);
-                       } catch (JMSException e) {
-                               throw new SlcException("Cannot poll events for client "
-                                               + clientId, e);
-                       } finally {
-                               JmsUtils.closeMessageConsumer(topicSubscriber);
+                       synchronized (subscriberIds) {
+                               while (subscriberIds.contains(subscriberId)) {
+                                       try {
+                                               subscriberIds.wait(500);
+                                               if (isClosed)
+                                                       return null;
+                                       } catch (InterruptedException e) {
+                                               // silent
+                                       }
+                               }
+
+                               subscriberIds.add(subscriberId);
+                               Session session = null;
+                               TopicSubscriber topicSubscriber = null;
+                               try {
+                                       // ListeningClient client = (ListeningClient)
+                                       // getClient(clientId);
+                                       session = connection.createSession(false,
+                                                       Session.AUTO_ACKNOWLEDGE);
+                                       topicSubscriber = session.createDurableSubscriber(
+                                                       eventsDestination, subscriberId,
+                                                       createSelector(descriptors), true);
+                                       Message message = topicSubscriber.receive(timeout);
+                                       obj = messageConverter.fromMessage(message);
+                               } catch (JMSException e) {
+                                       throw new SlcException("Cannot poll events for subscriber "
+                                                       + subscriberId, e);
+                               } finally {
+                                       JmsUtils.closeMessageConsumer(topicSubscriber);
+                                       JmsUtils.closeSession(session);
+                                       subscriberIds.remove(subscriberId);
+                                       subscriberIds.notifyAll();
+                               }
+
                        }
 
                        if (obj == null)
@@ -128,67 +149,67 @@ public class JmsSlcEventListener implements SlcEventListener {
        public void close() {
                ConnectionFactoryUtils.releaseConnection(connection,
                                jmsConnectionFactory, true);
-       }
-
-       public void close(String clientId) {
-               Session session = null;
-               ListeningClient client = getClient(clientId);
-               // Connection connection = client.getConnection();
-               try {
-                       session = client.getSession();
-                       session.unsubscribe(clientId);
-               } catch (JMSException e) {
-                       log.warn("Could not unsubscribe client " + clientId, e);
-               } finally {
-                       JmsUtils.closeSession(session);
+               isClosed = true;
+               synchronized (subscriberIds) {
+                       subscriberIds.notifyAll();
                }
-
-               // JmsUtils.closeSession(client.getSession());
-               clients.remove(clientId);
-
-               // try {
-               // connection.stop();
-               // connection.close();
-               // } catch (JMSException e) {
-               // throw new SlcException("Could not close JMS connection for client "
-               // + clientId, e);
-               // } finally {
-               // clients.remove(clientId);
-               // }
        }
 
-       protected ListeningClient getClient(String clientId) {
-               ListeningClient client = clients.get(clientId);
-               if (client == null) {
-                       // Lazy init
-                       client = new ListeningClient(connection);
-                       clients.put(clientId, client);
-               }
-               return client;
+       public boolean isClosed() {
+               return isClosed;
        }
 
-       protected class ListeningClient {
-               private final Connection connection;
-               private final Session session;
-
-               public ListeningClient(Connection connection) {
-                       super();
-                       this.connection = connection;
-                       try {
-                               session = connection.createSession(false,
-                                               Session.AUTO_ACKNOWLEDGE);
-                       } catch (JMSException e) {
-                               throw new SlcException("Cannot create session");
-                       }
-               }
-
-               public Connection getConnection() {
-                       return connection;
-               }
-
-               public Session getSession() {
-                       return session;
-               }
-
-       }
+       // public void close(String clientId) {
+       // // Session session = null;
+       // // // ListeningClient client = getClient(clientId);
+       // // // Connection connection = client.getConnection();
+       // // try {
+       // // session = client.getSession();
+       // // session.unsubscribe(clientId);
+       // // } catch (JMSException e) {
+       // // log.warn("Could not unsubscribe client " + clientId, e);
+       // // } finally {
+       // // JmsUtils.closeSession(session);
+       // // }
+       // //
+       // // // synchronized (client) {
+       // // // clients.remove(clientId);
+       // // // client.notify();
+       // // // }
+       // }
+
+       // protected ListeningClient getClient(String clientId) {
+       // ListeningClient client = clients.get(clientId);
+       // if (client == null) {
+       // // Lazy init
+       // client = new ListeningClient(connection);
+       // clients.put(clientId, client);
+       // }
+       // return client;
+       // }
+
+       // protected class ListeningClient {
+       // private final Connection connection;
+       // private final Session session;
+       //
+       // public ListeningClient(Connection connection) {
+       // super();
+       // this.connection = connection;
+       // try {
+       // session = connection.createSession(false,
+       // Session.AUTO_ACKNOWLEDGE);
+       // } catch (JMSException e) {
+       // throw new SlcException("Cannot create session");
+       // }
+       // }
+       //
+       // public Connection getConnection() {
+       // return connection;
+       // }
+       //
+       // public Session getSession() {
+       // return session;
+       // }
+       //
+       // }
 }