]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
Start working on serialized JMS
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.jms;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.UUID;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.Session;
28 import javax.jms.Topic;
29 import javax.jms.TopicSubscriber;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.argeo.slc.SlcException;
34 import org.argeo.slc.msg.event.SlcEvent;
35 import org.argeo.slc.msg.event.SlcEventListener;
36 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
37 import org.springframework.jms.connection.ConnectionFactoryUtils;
38 import org.springframework.jms.support.JmsUtils;
39 import org.springframework.jms.support.converter.MessageConverter;
40
41 public class JmsSlcEventListener implements SlcEventListener {
42 private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
43
44 private Topic eventsDestination;
45 private ConnectionFactory jmsConnectionFactory;
46 private MessageConverter messageConverter;
47
48 private Connection connection = null;
49 private String connectionClientId = getClass() + "#"
50 + UUID.randomUUID().toString();
51
52 private List<String> subscriberIds = new ArrayList<String>();
53
54 private Boolean isClosed = false;
55
56 // private Map<String, ListeningClient> clients = Collections
57 // .synchronizedMap(new HashMap<String, ListeningClient>());
58
59 public SlcEvent listen(String subscriberId,
60 List<SlcEventListenerDescriptor> descriptors, Long timeout) {
61 if (descriptors.size() == 0) {
62 // No listeners, just waiting
63 try {
64 if(log.isTraceEnabled())
65 log.trace("No event listener registered, sleeping...");
66 Thread.sleep(timeout);
67 } catch (InterruptedException e) {
68 // silent
69 }
70 return null;
71 } else {
72 String selector = createSelector(descriptors);
73 if (log.isTraceEnabled())
74 log.debug("Selector: " + selector);
75
76 Object obj = null;
77 synchronized (subscriberIds) {
78 while (subscriberIds.contains(subscriberId)) {
79 try {
80 subscriberIds.wait(500);
81 if (isClosed)
82 return null;
83 } catch (InterruptedException e) {
84 // silent
85 }
86 }
87
88 subscriberIds.add(subscriberId);
89 Session session = null;
90 TopicSubscriber topicSubscriber = null;
91 try {
92 // ListeningClient client = (ListeningClient)
93 // getClient(clientId);
94 session = connection.createSession(false,
95 Session.AUTO_ACKNOWLEDGE);
96 topicSubscriber = session.createDurableSubscriber(
97 eventsDestination, subscriberId,
98 createSelector(descriptors), true);
99 Message message = topicSubscriber.receive(timeout);
100 obj = messageConverter.fromMessage(message);
101 } catch (JMSException e) {
102 throw new SlcException("Cannot poll events for subscriber "
103 + subscriberId, e);
104 } finally {
105 JmsUtils.closeMessageConsumer(topicSubscriber);
106 JmsUtils.closeSession(session);
107 subscriberIds.remove(subscriberId);
108 subscriberIds.notifyAll();
109 }
110
111 }
112
113 if (obj == null)
114 return null;
115 else
116 return (SlcEvent) obj;
117 }
118 }
119
120 /** Returns null if no filter */
121 protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
122 if (descriptors.size() == 0)
123 throw new SlcException("No listeners, cannot generate JMS selector");
124
125 StringBuffer buf = new StringBuffer(256);
126 Boolean first = true;
127 for (SlcEventListenerDescriptor descriptor : descriptors) {
128 if (first)
129 first = false;
130 else
131 buf.append(" OR ");
132
133 buf.append('(');
134 buf.append(SlcEvent.EVENT_TYPE).append("=").append('\'').append(
135 descriptor.getEventType()).append('\'');
136 if (descriptor.getFilter() != null) {
137 buf.append(" AND ");
138 buf.append('(').append(descriptor.getFilter()).append(')');
139 }
140 buf.append(')');
141 }
142 return buf.toString();
143 }
144
145 public void setEventsDestination(Topic eventsDestination) {
146 this.eventsDestination = eventsDestination;
147 }
148
149 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
150 this.jmsConnectionFactory = jmsConnectionFactory;
151 }
152
153 public void setMessageConverter(MessageConverter messageConverter) {
154 this.messageConverter = messageConverter;
155 }
156
157 public void init() {
158 try {
159 connection = jmsConnectionFactory.createConnection();
160 connection.setClientID(connectionClientId);
161 connection.start();
162 } catch (JMSException e) {
163 throw new SlcException("Could not init connection", e);
164 }
165 }
166
167 public void close() {
168 ConnectionFactoryUtils.releaseConnection(connection,
169 jmsConnectionFactory, true);
170 isClosed = true;
171 synchronized (subscriberIds) {
172 subscriberIds.notifyAll();
173 }
174 }
175
176 public boolean isClosed() {
177 return isClosed;
178 }
179
180 // public void close(String clientId) {
181 // // Session session = null;
182 // // // ListeningClient client = getClient(clientId);
183 // // // Connection connection = client.getConnection();
184 // // try {
185 // // session = client.getSession();
186 // // session.unsubscribe(clientId);
187 // // } catch (JMSException e) {
188 // // log.warn("Could not unsubscribe client " + clientId, e);
189 // // } finally {
190 // // JmsUtils.closeSession(session);
191 // // }
192 // //
193 // // // synchronized (client) {
194 // // // clients.remove(clientId);
195 // // // client.notify();
196 // // // }
197 // }
198
199 // protected ListeningClient getClient(String clientId) {
200 // ListeningClient client = clients.get(clientId);
201 // if (client == null) {
202 // // Lazy init
203 // client = new ListeningClient(connection);
204 // clients.put(clientId, client);
205 // }
206 // return client;
207 // }
208
209 // protected class ListeningClient {
210 // private final Connection connection;
211 // private final Session session;
212 //
213 // public ListeningClient(Connection connection) {
214 // super();
215 // this.connection = connection;
216 // try {
217 // session = connection.createSession(false,
218 // Session.AUTO_ACKNOWLEDGE);
219 // } catch (JMSException e) {
220 // throw new SlcException("Cannot create session");
221 // }
222 // }
223 //
224 // public Connection getConnection() {
225 // return connection;
226 // }
227 //
228 // public Session getSession() {
229 // return session;
230 // }
231 //
232 // }
233 }