1 package org
.argeo
.slc
.jms
;
3 import java
.util
.Collections
;
4 import java
.util
.HashMap
;
8 import javax
.jms
.Connection
;
9 import javax
.jms
.ConnectionFactory
;
10 import javax
.jms
.DeliveryMode
;
11 import javax
.jms
.Destination
;
12 import javax
.jms
.JMSException
;
13 import javax
.jms
.Message
;
14 import javax
.jms
.MessageConsumer
;
15 import javax
.jms
.Session
;
16 import javax
.jms
.Topic
;
17 import javax
.jms
.TopicSubscriber
;
19 import org
.apache
.commons
.logging
.Log
;
20 import org
.apache
.commons
.logging
.LogFactory
;
21 import org
.argeo
.slc
.SlcException
;
22 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
23 import org
.argeo
.slc
.msg
.event
.SlcEventListener
;
24 import org
.argeo
.slc
.msg
.event
.SlcEventListenerDescriptor
;
25 import org
.argeo
.slc
.msg
.event
.SlcEventListenerRegister
;
26 import org
.springframework
.jms
.core
.JmsTemplate
;
27 import org
.springframework
.jms
.support
.JmsUtils
;
28 import org
.springframework
.jms
.support
.converter
.MessageConverter
;
30 public class JmsSlcEventListener
implements SlcEventListener
{
31 private final static Log log
= LogFactory
.getLog(JmsSlcEventListener
.class);
33 private Topic eventsDestination
;
34 private ConnectionFactory jmsConnectionFactory
;
35 private MessageConverter messageConverter
;
37 private Map
<String
, ListeningClient
> clients
= Collections
38 .synchronizedMap(new HashMap
<String
, ListeningClient
>());
40 public SlcEvent
listen(String clientId
,
41 List
<SlcEventListenerDescriptor
> descriptors
, Long timeout
) {
42 if (descriptors
.size() == 0) {
43 // No listeners, just waiting
45 Thread
.sleep(timeout
);
46 } catch (InterruptedException e
) {
51 String selector
= createSelector(descriptors
);
53 if (log
.isTraceEnabled())
54 log
.debug("Selector: " + selector
);
57 Session session
= null;
58 TopicSubscriber topicSubscriber
= null;
59 // MessageConsumer messageConsumer = null;
61 // Connection connection = getClient(clientId).getConnection();
62 // session = connection.createSession(false,
63 // Session.AUTO_ACKNOWLEDGE);
64 session
= getClient(clientId
).getSession();
65 topicSubscriber
= session
.createDurableSubscriber(
66 eventsDestination
, clientId
,
67 createSelector(descriptors
), true);
68 Message message
= topicSubscriber
.receive(timeout
);
69 // messageConsumer = session.createConsumer(eventsDestination,
70 // createSelector(descriptors));
71 // Message message = messageConsumer.receive(timeout);
72 obj
= messageConverter
.fromMessage(message
);
73 } catch (JMSException e
) {
74 throw new SlcException("Cannot poll events for client "
77 // JmsUtils.closeMessageConsumer(messageConsumer);
78 JmsUtils
.closeMessageConsumer(topicSubscriber
);
79 // JmsUtils.closeSession(session);
85 return (SlcEvent
) obj
;
90 * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
92 * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
93 * jmsTemplate.setMessageConverter(messageConverter);
94 * jmsTemplate.setReceiveTimeout(timeout);
96 * List<SlcEventListenerDescriptor> descriptors = register
97 * .getDescriptorsCopy();
99 * if (descriptors.size() == 0) { // No listeners, just waiting try {
100 * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
101 * return null; } else { String selector = createSelector(descriptors);
103 * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
105 * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
108 * if (obj == null) return null; else return (SlcEvent) obj; } }
111 /** Returns null if no filter */
112 protected String
createSelector(List
<SlcEventListenerDescriptor
> descriptors
) {
113 if (descriptors
.size() == 0)
114 throw new SlcException("No listeners, cannot generate JMS selector");
116 StringBuffer buf
= new StringBuffer(256);
117 Boolean first
= true;
118 for (SlcEventListenerDescriptor descriptor
: descriptors
) {
125 buf
.append(SlcEvent
.EVENT_TYPE
).append("=").append('\'').append(
126 descriptor
.getEventType()).append('\'');
127 if (descriptor
.getFilter() != null) {
129 buf
.append('(').append(descriptor
.getFilter()).append(')');
133 return buf
.toString();
136 public void setEventsDestination(Topic eventsDestination
) {
137 this.eventsDestination
= eventsDestination
;
140 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory
) {
141 this.jmsConnectionFactory
= jmsConnectionFactory
;
144 public void setMessageConverter(MessageConverter messageConverter
) {
145 this.messageConverter
= messageConverter
;
148 public ListeningClient
init(String clientId
) {
149 Connection connection
= null;
151 connection
= jmsConnectionFactory
.createConnection();
152 connection
.setClientID(clientId
);
154 ListeningClient client
= new ListeningClient(connection
);
156 } catch (JMSException e
) {
157 throw new SlcException("Could not init listening client "
163 public void close(String clientId
) {
164 Session session
= null;
165 ListeningClient client
= getClient(clientId
);
166 Connection connection
= client
.getConnection();
168 session
= client
.getSession();
169 session
.unsubscribe(clientId
);
170 } catch (JMSException e
) {
171 log
.warn("Could not unsubscribe client " + clientId
, e
);
173 JmsUtils
.closeSession(session
);
176 // JmsUtils.closeSession(client.getSession());
177 clients
.remove(clientId
);
182 } catch (JMSException e
) {
183 throw new SlcException("Could not close JMS connection for client "
186 clients
.remove(clientId
);
190 protected ListeningClient
getClient(String clientId
) {
191 ListeningClient client
= clients
.get(clientId
);
192 if (client
== null) {
194 client
= init(clientId
);
195 clients
.put(clientId
, client
);
200 protected class ListeningClient
{
201 private final Connection connection
;
202 private final Session session
;
204 public ListeningClient(Connection connection
) {
206 this.connection
= connection
;
208 session
= connection
.createSession(false,
209 Session
.AUTO_ACKNOWLEDGE
);
210 } catch (JMSException e
) {
211 throw new SlcException("Cannot create session");
215 public Connection
getConnection() {
219 public Session
getSession() {