1 package org
.argeo
.slc
.jms
;
3 import java
.util
.ArrayList
;
7 import javax
.jms
.Connection
;
8 import javax
.jms
.ConnectionFactory
;
9 import javax
.jms
.JMSException
;
10 import javax
.jms
.Message
;
11 import javax
.jms
.Session
;
12 import javax
.jms
.Topic
;
13 import javax
.jms
.TopicSubscriber
;
15 import org
.apache
.commons
.logging
.Log
;
16 import org
.apache
.commons
.logging
.LogFactory
;
17 import org
.argeo
.slc
.SlcException
;
18 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
19 import org
.argeo
.slc
.msg
.event
.SlcEventListener
;
20 import org
.argeo
.slc
.msg
.event
.SlcEventListenerDescriptor
;
21 import org
.springframework
.jms
.connection
.ConnectionFactoryUtils
;
22 import org
.springframework
.jms
.support
.JmsUtils
;
23 import org
.springframework
.jms
.support
.converter
.MessageConverter
;
25 public class JmsSlcEventListener
implements SlcEventListener
{
26 private final static Log log
= LogFactory
.getLog(JmsSlcEventListener
.class);
28 private Topic eventsDestination
;
29 private ConnectionFactory jmsConnectionFactory
;
30 private MessageConverter messageConverter
;
32 private Connection connection
= null;
33 private String connectionClientId
= getClass() + "#"
34 + UUID
.randomUUID().toString();
36 private List
<String
> subscriberIds
= new ArrayList
<String
>();
38 private Boolean isClosed
= false;
40 // private Map<String, ListeningClient> clients = Collections
41 // .synchronizedMap(new HashMap<String, ListeningClient>());
43 public SlcEvent
listen(String subscriberId
,
44 List
<SlcEventListenerDescriptor
> descriptors
, Long timeout
) {
45 if (descriptors
.size() == 0) {
46 // No listeners, just waiting
48 Thread
.sleep(timeout
);
49 } catch (InterruptedException e
) {
54 String selector
= createSelector(descriptors
);
55 if (log
.isTraceEnabled())
56 log
.debug("Selector: " + selector
);
59 synchronized (subscriberIds
) {
60 while (subscriberIds
.contains(subscriberId
)) {
62 subscriberIds
.wait(500);
65 } catch (InterruptedException e
) {
70 subscriberIds
.add(subscriberId
);
71 Session session
= null;
72 TopicSubscriber topicSubscriber
= null;
74 // ListeningClient client = (ListeningClient)
75 // getClient(clientId);
76 session
= connection
.createSession(false,
77 Session
.AUTO_ACKNOWLEDGE
);
78 topicSubscriber
= session
.createDurableSubscriber(
79 eventsDestination
, subscriberId
,
80 createSelector(descriptors
), true);
81 Message message
= topicSubscriber
.receive(timeout
);
82 obj
= messageConverter
.fromMessage(message
);
83 } catch (JMSException e
) {
84 throw new SlcException("Cannot poll events for subscriber "
87 JmsUtils
.closeMessageConsumer(topicSubscriber
);
88 JmsUtils
.closeSession(session
);
89 subscriberIds
.remove(subscriberId
);
90 subscriberIds
.notifyAll();
98 return (SlcEvent
) obj
;
102 /** Returns null if no filter */
103 protected String
createSelector(List
<SlcEventListenerDescriptor
> descriptors
) {
104 if (descriptors
.size() == 0)
105 throw new SlcException("No listeners, cannot generate JMS selector");
107 StringBuffer buf
= new StringBuffer(256);
108 Boolean first
= true;
109 for (SlcEventListenerDescriptor descriptor
: descriptors
) {
116 buf
.append(SlcEvent
.EVENT_TYPE
).append("=").append('\'').append(
117 descriptor
.getEventType()).append('\'');
118 if (descriptor
.getFilter() != null) {
120 buf
.append('(').append(descriptor
.getFilter()).append(')');
124 return buf
.toString();
127 public void setEventsDestination(Topic eventsDestination
) {
128 this.eventsDestination
= eventsDestination
;
131 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory
) {
132 this.jmsConnectionFactory
= jmsConnectionFactory
;
135 public void setMessageConverter(MessageConverter messageConverter
) {
136 this.messageConverter
= messageConverter
;
141 connection
= jmsConnectionFactory
.createConnection();
142 connection
.setClientID(connectionClientId
);
144 } catch (JMSException e
) {
145 throw new SlcException("Could not init connection", e
);
149 public void close() {
150 ConnectionFactoryUtils
.releaseConnection(connection
,
151 jmsConnectionFactory
, true);
153 synchronized (subscriberIds
) {
154 subscriberIds
.notifyAll();
158 public boolean isClosed() {
162 // public void close(String clientId) {
163 // // Session session = null;
164 // // // ListeningClient client = getClient(clientId);
165 // // // Connection connection = client.getConnection();
167 // // session = client.getSession();
168 // // session.unsubscribe(clientId);
169 // // } catch (JMSException e) {
170 // // log.warn("Could not unsubscribe client " + clientId, e);
172 // // JmsUtils.closeSession(session);
175 // // // synchronized (client) {
176 // // // clients.remove(clientId);
177 // // // client.notify();
181 // protected ListeningClient getClient(String clientId) {
182 // ListeningClient client = clients.get(clientId);
183 // if (client == null) {
185 // client = new ListeningClient(connection);
186 // clients.put(clientId, client);
191 // protected class ListeningClient {
192 // private final Connection connection;
193 // private final Session session;
195 // public ListeningClient(Connection connection) {
197 // this.connection = connection;
199 // session = connection.createSession(false,
200 // Session.AUTO_ACKNOWLEDGE);
201 // } catch (JMSException e) {
202 // throw new SlcException("Cannot create session");
206 // public Connection getConnection() {
207 // return connection;
210 // public Session getSession() {