X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.activemq%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fjms%2FJmsSlcEventListener.java;h=eb0fa7b6083429f5e656a42dcaa87942c2078889;hb=96f732785d5236ab435f6be5da3d53ba66c9b86d;hp=5cea81d5841b88ca469052e6b5965aa22a5458b2;hpb=7a3833134109e2a119755a34ff357dea39ce40d6;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java index 5cea81d58..eb0fa7b60 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java @@ -1,9 +1,7 @@ package org.argeo.slc.jms; -import java.util.Collections; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import javax.jms.Connection; @@ -35,10 +33,14 @@ public class JmsSlcEventListener implements SlcEventListener { private String connectionClientId = getClass() + "#" + UUID.randomUUID().toString(); - private Map clients = Collections - .synchronizedMap(new HashMap()); + private List subscriberIds = new ArrayList(); - public SlcEvent listen(String clientId, + private Boolean isClosed = false; + + // private Map clients = Collections + // .synchronizedMap(new HashMap()); + + public SlcEvent listen(String subscriberId, List descriptors, Long timeout) { if (descriptors.size() == 0) { // No listeners, just waiting @@ -50,25 +52,44 @@ public class JmsSlcEventListener implements SlcEventListener { return null; } else { String selector = createSelector(descriptors); - if (log.isTraceEnabled()) log.debug("Selector: " + selector); Object obj = null; - Session session = null; - TopicSubscriber topicSubscriber = null; - try { - session = getClient(clientId).getSession(); - topicSubscriber = session.createDurableSubscriber( - eventsDestination, clientId, - createSelector(descriptors), true); - Message message = topicSubscriber.receive(timeout); - obj = messageConverter.fromMessage(message); - } catch (JMSException e) { - throw new SlcException("Cannot poll events for client " - + clientId, e); - } finally { - JmsUtils.closeMessageConsumer(topicSubscriber); + synchronized (subscriberIds) { + while (subscriberIds.contains(subscriberId)) { + try { + subscriberIds.wait(500); + if (isClosed) + return null; + } catch (InterruptedException e) { + // silent + } + } + + subscriberIds.add(subscriberId); + Session session = null; + TopicSubscriber topicSubscriber = null; + try { + // ListeningClient client = (ListeningClient) + // getClient(clientId); + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + topicSubscriber = session.createDurableSubscriber( + eventsDestination, subscriberId, + createSelector(descriptors), true); + Message message = topicSubscriber.receive(timeout); + obj = messageConverter.fromMessage(message); + } catch (JMSException e) { + throw new SlcException("Cannot poll events for subscriber " + + subscriberId, e); + } finally { + JmsUtils.closeMessageConsumer(topicSubscriber); + JmsUtils.closeSession(session); + subscriberIds.remove(subscriberId); + subscriberIds.notifyAll(); + } + } if (obj == null) @@ -128,67 +149,67 @@ public class JmsSlcEventListener implements SlcEventListener { public void close() { ConnectionFactoryUtils.releaseConnection(connection, jmsConnectionFactory, true); - } - - public void close(String clientId) { - Session session = null; - ListeningClient client = getClient(clientId); - // Connection connection = client.getConnection(); - try { - session = client.getSession(); - session.unsubscribe(clientId); - } catch (JMSException e) { - log.warn("Could not unsubscribe client " + clientId, e); - } finally { - JmsUtils.closeSession(session); + isClosed = true; + synchronized (subscriberIds) { + subscriberIds.notifyAll(); } - - // JmsUtils.closeSession(client.getSession()); - clients.remove(clientId); - - // try { - // connection.stop(); - // connection.close(); - // } catch (JMSException e) { - // throw new SlcException("Could not close JMS connection for client " - // + clientId, e); - // } finally { - // clients.remove(clientId); - // } } - protected ListeningClient getClient(String clientId) { - ListeningClient client = clients.get(clientId); - if (client == null) { - // Lazy init - client = new ListeningClient(connection); - clients.put(clientId, client); - } - return client; + public boolean isClosed() { + return isClosed; } - protected class ListeningClient { - private final Connection connection; - private final Session session; - - public ListeningClient(Connection connection) { - super(); - this.connection = connection; - try { - session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - } catch (JMSException e) { - throw new SlcException("Cannot create session"); - } - } - - public Connection getConnection() { - return connection; - } - - public Session getSession() { - return session; - } - - } + // public void close(String clientId) { + // // Session session = null; + // // // ListeningClient client = getClient(clientId); + // // // Connection connection = client.getConnection(); + // // try { + // // session = client.getSession(); + // // session.unsubscribe(clientId); + // // } catch (JMSException e) { + // // log.warn("Could not unsubscribe client " + clientId, e); + // // } finally { + // // JmsUtils.closeSession(session); + // // } + // // + // // // synchronized (client) { + // // // clients.remove(clientId); + // // // client.notify(); + // // // } + // } + + // protected ListeningClient getClient(String clientId) { + // ListeningClient client = clients.get(clientId); + // if (client == null) { + // // Lazy init + // client = new ListeningClient(connection); + // clients.put(clientId, client); + // } + // return client; + // } + + // protected class ListeningClient { + // private final Connection connection; + // private final Session session; + // + // public ListeningClient(Connection connection) { + // super(); + // this.connection = connection; + // try { + // session = connection.createSession(false, + // Session.AUTO_ACKNOWLEDGE); + // } catch (JMSException e) { + // throw new SlcException("Cannot create session"); + // } + // } + // + // public Connection getConnection() { + // return connection; + // } + // + // public Session getSession() { + // return session; + // } + // + // } }