/*
 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org
.argeo
.slc
.jms
;
19 import java
.util
.ArrayList
;
20 import java
.util
.List
;
21 import java
.util
.UUID
;
23 import javax
.jms
.Connection
;
24 import javax
.jms
.ConnectionFactory
;
25 import javax
.jms
.JMSException
;
26 import javax
.jms
.Message
;
27 import javax
.jms
.Session
;
28 import javax
.jms
.Topic
;
29 import javax
.jms
.TopicSubscriber
;
31 import org
.apache
.commons
.logging
.Log
;
32 import org
.apache
.commons
.logging
.LogFactory
;
33 import org
.argeo
.slc
.SlcException
;
34 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
35 import org
.argeo
.slc
.msg
.event
.SlcEventListener
;
36 import org
.argeo
.slc
.msg
.event
.SlcEventListenerDescriptor
;
37 import org
.springframework
.jms
.connection
.ConnectionFactoryUtils
;
38 import org
.springframework
.jms
.support
.JmsUtils
;
39 import org
.springframework
.jms
.support
.converter
.MessageConverter
;
41 public class JmsSlcEventListener
implements SlcEventListener
{
42 private final static Log log
= LogFactory
.getLog(JmsSlcEventListener
.class);
44 private Topic eventsDestination
;
45 private ConnectionFactory jmsConnectionFactory
;
46 private MessageConverter messageConverter
;
48 private Connection connection
= null;
49 private String connectionClientId
= getClass() + "#"
50 + UUID
.randomUUID().toString();
52 private List
<String
> subscriberIds
= new ArrayList
<String
>();
54 private Boolean isClosed
= false;
56 // private Map<String, ListeningClient> clients = Collections
57 // .synchronizedMap(new HashMap<String, ListeningClient>());
59 public SlcEvent
listen(String subscriberId
,
60 List
<SlcEventListenerDescriptor
> descriptors
, Long timeout
) {
61 if (descriptors
.size() == 0) {
62 // No listeners, just waiting
64 if(log
.isTraceEnabled())
65 log
.trace("No event listener registered, sleeping...");
66 Thread
.sleep(timeout
);
67 } catch (InterruptedException e
) {
72 String selector
= createSelector(descriptors
);
73 if (log
.isTraceEnabled())
74 log
.debug("Selector: " + selector
);
77 synchronized (subscriberIds
) {
78 while (subscriberIds
.contains(subscriberId
)) {
80 subscriberIds
.wait(500);
83 } catch (InterruptedException e
) {
88 subscriberIds
.add(subscriberId
);
89 Session session
= null;
90 TopicSubscriber topicSubscriber
= null;
92 // ListeningClient client = (ListeningClient)
93 // getClient(clientId);
94 session
= connection
.createSession(false,
95 Session
.AUTO_ACKNOWLEDGE
);
96 topicSubscriber
= session
.createDurableSubscriber(
97 eventsDestination
, subscriberId
,
98 createSelector(descriptors
), true);
99 Message message
= topicSubscriber
.receive(timeout
);
100 obj
= messageConverter
.fromMessage(message
);
101 } catch (JMSException e
) {
102 throw new SlcException("Cannot poll events for subscriber "
105 JmsUtils
.closeMessageConsumer(topicSubscriber
);
106 JmsUtils
.closeSession(session
);
107 subscriberIds
.remove(subscriberId
);
108 subscriberIds
.notifyAll();
116 return (SlcEvent
) obj
;
120 /** Returns null if no filter */
121 protected String
createSelector(List
<SlcEventListenerDescriptor
> descriptors
) {
122 if (descriptors
.size() == 0)
123 throw new SlcException("No listeners, cannot generate JMS selector");
125 StringBuffer buf
= new StringBuffer(256);
126 Boolean first
= true;
127 for (SlcEventListenerDescriptor descriptor
: descriptors
) {
134 buf
.append(SlcEvent
.EVENT_TYPE
).append("=").append('\'').append(
135 descriptor
.getEventType()).append('\'');
136 if (descriptor
.getFilter() != null) {
138 buf
.append('(').append(descriptor
.getFilter()).append(')');
142 return buf
.toString();
145 public void setEventsDestination(Topic eventsDestination
) {
146 this.eventsDestination
= eventsDestination
;
149 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory
) {
150 this.jmsConnectionFactory
= jmsConnectionFactory
;
153 public void setMessageConverter(MessageConverter messageConverter
) {
154 this.messageConverter
= messageConverter
;
// NOTE(review): the header of the enclosing method and the opening of the
// try block are not visible in this excerpt; presumably this is the
// connection initialization method - confirm against the full file.
// Creates the JMS connection from the configured factory.
159 connection
= jmsConnectionFactory
.createConnection();
// Assigns this instance's client id (class description + '#' + random UUID)
// to the connection.
160 connection
.setClientID(connectionClientId
);
// A JMS failure is rethrown as an SlcException carrying the original cause.
162 } catch (JMSException e
) {
163 throw new SlcException("Could not init connection", e
);
167 public void close() {
168 ConnectionFactoryUtils
.releaseConnection(connection
,
169 jmsConnectionFactory
, true);
171 synchronized (subscriberIds
) {
172 subscriberIds
.notifyAll();
176 public boolean isClosed() {
180 // public void close(String clientId) {
181 // // Session session = null;
182 // // // ListeningClient client = getClient(clientId);
183 // // // Connection connection = client.getConnection();
185 // // session = client.getSession();
186 // // session.unsubscribe(clientId);
187 // // } catch (JMSException e) {
188 // // log.warn("Could not unsubscribe client " + clientId, e);
190 // // JmsUtils.closeSession(session);
193 // // // synchronized (client) {
194 // // // clients.remove(clientId);
195 // // // client.notify();
199 // protected ListeningClient getClient(String clientId) {
200 // ListeningClient client = clients.get(clientId);
201 // if (client == null) {
203 // client = new ListeningClient(connection);
204 // clients.put(clientId, client);
209 // protected class ListeningClient {
210 // private final Connection connection;
211 // private final Session session;
213 // public ListeningClient(Connection connection) {
215 // this.connection = connection;
217 // session = connection.createSession(false,
218 // Session.AUTO_ACKNOWLEDGE);
219 // } catch (JMSException e) {
220 // throw new SlcException("Cannot create session");
224 // public Connection getConnection() {
225 // return connection;
228 // public Session getSession() {