]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
0ce259d97373d54e297c6cb5ab841463a733b0f6
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
1 package org.argeo.slc.jms;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.UUID;
6
7 import javax.jms.Connection;
8 import javax.jms.ConnectionFactory;
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.Session;
12 import javax.jms.Topic;
13 import javax.jms.TopicSubscriber;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17 import org.argeo.slc.SlcException;
18 import org.argeo.slc.msg.event.SlcEvent;
19 import org.argeo.slc.msg.event.SlcEventListener;
20 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
21 import org.springframework.jms.connection.ConnectionFactoryUtils;
22 import org.springframework.jms.support.JmsUtils;
23 import org.springframework.jms.support.converter.MessageConverter;
24
25 public class JmsSlcEventListener implements SlcEventListener {
26 private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
27
28 private Topic eventsDestination;
29 private ConnectionFactory jmsConnectionFactory;
30 private MessageConverter messageConverter;
31
32 private Connection connection = null;
33 private String connectionClientId = getClass() + "#"
34 + UUID.randomUUID().toString();
35
36 private List<String> subscriberIds = new ArrayList<String>();
37
38 private Boolean isClosed = false;
39
40 // private Map<String, ListeningClient> clients = Collections
41 // .synchronizedMap(new HashMap<String, ListeningClient>());
42
43 public SlcEvent listen(String subscriberId,
44 List<SlcEventListenerDescriptor> descriptors, Long timeout) {
45 if (descriptors.size() == 0) {
46 // No listeners, just waiting
47 try {
48 if(log.isTraceEnabled())
49 log.trace("No event listener registered, sleeping...");
50 Thread.sleep(timeout);
51 } catch (InterruptedException e) {
52 // silent
53 }
54 return null;
55 } else {
56 String selector = createSelector(descriptors);
57 if (log.isTraceEnabled())
58 log.debug("Selector: " + selector);
59
60 Object obj = null;
61 synchronized (subscriberIds) {
62 while (subscriberIds.contains(subscriberId)) {
63 try {
64 subscriberIds.wait(500);
65 if (isClosed)
66 return null;
67 } catch (InterruptedException e) {
68 // silent
69 }
70 }
71
72 subscriberIds.add(subscriberId);
73 Session session = null;
74 TopicSubscriber topicSubscriber = null;
75 try {
76 // ListeningClient client = (ListeningClient)
77 // getClient(clientId);
78 session = connection.createSession(false,
79 Session.AUTO_ACKNOWLEDGE);
80 topicSubscriber = session.createDurableSubscriber(
81 eventsDestination, subscriberId,
82 createSelector(descriptors), true);
83 Message message = topicSubscriber.receive(timeout);
84 obj = messageConverter.fromMessage(message);
85 } catch (JMSException e) {
86 throw new SlcException("Cannot poll events for subscriber "
87 + subscriberId, e);
88 } finally {
89 JmsUtils.closeMessageConsumer(topicSubscriber);
90 JmsUtils.closeSession(session);
91 subscriberIds.remove(subscriberId);
92 subscriberIds.notifyAll();
93 }
94
95 }
96
97 if (obj == null)
98 return null;
99 else
100 return (SlcEvent) obj;
101 }
102 }
103
104 /** Returns null if no filter */
105 protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
106 if (descriptors.size() == 0)
107 throw new SlcException("No listeners, cannot generate JMS selector");
108
109 StringBuffer buf = new StringBuffer(256);
110 Boolean first = true;
111 for (SlcEventListenerDescriptor descriptor : descriptors) {
112 if (first)
113 first = false;
114 else
115 buf.append(" OR ");
116
117 buf.append('(');
118 buf.append(SlcEvent.EVENT_TYPE).append("=").append('\'').append(
119 descriptor.getEventType()).append('\'');
120 if (descriptor.getFilter() != null) {
121 buf.append(" AND ");
122 buf.append('(').append(descriptor.getFilter()).append(')');
123 }
124 buf.append(')');
125 }
126 return buf.toString();
127 }
128
129 public void setEventsDestination(Topic eventsDestination) {
130 this.eventsDestination = eventsDestination;
131 }
132
133 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
134 this.jmsConnectionFactory = jmsConnectionFactory;
135 }
136
137 public void setMessageConverter(MessageConverter messageConverter) {
138 this.messageConverter = messageConverter;
139 }
140
141 public void init() {
142 try {
143 connection = jmsConnectionFactory.createConnection();
144 connection.setClientID(connectionClientId);
145 connection.start();
146 } catch (JMSException e) {
147 throw new SlcException("Could not init connection", e);
148 }
149 }
150
151 public void close() {
152 ConnectionFactoryUtils.releaseConnection(connection,
153 jmsConnectionFactory, true);
154 isClosed = true;
155 synchronized (subscriberIds) {
156 subscriberIds.notifyAll();
157 }
158 }
159
160 public boolean isClosed() {
161 return isClosed;
162 }
163
164 // public void close(String clientId) {
165 // // Session session = null;
166 // // // ListeningClient client = getClient(clientId);
167 // // // Connection connection = client.getConnection();
168 // // try {
169 // // session = client.getSession();
170 // // session.unsubscribe(clientId);
171 // // } catch (JMSException e) {
172 // // log.warn("Could not unsubscribe client " + clientId, e);
173 // // } finally {
174 // // JmsUtils.closeSession(session);
175 // // }
176 // //
177 // // // synchronized (client) {
178 // // // clients.remove(clientId);
179 // // // client.notify();
180 // // // }
181 // }
182
183 // protected ListeningClient getClient(String clientId) {
184 // ListeningClient client = clients.get(clientId);
185 // if (client == null) {
186 // // Lazy init
187 // client = new ListeningClient(connection);
188 // clients.put(clientId, client);
189 // }
190 // return client;
191 // }
192
193 // protected class ListeningClient {
194 // private final Connection connection;
195 // private final Session session;
196 //
197 // public ListeningClient(Connection connection) {
198 // super();
199 // this.connection = connection;
200 // try {
201 // session = connection.createSession(false,
202 // Session.AUTO_ACKNOWLEDGE);
203 // } catch (JMSException e) {
204 // throw new SlcException("Cannot create session");
205 // }
206 // }
207 //
208 // public Connection getConnection() {
209 // return connection;
210 // }
211 //
212 // public Session getSession() {
213 // return session;
214 // }
215 //
216 // }
217 }