1 package org
.argeo
.slc
.jms
;
3 import java
.util
.ArrayList
;
7 import javax
.jms
.Connection
;
8 import javax
.jms
.ConnectionFactory
;
9 import javax
.jms
.JMSException
;
10 import javax
.jms
.Message
;
11 import javax
.jms
.Session
;
12 import javax
.jms
.Topic
;
13 import javax
.jms
.TopicSubscriber
;
15 import org
.apache
.commons
.logging
.Log
;
16 import org
.apache
.commons
.logging
.LogFactory
;
17 import org
.argeo
.slc
.SlcException
;
18 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
19 import org
.argeo
.slc
.msg
.event
.SlcEventListener
;
20 import org
.argeo
.slc
.msg
.event
.SlcEventListenerDescriptor
;
21 import org
.springframework
.jms
.connection
.ConnectionFactoryUtils
;
22 import org
.springframework
.jms
.support
.JmsUtils
;
23 import org
.springframework
.jms
.support
.converter
.MessageConverter
;
/**
 * {@link SlcEventListener} implementation that polls SLC events from a JMS
 * topic through durable topic subscribers created on a single shared
 * connection.
 */
25 public class JmsSlcEventListener
implements SlcEventListener
{
26 private final static Log log
= LogFactory
.getLog(JmsSlcEventListener
.class);
// Topic handed to the durable subscriber created in listen(); injected by setter.
28 private Topic eventsDestination
;
// Factory used to create the shared connection and to release it in close().
29 private ConnectionFactory jmsConnectionFactory
;
// Converts the received JMS message into the object returned by listen().
30 private MessageConverter messageConverter
;
// Shared JMS connection; stays null until it is created from jmsConnectionFactory.
32 private Connection connection
= null;
// Client id set on the connection. Concatenating getClass() uses
// Class.toString(), so the value starts with "class <fully qualified name>#".
// NOTE(review): no import of java.util.UUID is visible in this extract.
33 private String connectionClientId
= getClass() + "#"
34 + UUID
.randomUUID().toString();
// Ids of the subscribers currently being polled. This list is also the
// monitor used for synchronized / wait / notifyAll in listen() and close().
// NOTE(review): no import of java.util.List is visible in this extract.
36 private List
<String
> subscriberIds
= new ArrayList
<String
>();
// Closed flag, initially false.
// NOTE(review): no assignment to this field is visible in this extract.
38 private Boolean isClosed
= false;
40 // private Map<String, ListeningClient> clients = Collections
41 // .synchronizedMap(new HashMap<String, ListeningClient>());
/**
 * Polls the events topic on behalf of one subscriber and returns the
 * converted message cast to {@link SlcEvent}.
 *
 * With an empty descriptor list no JMS call is made: the thread only sleeps
 * for {@code timeout} milliseconds. Otherwise a durable subscriber named
 * after {@code subscriberId} is created with the selector built by
 * {@link #createSelector(List)} and a single receive is attempted.
 *
 * NOTE(review): several lines of this method (the {@code try} openers, the
 * closing braces, the declaration of {@code obj} and the statements after
 * the catch blocks) are not present in this extract; the comments below
 * describe only what is visible.
 *
 * @param subscriberId name of the durable subscription
 * @param descriptors event types and optional filters to listen for
 * @param timeout milliseconds to sleep or to wait for a message; it is
 *        unboxed, so a null value raises a NullPointerException
 * @return the received object cast to {@link SlcEvent}
 * @throws SlcException if a JMSException is raised while polling
 */
43 public SlcEvent
listen(String subscriberId
,
44 List
<SlcEventListenerDescriptor
> descriptors
, Long timeout
) {
45 if (descriptors
.size() == 0) {
46 // No listeners, just waiting
48 if(log
.isTraceEnabled())
49 log
.trace("No event listener registered, sleeping...");
50 Thread
.sleep(timeout
);
51 } catch (InterruptedException e
) {
// NOTE(review): the handling of the interruption and the value returned by
// this branch are not visible in this extract.
56 String selector
= createSelector(descriptors
);
// NOTE(review): the guard tests the TRACE level but the message is logged at
// DEBUG level.
57 if (log
.isTraceEnabled())
58 log
.debug("Selector: " + selector
);
// subscriberIds is both the registry of active subscribers and the monitor.
61 synchronized (subscriberIds
) {
// Wait, in 500 ms slices, until no other call is polling with the same id.
62 while (subscriberIds
.contains(subscriberId
)) {
64 subscriberIds
.wait(500);
67 } catch (InterruptedException e
) {
72 subscriberIds
.add(subscriberId
);
73 Session session
= null;
74 TopicSubscriber topicSubscriber
= null;
76 // ListeningClient client = (ListeningClient)
77 // getClient(clientId);
// Non-transacted session with automatic acknowledgement.
78 session
= connection
.createSession(false,
79 Session
.AUTO_ACKNOWLEDGE
);
// The last argument (noLocal = true) excludes messages published through
// this same connection.
// NOTE(review): createSelector(descriptors) is evaluated a second time here
// instead of reusing the local variable selector.
80 topicSubscriber
= session
.createDurableSubscriber(
81 eventsDestination
, subscriberId
,
82 createSelector(descriptors
), true);
// NOTE(review): receive(timeout) returns null when the timeout expires; the
// result is handed to the converter without a null check.
83 Message message
= topicSubscriber
.receive(timeout
);
84 obj
= messageConverter
.fromMessage(message
);
85 } catch (JMSException e
) {
86 throw new SlcException("Cannot poll events for subscriber "
// Cleanup: close consumer and session, unregister the id and wake up the
// threads waiting on subscriberIds.
// NOTE(review): presumably inside a finally block — not visible here.
89 JmsUtils
.closeMessageConsumer(topicSubscriber
);
90 JmsUtils
.closeSession(session
);
91 subscriberIds
.remove(subscriberId
);
92 subscriberIds
.notifyAll();
100 return (SlcEvent
) obj
;
104 /**
 * Builds the JMS message selector for the given listener descriptors.
 *
 * For each descriptor a clause {@code <SlcEvent.EVENT_TYPE>='<event type>'}
 * is appended; when the descriptor has a non-null filter, the filter is
 * appended wrapped in parentheses.
 *
 * An empty descriptor list is rejected with an {@link SlcException}; the
 * visible code does not return null in that case.
 *
 * NOTE(review): the statements that use {@code first} and that join the
 * clauses together are not present in this extract.
 * NOTE(review): event types and filters are concatenated into the selector
 * without escaping — confirm that they never come from untrusted input.
 *
 * @param descriptors listener descriptors, must not be empty
 * @return the content of the buffer as a selector string
 * @throws SlcException if {@code descriptors} is empty
 */
105 protected String
createSelector(List
<SlcEventListenerDescriptor
> descriptors
) {
106 if (descriptors
.size() == 0)
107 throw new SlcException("No listeners, cannot generate JMS selector");
109 StringBuffer buf
= new StringBuffer(256);
// Flag declared for the first iteration; its use is not visible in this extract.
110 Boolean first
= true;
111 for (SlcEventListenerDescriptor descriptor
: descriptors
) {
// Event type clause, with the value enclosed in single quotes.
118 buf
.append(SlcEvent
.EVENT_TYPE
).append("=").append('\'').append(
119 descriptor
.getEventType()).append('\'');
120 if (descriptor
.getFilter() != null) {
// Optional filter, parenthesised so that it is evaluated as one unit.
122 buf
.append('(').append(descriptor
.getFilter()).append(')');
126 return buf
.toString();
129 public void setEventsDestination(Topic eventsDestination
) {
130 this.eventsDestination
= eventsDestination
;
133 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory
) {
134 this.jmsConnectionFactory
= jmsConnectionFactory
;
137 public void setMessageConverter(MessageConverter messageConverter
) {
138 this.messageConverter
= messageConverter
;
// Connection set-up: creates the shared JMS connection from the configured
// factory and assigns it the client id held in connectionClientId. A
// JMSException is wrapped in an SlcException that keeps the original cause.
// NOTE(review): the header of the enclosing method and the opening of the
// try block are not present in this extract.
143 connection
= jmsConnectionFactory
.createConnection();
144 connection
.setClientID(connectionClientId
);
146 } catch (JMSException e
) {
147 throw new SlcException("Could not init connection", e
);
151 public void close() {
152 ConnectionFactoryUtils
.releaseConnection(connection
,
153 jmsConnectionFactory
, true);
155 synchronized (subscriberIds
) {
156 subscriberIds
.notifyAll();
/**
 * Accessor for the closed state of this listener.
 *
 * NOTE(review): the body of this method is not present in this extract;
 * presumably it returns the isClosed field — confirm.
 */
160 public boolean isClosed() {
164 // public void close(String clientId) {
165 // // Session session = null;
166 // // // ListeningClient client = getClient(clientId);
167 // // // Connection connection = client.getConnection();
169 // // session = client.getSession();
170 // // session.unsubscribe(clientId);
171 // // } catch (JMSException e) {
172 // // log.warn("Could not unsubscribe client " + clientId, e);
174 // // JmsUtils.closeSession(session);
177 // // // synchronized (client) {
178 // // // clients.remove(clientId);
179 // // // client.notify();
183 // protected ListeningClient getClient(String clientId) {
184 // ListeningClient client = clients.get(clientId);
185 // if (client == null) {
187 // client = new ListeningClient(connection);
188 // clients.put(clientId, client);
193 // protected class ListeningClient {
194 // private final Connection connection;
195 // private final Session session;
197 // public ListeningClient(Connection connection) {
199 // this.connection = connection;
201 // session = connection.createSession(false,
202 // Session.AUTO_ACKNOWLEDGE);
203 // } catch (JMSException e) {
204 // throw new SlcException("Cannot create session");
208 // public Connection getConnection() {
209 // return connection;
212 // public Session getSession() {