X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.activemq%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fjms%2FJmsSlcEventListener.java;h=07045edfa2c39047f3d0a81ef5c459a60469d28e;hb=1fdb1b4e7b1d2b0cabb6483238301b857a6392fa;hp=a05a367b4886f83025e48cc4f2b900910be5835c;hpb=87efa1cdb79eeaf3f203cc9bf4f3d9f8d0a299f8;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java index a05a367b4..07045edfa 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java @@ -1,9 +1,32 @@ +/* + * Copyright (C) 2010 Mathieu Baudier + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.argeo.slc.jms; +import java.util.ArrayList; import java.util.List; +import java.util.UUID; +import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -11,28 +34,35 @@ import org.argeo.slc.SlcException; import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.msg.event.SlcEventListener; import org.argeo.slc.msg.event.SlcEventListenerDescriptor; -import org.argeo.slc.msg.event.SlcEventListenerRegister; -import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.connection.ConnectionFactoryUtils; +import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.converter.MessageConverter; public class JmsSlcEventListener implements SlcEventListener { private final static Log log = LogFactory.getLog(JmsSlcEventListener.class); - private Destination eventsDestination; + private Topic eventsDestination; private ConnectionFactory jmsConnectionFactory; private MessageConverter messageConverter; - public SlcEvent listen(SlcEventListenerRegister register, Long timeout) { - JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory); - jmsTemplate.setMessageConverter(messageConverter); - jmsTemplate.setReceiveTimeout(timeout); + private Connection connection = null; + private String connectionClientId = getClass() + "#" + + UUID.randomUUID().toString(); + + private List subscriberIds = new ArrayList(); - List descriptors = register - .getDescriptorsCopy(); + private Boolean isClosed = false; + // private Map clients = Collections + // .synchronizedMap(new HashMap()); + + public SlcEvent listen(String subscriberId, + List descriptors, Long timeout) { if (descriptors.size() == 0) { // No listeners, just waiting try { + if(log.isTraceEnabled()) + log.trace("No event listener registered, sleeping..."); Thread.sleep(timeout); } catch (InterruptedException e) { // silent @@ -40,12 +70,45 @@ public class JmsSlcEventListener implements SlcEventListener { return null; } else { String selector = createSelector(descriptors); - if (log.isTraceEnabled()) log.debug("Selector: " + selector); - Object obj = jmsTemplate.receiveSelectedAndConvert( - eventsDestination, selector); + Object obj = null; + synchronized (subscriberIds) { + while (subscriberIds.contains(subscriberId)) { + try { + subscriberIds.wait(500); + if (isClosed) + return null; + } catch (InterruptedException e) { + // silent + } + } + + subscriberIds.add(subscriberId); + Session session = null; + TopicSubscriber topicSubscriber = null; + try { + // ListeningClient client = (ListeningClient) + // getClient(clientId); + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + topicSubscriber = session.createDurableSubscriber( + eventsDestination, subscriberId, + createSelector(descriptors), true); + Message message = topicSubscriber.receive(timeout); + obj = messageConverter.fromMessage(message); + } catch (JMSException e) { + throw new SlcException("Cannot poll events for subscriber " + + subscriberId, e); + } finally { + JmsUtils.closeMessageConsumer(topicSubscriber); + JmsUtils.closeSession(session); + subscriberIds.remove(subscriberId); + subscriberIds.notifyAll(); + } + + } if (obj == null) return null; @@ -79,7 +142,7 @@ public class JmsSlcEventListener implements SlcEventListener { return buf.toString(); } - public void setEventsDestination(Destination eventsDestination) { + public void setEventsDestination(Topic eventsDestination) { this.eventsDestination = eventsDestination; } @@ -91,4 +154,80 @@ public class JmsSlcEventListener implements SlcEventListener { this.messageConverter = messageConverter; } + public void init() { + try { + connection = jmsConnectionFactory.createConnection(); + connection.setClientID(connectionClientId); + connection.start(); + } catch (JMSException e) { + throw new SlcException("Could not init connection", e); + } + } + + public void close() { + ConnectionFactoryUtils.releaseConnection(connection, + jmsConnectionFactory, true); + isClosed = true; + synchronized (subscriberIds) { + subscriberIds.notifyAll(); + } + } + + public boolean isClosed() { + return isClosed; + } + + // public void close(String clientId) { + // // Session session = null; + // // // ListeningClient client = getClient(clientId); + // // // Connection connection = client.getConnection(); + // // try { + // // session = client.getSession(); + // // session.unsubscribe(clientId); + // // } catch (JMSException e) { + // // log.warn("Could not unsubscribe client " + clientId, e); + // // } finally { + // // JmsUtils.closeSession(session); + // // } + // // + // // // synchronized (client) { + // // // clients.remove(clientId); + // // // client.notify(); + // // // } + // } + + // protected ListeningClient getClient(String clientId) { + // ListeningClient client = clients.get(clientId); + // if (client == null) { + // // Lazy init + // client = new ListeningClient(connection); + // clients.put(clientId, client); + // } + // return client; + // } + + // protected class ListeningClient { + // private final Connection connection; + // private final Session session; + // + // public ListeningClient(Connection connection) { + // super(); + // this.connection = connection; + // try { + // session = connection.createSession(false, + // Session.AUTO_ACKNOWLEDGE); + // } catch (JMSException e) { + // throw new SlcException("Cannot create session"); + // } + // } + // + // public Connection getConnection() { + // return connection; + // } + // + // public Session getSession() { + // return session; + // } + // + // } }