]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
@update:79; Simplify the execution of flows
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.activemq / src / main / java / org / argeo / slc / jms / JmsSlcEventListener.java
1 package org.argeo.slc.jms;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.UUID;
6
7 import javax.jms.Connection;
8 import javax.jms.ConnectionFactory;
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.Session;
12 import javax.jms.Topic;
13 import javax.jms.TopicSubscriber;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17 import org.argeo.slc.SlcException;
18 import org.argeo.slc.msg.event.SlcEvent;
19 import org.argeo.slc.msg.event.SlcEventListener;
20 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
21 import org.springframework.jms.connection.ConnectionFactoryUtils;
22 import org.springframework.jms.support.JmsUtils;
23 import org.springframework.jms.support.converter.MessageConverter;
24
25 public class JmsSlcEventListener implements SlcEventListener {
26 private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
27
28 private Topic eventsDestination;
29 private ConnectionFactory jmsConnectionFactory;
30 private MessageConverter messageConverter;
31
32 private Connection connection = null;
33 private String connectionClientId = getClass() + "#"
34 + UUID.randomUUID().toString();
35
36 private List<String> subscriberIds = new ArrayList<String>();
37
38 private Boolean isClosed = false;
39
40 // private Map<String, ListeningClient> clients = Collections
41 // .synchronizedMap(new HashMap<String, ListeningClient>());
42
43 public SlcEvent listen(String subscriberId,
44 List<SlcEventListenerDescriptor> descriptors, Long timeout) {
45 if (descriptors.size() == 0) {
46 // No listeners, just waiting
47 try {
48 Thread.sleep(timeout);
49 } catch (InterruptedException e) {
50 // silent
51 }
52 return null;
53 } else {
54 String selector = createSelector(descriptors);
55 if (log.isTraceEnabled())
56 log.debug("Selector: " + selector);
57
58 Object obj = null;
59 synchronized (subscriberIds) {
60 while (subscriberIds.contains(subscriberId)) {
61 try {
62 subscriberIds.wait(500);
63 if (isClosed)
64 return null;
65 } catch (InterruptedException e) {
66 // silent
67 }
68 }
69
70 subscriberIds.add(subscriberId);
71 Session session = null;
72 TopicSubscriber topicSubscriber = null;
73 try {
74 // ListeningClient client = (ListeningClient)
75 // getClient(clientId);
76 session = connection.createSession(false,
77 Session.AUTO_ACKNOWLEDGE);
78 topicSubscriber = session.createDurableSubscriber(
79 eventsDestination, subscriberId,
80 createSelector(descriptors), true);
81 Message message = topicSubscriber.receive(timeout);
82 obj = messageConverter.fromMessage(message);
83 } catch (JMSException e) {
84 throw new SlcException("Cannot poll events for subscriber "
85 + subscriberId, e);
86 } finally {
87 JmsUtils.closeMessageConsumer(topicSubscriber);
88 JmsUtils.closeSession(session);
89 subscriberIds.remove(subscriberId);
90 subscriberIds.notifyAll();
91 }
92
93 }
94
95 if (obj == null)
96 return null;
97 else
98 return (SlcEvent) obj;
99 }
100 }
101
102 /** Returns null if no filter */
103 protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
104 if (descriptors.size() == 0)
105 throw new SlcException("No listeners, cannot generate JMS selector");
106
107 StringBuffer buf = new StringBuffer(256);
108 Boolean first = true;
109 for (SlcEventListenerDescriptor descriptor : descriptors) {
110 if (first)
111 first = false;
112 else
113 buf.append(" OR ");
114
115 buf.append('(');
116 buf.append(SlcEvent.EVENT_TYPE).append("=").append('\'').append(
117 descriptor.getEventType()).append('\'');
118 if (descriptor.getFilter() != null) {
119 buf.append(" AND ");
120 buf.append('(').append(descriptor.getFilter()).append(')');
121 }
122 buf.append(')');
123 }
124 return buf.toString();
125 }
126
127 public void setEventsDestination(Topic eventsDestination) {
128 this.eventsDestination = eventsDestination;
129 }
130
131 public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
132 this.jmsConnectionFactory = jmsConnectionFactory;
133 }
134
135 public void setMessageConverter(MessageConverter messageConverter) {
136 this.messageConverter = messageConverter;
137 }
138
139 public void init() {
140 try {
141 connection = jmsConnectionFactory.createConnection();
142 connection.setClientID(connectionClientId);
143 connection.start();
144 } catch (JMSException e) {
145 throw new SlcException("Could not init connection", e);
146 }
147 }
148
149 public void close() {
150 ConnectionFactoryUtils.releaseConnection(connection,
151 jmsConnectionFactory, true);
152 isClosed = true;
153 synchronized (subscriberIds) {
154 subscriberIds.notifyAll();
155 }
156 }
157
158 public boolean isClosed() {
159 return isClosed;
160 }
161
162 // public void close(String clientId) {
163 // // Session session = null;
164 // // // ListeningClient client = getClient(clientId);
165 // // // Connection connection = client.getConnection();
166 // // try {
167 // // session = client.getSession();
168 // // session.unsubscribe(clientId);
169 // // } catch (JMSException e) {
170 // // log.warn("Could not unsubscribe client " + clientId, e);
171 // // } finally {
172 // // JmsUtils.closeSession(session);
173 // // }
174 // //
175 // // // synchronized (client) {
176 // // // clients.remove(clientId);
177 // // // client.notify();
178 // // // }
179 // }
180
181 // protected ListeningClient getClient(String clientId) {
182 // ListeningClient client = clients.get(clientId);
183 // if (client == null) {
184 // // Lazy init
185 // client = new ListeningClient(connection);
186 // clients.put(clientId, client);
187 // }
188 // return client;
189 // }
190
191 // protected class ListeningClient {
192 // private final Connection connection;
193 // private final Session session;
194 //
195 // public ListeningClient(Connection connection) {
196 // super();
197 // this.connection = connection;
198 // try {
199 // session = connection.createSession(false,
200 // Session.AUTO_ACKNOWLEDGE);
201 // } catch (JMSException e) {
202 // throw new SlcException("Cannot create session");
203 // }
204 // }
205 //
206 // public Connection getConnection() {
207 // return connection;
208 // }
209 //
210 // public Session getSession() {
211 // return session;
212 // }
213 //
214 // }
215 }