package org.argeo.slc.jms;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
private String connectionClientId = getClass() + "#"
+ UUID.randomUUID().toString();
- private Map<String, ListeningClient> clients = Collections
- .synchronizedMap(new HashMap<String, ListeningClient>());
+ private List<String> subscriberIds = new ArrayList<String>();
- public SlcEvent listen(String clientId,
+ private Boolean isClosed = false;
+
+ // private Map<String, ListeningClient> clients = Collections
+ // .synchronizedMap(new HashMap<String, ListeningClient>());
+
+ public SlcEvent listen(String subscriberId,
List<SlcEventListenerDescriptor> descriptors, Long timeout) {
if (descriptors.size() == 0) {
// No listeners, just waiting
return null;
} else {
String selector = createSelector(descriptors);
-
if (log.isTraceEnabled())
log.debug("Selector: " + selector);
Object obj = null;
- Session session = null;
- TopicSubscriber topicSubscriber = null;
- try {
- session = getClient(clientId).getSession();
- topicSubscriber = session.createDurableSubscriber(
- eventsDestination, clientId,
- createSelector(descriptors), true);
- Message message = topicSubscriber.receive(timeout);
- obj = messageConverter.fromMessage(message);
- } catch (JMSException e) {
- throw new SlcException("Cannot poll events for client "
- + clientId, e);
- } finally {
- JmsUtils.closeMessageConsumer(topicSubscriber);
+ synchronized (subscriberIds) {
+ while (subscriberIds.contains(subscriberId)) {
+ try {
+ subscriberIds.wait(500);
+ if (isClosed)
+ return null;
+ } catch (InterruptedException e) {
+ // silent
+ }
+ }
+
+ subscriberIds.add(subscriberId);
+ Session session = null;
+ TopicSubscriber topicSubscriber = null;
+ try {
+ // ListeningClient client = (ListeningClient)
+ // getClient(clientId);
+ session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ topicSubscriber = session.createDurableSubscriber(
+ eventsDestination, subscriberId,
+ createSelector(descriptors), true);
+ Message message = topicSubscriber.receive(timeout);
+ obj = messageConverter.fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot poll events for subscriber "
+ + subscriberId, e);
+ } finally {
+ JmsUtils.closeMessageConsumer(topicSubscriber);
+ JmsUtils.closeSession(session);
+ subscriberIds.remove(subscriberId);
+ subscriberIds.notifyAll();
+ }
+
}
if (obj == null)
public void close() {
ConnectionFactoryUtils.releaseConnection(connection,
jmsConnectionFactory, true);
- }
-
- public void close(String clientId) {
- Session session = null;
- ListeningClient client = getClient(clientId);
- // Connection connection = client.getConnection();
- try {
- session = client.getSession();
- session.unsubscribe(clientId);
- } catch (JMSException e) {
- log.warn("Could not unsubscribe client " + clientId, e);
- } finally {
- JmsUtils.closeSession(session);
+ isClosed = true;
+ synchronized (subscriberIds) {
+ subscriberIds.notifyAll();
}
-
- // JmsUtils.closeSession(client.getSession());
- clients.remove(clientId);
-
- // try {
- // connection.stop();
- // connection.close();
- // } catch (JMSException e) {
- // throw new SlcException("Could not close JMS connection for client "
- // + clientId, e);
- // } finally {
- // clients.remove(clientId);
- // }
}
- protected ListeningClient getClient(String clientId) {
- ListeningClient client = clients.get(clientId);
- if (client == null) {
- // Lazy init
- client = new ListeningClient(connection);
- clients.put(clientId, client);
- }
- return client;
+ public boolean isClosed() {
+ return isClosed;
}
- protected class ListeningClient {
- private final Connection connection;
- private final Session session;
-
- public ListeningClient(Connection connection) {
- super();
- this.connection = connection;
- try {
- session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- } catch (JMSException e) {
- throw new SlcException("Cannot create session");
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public Session getSession() {
- return session;
- }
-
- }
+ // public void close(String clientId) {
+ // // Session session = null;
+ // // // ListeningClient client = getClient(clientId);
+ // // // Connection connection = client.getConnection();
+ // // try {
+ // // session = client.getSession();
+ // // session.unsubscribe(clientId);
+ // // } catch (JMSException e) {
+ // // log.warn("Could not unsubscribe client " + clientId, e);
+ // // } finally {
+ // // JmsUtils.closeSession(session);
+ // // }
+ // //
+ // // // synchronized (client) {
+ // // // clients.remove(clientId);
+ // // // client.notify();
+ // // // }
+ // }
+
+ // protected ListeningClient getClient(String clientId) {
+ // ListeningClient client = clients.get(clientId);
+ // if (client == null) {
+ // // Lazy init
+ // client = new ListeningClient(connection);
+ // clients.put(clientId, client);
+ // }
+ // return client;
+ // }
+
+ // protected class ListeningClient {
+ // private final Connection connection;
+ // private final Session session;
+ //
+ // public ListeningClient(Connection connection) {
+ // super();
+ // this.connection = connection;
+ // try {
+ // session = connection.createSession(false,
+ // Session.AUTO_ACKNOWLEDGE);
+ // } catch (JMSException e) {
+ // throw new SlcException("Cannot create session");
+ // }
+ // }
+ //
+ // public Connection getConnection() {
+ // return connection;
+ // }
+ //
+ // public Session getSession() {
+ // return session;
+ // }
+ //
+ // }
}