X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.activemq%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fjms%2FJmsSlcEventListener.java;h=07045edfa2c39047f3d0a81ef5c459a60469d28e;hb=1fdb1b4e7b1d2b0cabb6483238301b857a6392fa;hp=0c702ab620e14fecd906f6978034c5ab95761216;hpb=304125ea23a3570c78149816d76951bbb258707d;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java index 0c702ab62..07045edfa 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java @@ -1,17 +1,29 @@ +/* + * Copyright (C) 2010 Mathieu Baudier + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.argeo.slc.jms; -import java.util.Collections; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.UUID; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; @@ -22,8 +34,7 @@ import org.argeo.slc.SlcException; import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.msg.event.SlcEventListener; import org.argeo.slc.msg.event.SlcEventListenerDescriptor; -import org.argeo.slc.msg.event.SlcEventListenerRegister; -import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.connection.ConnectionFactoryUtils; import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.converter.MessageConverter; @@ -34,14 +45,24 @@ public class JmsSlcEventListener implements SlcEventListener { private ConnectionFactory jmsConnectionFactory; private MessageConverter messageConverter; - private Map clients = Collections - .synchronizedMap(new HashMap()); + private Connection connection = null; + private String connectionClientId = getClass() + "#" + + UUID.randomUUID().toString(); + + private List subscriberIds = new ArrayList(); + + private Boolean isClosed = false; + + // private Map clients = Collections + // .synchronizedMap(new HashMap()); - public SlcEvent listen(String clientId, + public SlcEvent listen(String subscriberId, List descriptors, Long timeout) { if (descriptors.size() == 0) { // No listeners, just waiting try { + if(log.isTraceEnabled()) + log.trace("No event listener registered, sleeping..."); Thread.sleep(timeout); } catch (InterruptedException e) { // silent @@ -49,34 +70,44 @@ public class JmsSlcEventListener implements SlcEventListener { return null; } else { String selector = createSelector(descriptors); - if (log.isTraceEnabled()) log.debug("Selector: " + selector); Object obj = null; - Session session = null; - TopicSubscriber topicSubscriber = null; - // MessageConsumer messageConsumer = null; - try { - // Connection connection = getClient(clientId).getConnection(); - // session = connection.createSession(false, - // Session.AUTO_ACKNOWLEDGE); - session = getClient(clientId).getSession(); - topicSubscriber = session.createDurableSubscriber( - eventsDestination, clientId, - createSelector(descriptors), true); - Message message = topicSubscriber.receive(timeout); - // messageConsumer = session.createConsumer(eventsDestination, - // createSelector(descriptors)); - // Message message = messageConsumer.receive(timeout); - obj = messageConverter.fromMessage(message); - } catch (JMSException e) { - throw new SlcException("Cannot poll events for client " - + clientId, e); - } finally { - // JmsUtils.closeMessageConsumer(messageConsumer); - JmsUtils.closeMessageConsumer(topicSubscriber); - // JmsUtils.closeSession(session); + synchronized (subscriberIds) { + while (subscriberIds.contains(subscriberId)) { + try { + subscriberIds.wait(500); + if (isClosed) + return null; + } catch (InterruptedException e) { + // silent + } + } + + subscriberIds.add(subscriberId); + Session session = null; + TopicSubscriber topicSubscriber = null; + try { + // ListeningClient client = (ListeningClient) + // getClient(clientId); + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + topicSubscriber = session.createDurableSubscriber( + eventsDestination, subscriberId, + createSelector(descriptors), true); + Message message = topicSubscriber.receive(timeout); + obj = messageConverter.fromMessage(message); + } catch (JMSException e) { + throw new SlcException("Cannot poll events for subscriber " + + subscriberId, e); + } finally { + JmsUtils.closeMessageConsumer(topicSubscriber); + JmsUtils.closeSession(session); + subscriberIds.remove(subscriberId); + subscriberIds.notifyAll(); + } + } if (obj == null) @@ -86,28 +117,6 @@ public class JmsSlcEventListener implements SlcEventListener { } } - /* - * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) { - * - * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory); - * jmsTemplate.setMessageConverter(messageConverter); - * jmsTemplate.setReceiveTimeout(timeout); - * - * List descriptors = register - * .getDescriptorsCopy(); - * - * if (descriptors.size() == 0) { // No listeners, just waiting try { - * Thread.sleep(timeout); } catch (InterruptedException e) { // silent } - * return null; } else { String selector = createSelector(descriptors); - * - * if (log.isTraceEnabled()) log.debug("Selector: " + selector); - * - * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination, - * selector); - * - * if (obj == null) return null; else return (SlcEvent) obj; } } - */ - /** Returns null if no filter */ protected String createSelector(List descriptors) { if (descriptors.size() == 0) @@ -145,80 +154,80 @@ public class JmsSlcEventListener implements SlcEventListener { this.messageConverter = messageConverter; } - public ListeningClient init(String clientId) { - Connection connection = null; + public void init() { try { connection = jmsConnectionFactory.createConnection(); - connection.setClientID(clientId); + connection.setClientID(connectionClientId); connection.start(); - ListeningClient client = new ListeningClient(connection); - return client; } catch (JMSException e) { - throw new SlcException("Could not init listening client " - + clientId, e); - } finally { + throw new SlcException("Could not init connection", e); } } - public void close(String clientId) { - Session session = null; - ListeningClient client = getClient(clientId); - Connection connection = client.getConnection(); - try { - session = client.getSession(); - session.unsubscribe(clientId); - } catch (JMSException e) { - log.warn("Could not unsubscribe client " + clientId, e); - } finally { - JmsUtils.closeSession(session); - } - - // JmsUtils.closeSession(client.getSession()); - clients.remove(clientId); - - try { - connection.stop(); - connection.close(); - } catch (JMSException e) { - throw new SlcException("Could not close JMS connection for client " - + clientId, e); - } finally { - clients.remove(clientId); + public void close() { + ConnectionFactoryUtils.releaseConnection(connection, + jmsConnectionFactory, true); + isClosed = true; + synchronized (subscriberIds) { + subscriberIds.notifyAll(); } } - protected ListeningClient getClient(String clientId) { - ListeningClient client = clients.get(clientId); - if (client == null) { - // Lazy init - client = init(clientId); - clients.put(clientId, client); - } - return client; + public boolean isClosed() { + return isClosed; } - protected class ListeningClient { - private final Connection connection; - private final Session session; - - public ListeningClient(Connection connection) { - super(); - this.connection = connection; - try { - session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); - } catch (JMSException e) { - throw new SlcException("Cannot create session"); - } - } - - public Connection getConnection() { - return connection; - } - - public Session getSession() { - return session; - } - - } + // public void close(String clientId) { + // // Session session = null; + // // // ListeningClient client = getClient(clientId); + // // // Connection connection = client.getConnection(); + // // try { + // // session = client.getSession(); + // // session.unsubscribe(clientId); + // // } catch (JMSException e) { + // // log.warn("Could not unsubscribe client " + clientId, e); + // // } finally { + // // JmsUtils.closeSession(session); + // // } + // // + // // // synchronized (client) { + // // // clients.remove(clientId); + // // // client.notify(); + // // // } + // } + + // protected ListeningClient getClient(String clientId) { + // ListeningClient client = clients.get(clientId); + // if (client == null) { + // // Lazy init + // client = new ListeningClient(connection); + // clients.put(clientId, client); + // } + // return client; + // } + + // protected class ListeningClient { + // private final Connection connection; + // private final Session session; + // + // public ListeningClient(Connection connection) { + // super(); + // this.connection = connection; + // try { + // session = connection.createSession(false, + // Session.AUTO_ACKNOWLEDGE); + // } catch (JMSException e) { + // throw new SlcException("Cannot create session"); + // } + // } + // + // public Connection getConnection() { + // return connection; + // } + // + // public Session getSession() { + // return session; + // } + // + // } }