git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
0c702ab620e14fecd906f6978034c5ab95761216
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
1 package org.argeo.slc.jms;
2
3 import java.util.Collections;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7
8 import javax.jms.Connection;
9 import javax.jms.ConnectionFactory;
10 import javax.jms.DeliveryMode;
11 import javax.jms.Destination;
12 import javax.jms.JMSException;
13 import javax.jms.Message;
14 import javax.jms.MessageConsumer;
15 import javax.jms.Session;
16 import javax.jms.Topic;
17 import javax.jms.TopicSubscriber;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.argeo.slc.SlcException;
22 import org.argeo.slc.msg.event.SlcEvent;
23 import org.argeo.slc.msg.event.SlcEventListener;
24 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
25 import org.argeo.slc.msg.event.SlcEventListenerRegister;
26 import org.springframework.jms.core.JmsTemplate;
27 import org.springframework.jms.support.JmsUtils;
28 import org.springframework.jms.support.converter.MessageConverter;
29
30 public class JmsSlcEventListener implements SlcEventListener {
31 private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
32
33 private Topic eventsDestination;
34 private ConnectionFactory jmsConnectionFactory;
35 private MessageConverter messageConverter;
36
37 private Map<String, ListeningClient> clients = Collections
38 .synchronizedMap(new HashMap<String, ListeningClient>());
39
40 public SlcEvent listen(String clientId,
41 List<SlcEventListenerDescriptor> descriptors, Long timeout) {
42 if (descriptors.size() == 0) {
43 // No listeners, just waiting
44 try {
45 Thread.sleep(timeout);
46 } catch (InterruptedException e) {
47 // silent
48 }
49 return null;
50 } else {
51 String selector = createSelector(descriptors);
52
53 if (log.isTraceEnabled())
54 log.debug("Selector: " + selector);
55
56 Object obj = null;
57 Session session = null;
58 TopicSubscriber topicSubscriber = null;
59 // MessageConsumer messageConsumer = null;
60 try {
61 // Connection connection = getClient(clientId).getConnection();
62 // session = connection.createSession(false,
63 // Session.AUTO_ACKNOWLEDGE);
64 session = getClient(clientId).getSession();
65 topicSubscriber = session.createDurableSubscriber(
66 eventsDestination, clientId,
67 createSelector(descriptors), true);
68 Message message = topicSubscriber.receive(timeout);
69 // messageConsumer = session.createConsumer(eventsDestination,
70 // createSelector(descriptors));
71 // Message message = messageConsumer.receive(timeout);
72 obj = messageConverter.fromMessage(message);
73 } catch (JMSException e) {
74 throw new SlcException("Cannot poll events for client "
75 + clientId, e);
76 } finally {
77 // JmsUtils.closeMessageConsumer(messageConsumer);
78 JmsUtils.closeMessageConsumer(topicSubscriber);
79 // JmsUtils.closeSession(session);
80 }
81
82 if (obj == null)
83 return null;
84 else
85 return (SlcEvent) obj;
86 }
87 }
88
89 /*
90 * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
91 *
92 * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
93 * jmsTemplate.setMessageConverter(messageConverter);
94 * jmsTemplate.setReceiveTimeout(timeout);
95 *
96 * List<SlcEventListenerDescriptor> descriptors = register
97 * .getDescriptorsCopy();
98 *
99 * if (descriptors.size() == 0) { // No listeners, just waiting try {
100 * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
101 * return null; } else { String selector = createSelector(descriptors);
102 *
103 * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
104 *
105 * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
106 * selector);
107 *
108 * if (obj == null) return null; else return (SlcEvent) obj; } }
109 */
110
111 /** Returns null if no filter */
112 protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
113 if (descriptors.size() == 0)
114 throw new SlcException("No listeners, cannot generate JMS selector");
115
116 StringBuffer buf = new StringBuffer(256);
117 Boolean first = true;
118 for (SlcEventListenerDescriptor descriptor : descriptors) {
119 if (first)
120 first = false;
121 else
122 buf.append(" OR ");
123
124 buf.append('(');
125 buf.append(SlcEvent.EVENT_TYPE).append("=").append('\'').append(
126 descriptor.getEventType()).append('\'');
127 if (descriptor.getFilter() != null) {
128 buf.append(" AND ");
129 buf.append('(').append(descriptor.getFilter()).append(')');
130 }
131 buf.append(')');
132 }
133 return buf.toString();
134 }
135
136 public void setEventsDestination(Topic eventsDestination) {
137 this.eventsDestination = eventsDestination;
138 }
139
140 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
141 this.jmsConnectionFactory = jmsConnectionFactory;
142 }
143
144 public void setMessageConverter(MessageConverter messageConverter) {
145 this.messageConverter = messageConverter;
146 }
147
148 public ListeningClient init(String clientId) {
149 Connection connection = null;
150 try {
151 connection = jmsConnectionFactory.createConnection();
152 connection.setClientID(clientId);
153 connection.start();
154 ListeningClient client = new ListeningClient(connection);
155 return client;
156 } catch (JMSException e) {
157 throw new SlcException("Could not init listening client "
158 + clientId, e);
159 } finally {
160 }
161 }
162
163 public void close(String clientId) {
164 Session session = null;
165 ListeningClient client = getClient(clientId);
166 Connection connection = client.getConnection();
167 try {
168 session = client.getSession();
169 session.unsubscribe(clientId);
170 } catch (JMSException e) {
171 log.warn("Could not unsubscribe client " + clientId, e);
172 } finally {
173 JmsUtils.closeSession(session);
174 }
175
176 // JmsUtils.closeSession(client.getSession());
177 clients.remove(clientId);
178
179 try {
180 connection.stop();
181 connection.close();
182 } catch (JMSException e) {
183 throw new SlcException("Could not close JMS connection for client "
184 + clientId, e);
185 } finally {
186 clients.remove(clientId);
187 }
188 }
189
190 protected ListeningClient getClient(String clientId) {
191 ListeningClient client = clients.get(clientId);
192 if (client == null) {
193 // Lazy init
194 client = init(clientId);
195 clients.put(clientId, client);
196 }
197 return client;
198 }
199
200 protected class ListeningClient {
201 private final Connection connection;
202 private final Session session;
203
204 public ListeningClient(Connection connection) {
205 super();
206 this.connection = connection;
207 try {
208 session = connection.createSession(false,
209 Session.AUTO_ACKNOWLEDGE);
210 } catch (JMSException e) {
211 throw new SlcException("Cannot create session");
212 }
213 }
214
215 public Connection getConnection() {
216 return connection;
217 }
218
219 public Session getSession() {
220 return session;
221 }
222
223 }
224 }