2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.jms
;
19 import java
.util
.ArrayList
;
20 import java
.util
.List
;
21 import java
.util
.UUID
;
23 import javax
.jms
.Connection
;
24 import javax
.jms
.ConnectionFactory
;
25 import javax
.jms
.JMSException
;
26 import javax
.jms
.Message
;
27 import javax
.jms
.Session
;
28 import javax
.jms
.Topic
;
29 import javax
.jms
.TopicSubscriber
;
31 import org
.apache
.commons
.logging
.Log
;
32 import org
.apache
.commons
.logging
.LogFactory
;
33 import org
.argeo
.slc
.SlcException
;
34 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
35 import org
.argeo
.slc
.msg
.event
.SlcEventListener
;
36 import org
.argeo
.slc
.msg
.event
.SlcEventListenerDescriptor
;
37 import org
.springframework
.jms
.connection
.ConnectionFactoryUtils
;
38 import org
.springframework
.jms
.support
.JmsUtils
;
39 import org
.springframework
.jms
.support
.converter
.MessageConverter
;
/**
 * {@link SlcEventListener} implementation that polls SLC events from a JMS
 * topic, using durable topic subscribers created on a single JMS connection
 * held by this instance.
 */
41 public class JmsSlcEventListener
implements SlcEventListener
{
42 private final static Log log
= LogFactory
.getLog(JmsSlcEventListener
.class);
// Topic on which the durable subscribers are created (see setEventsDestination).
45 private Topic eventsDestination
;
// Factory used to create the connection and passed again when releasing it.
46 private ConnectionFactory jmsConnectionFactory
;
// Converts each received JMS message into the object returned by listen().
47 private MessageConverter messageConverter
;
49 // Initialized with init() method, released with close()
50 private Connection connection
= null;
// Client id set on the connection: the runtime class, '#', then a random UUID
// generated when this instance is constructed.
53 private String connectionClientId
= getClass() + "#"
54 + UUID
.randomUUID().toString();
// Closed flag, initially false.
55 private Boolean isClosed
= false;
// Ids of the subscribers currently polling in listen(). The list also serves
// as the monitor for synchronized / wait / notifyAll in listen() and close().
57 private List
<String
> subscriberIds
= new ArrayList
<String
>();
59 // private Map<String, ListeningClient> clients = Collections
60 // .synchronizedMap(new HashMap<String, ListeningClient>());
/**
 * Polls the events topic once on behalf of the given subscriber.
 * <p>
 * When {@code descriptors} is empty no JMS call is made: the method traces a
 * message and sleeps for {@code timeout} milliseconds. Otherwise, while
 * holding the {@code subscriberIds} monitor, it waits until no other poll is
 * registered under the same id, registers the id, creates a session and a
 * durable subscriber filtered by {@link #createSelector(List)}, waits for one
 * message and converts it with the configured message converter. The consumer
 * and session are then closed, the id is unregistered and waiting threads are
 * notified.
 * <p>
 * NOTE(review): the blocking receive appears to run while the
 * {@code subscriberIds} monitor is held, which would serialize polls across
 * all subscriber ids - confirm this is intended.
 *
 * @param subscriberId
 *            name of the durable subscription; also the key used to prevent
 *            two concurrent polls for the same subscriber
 * @param descriptors
 *            event types (and optional filters) to listen to
 * @param timeout
 *            milliseconds, passed to both {@code Thread.sleep} and
 *            {@code receive}; it is unboxed, so it must not be null
 * @return the converted message, cast to {@link SlcEvent}
 * @throws SlcException
 *             if a {@link JMSException} occurs while polling
 */
62 public SlcEvent
listen(String subscriberId
,
63 List
<SlcEventListenerDescriptor
> descriptors
, Long timeout
) {
64 if (descriptors
.size() == 0) {
65 // No listener, just waiting
67 if (log
.isTraceEnabled())
68 log
.trace("No event listener registered, sleeping...");
69 Thread
.sleep(timeout
);
70 } catch (InterruptedException e
) {
// The subscriber id list doubles as the lock for the whole polling section.
76 synchronized (subscriberIds
) {
// Another poll is already registered under this id: wait in 500 ms slices
// until it is removed from the list.
77 while (subscriberIds
.contains(subscriberId
)) {
79 subscriberIds
.wait(500);
82 } catch (InterruptedException e
) {
// Register this subscriber id as currently polling.
86 subscriberIds
.add(subscriberId
);
87 Session session
= null;
88 TopicSubscriber topicSubscriber
= null;
90 // ListeningClient client = (ListeningClient)
91 // getClient(clientId);
// Non-transacted session with automatic acknowledgement.
92 session
= connection
.createSession(false,
93 Session
.AUTO_ACKNOWLEDGE
);
// Durable subscriber named after the subscriber id and filtered by the
// generated selector; the trailing 'true' is the JMS noLocal argument.
94 topicSubscriber
= session
.createDurableSubscriber(
95 eventsDestination
, subscriberId
,
96 createSelector(descriptors
), true);
// Blocking receive bounded by the timeout; per the JMS API it returns null
// when the timeout expires without a message.
97 Message message
= topicSubscriber
.receive(timeout
);
98 obj
= messageConverter
.fromMessage(message
);
99 } catch (JMSException e
) {
100 throw new SlcException("Cannot poll events for subscriber "
// Cleanup: close the consumer and the session, unregister the id and wake
// the threads waiting on the subscriber id list.
103 JmsUtils
.closeMessageConsumer(topicSubscriber
);
104 JmsUtils
.closeSession(session
);
105 subscriberIds
.remove(subscriberId
);
106 subscriberIds
.notifyAll();
114 return (SlcEvent
) obj
;
118 /**
 * Builds the JMS message selector matching the given descriptors.
 * <p>
 * Each descriptor contributes a test of the form
 * {@code <SlcEvent.EVENT_TYPE>='<eventType>'}, followed by the descriptor's
 * filter wrapped in parentheses when that filter is not null. The result is
 * traced when trace logging is enabled.
 * <p>
 * NOTE(review): the event type and filter are appended verbatim, without any
 * quoting or escaping - confirm they always come from trusted configuration.
 *
 * @param descriptors
 *            the descriptors to translate; must not be empty
 * @return the selector string (the content of the buffer)
 * @throws SlcException
 *             if {@code descriptors} is empty
 */
119 protected String
createSelector(List
<SlcEventListenerDescriptor
> descriptors
) {
120 if (descriptors
.size() == 0)
121 throw new SlcException("No listeners, cannot generate JMS selector");
123 StringBuffer buf
= new StringBuffer(256);
124 Boolean first
= true;
125 for (SlcEventListenerDescriptor descriptor
: descriptors
) {
// Event type test: EVENT_TYPE='<type>'
132 buf
.append(SlcEvent
.EVENT_TYPE
).append("=").append('\'').append(
133 descriptor
.getEventType()).append('\'');
// Optional additional filter, wrapped in parentheses.
134 if (descriptor
.getFilter() != null) {
136 buf
.append('(').append(descriptor
.getFilter()).append(')');
140 if (log
.isTraceEnabled())
141 log
.trace("selector created : " + buf
.toString());
142 return buf
.toString();
/**
 * Tells whether this listener is closed.
 * NOTE(review): presumably returns the {@code isClosed} field - confirm.
 */
145 public boolean isClosed() {
/**
 * Sets the JMS topic on which {@code listen} creates its durable
 * subscribers.
 *
 * @param eventsDestination
 *            the events topic
 */
150 public void setEventsDestination(Topic eventsDestination
) {
151 this.eventsDestination
= eventsDestination
;
/**
 * Sets the connection factory used to create the JMS connection and passed
 * again when that connection is released in {@code close()}.
 *
 * @param jmsConnectionFactory
 *            the JMS connection factory
 */
154 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory
) {
155 this.jmsConnectionFactory
= jmsConnectionFactory
;
/**
 * Sets the converter applied to each JMS message received by
 * {@code listen}.
 *
 * @param messageConverter
 *            the message converter
 */
158 public void setMessageConverter(MessageConverter messageConverter
) {
159 this.messageConverter
= messageConverter
;
// Create the JMS connection from the configured factory and assign it this
// instance's client id (JMS durable subscriptions are tied to the client id
// of the connection).
165 connection
= jmsConnectionFactory
.createConnection();
166 connection
.setClientID(connectionClientId
);
// Report JMS failures as an SlcException, keeping the original cause.
168 } catch (JMSException e
) {
169 throw new SlcException("Could not init connection", e
);
/**
 * Releases the JMS connection through {@link ConnectionFactoryUtils} and
 * wakes every thread waiting on the subscriber id list.
 */
173 public void close() {
// The last argument is the 'started' flag of releaseConnection().
// NOTE(review): 'true' presumably reflects that the connection was started
// during initialization - confirm.
174 ConnectionFactoryUtils
.releaseConnection(connection
,
175 jmsConnectionFactory
, true);
// notifyAll() must be called while owning the monitor of subscriberIds.
177 synchronized (subscriberIds
) {
178 subscriberIds
.notifyAll();
182 // public void close(String clientId) {
183 // // Session session = null;
184 // // // ListeningClient client = getClient(clientId);
185 // // // Connection connection = client.getConnection();
187 // // session = client.getSession();
188 // // session.unsubscribe(clientId);
189 // // } catch (JMSException e) {
190 // // log.warn("Could not unsubscribe client " + clientId, e);
192 // // JmsUtils.closeSession(session);
195 // // // synchronized (client) {
196 // // // clients.remove(clientId);
197 // // // client.notify();
201 // protected ListeningClient getClient(String clientId) {
202 // ListeningClient client = clients.get(clientId);
203 // if (client == null) {
205 // client = new ListeningClient(connection);
206 // clients.put(clientId, client);
211 // protected class ListeningClient {
212 // private final Connection connection;
213 // private final Session session;
215 // public ListeningClient(Connection connection) {
217 // this.connection = connection;
219 // session = connection.createSession(false,
220 // Session.AUTO_ACKNOWLEDGE);
221 // } catch (JMSException e) {
222 // throw new SlcException("Cannot create session");
226 // public Connection getConnection() {
227 // return connection;
230 // public Session getSession() {