+/*
+ * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.argeo.slc.jms;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.msg.event.SlcEventListener;
import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
-import org.argeo.slc.msg.event.SlcEventListenerRegister;
-import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
private ConnectionFactory jmsConnectionFactory;
private MessageConverter messageConverter;
- private Map<String, ListeningClient> clients = Collections
- .synchronizedMap(new HashMap<String, ListeningClient>());
+ private Connection connection = null;
+ private String connectionClientId = getClass() + "#"
+ + UUID.randomUUID().toString();
+
+ private List<String> subscriberIds = new ArrayList<String>();
+
+ private Boolean isClosed = false;
+
+ // private Map<String, ListeningClient> clients = Collections
+ // .synchronizedMap(new HashMap<String, ListeningClient>());
- public SlcEvent listen(String clientId,
+ public SlcEvent listen(String subscriberId,
List<SlcEventListenerDescriptor> descriptors, Long timeout) {
if (descriptors.size() == 0) {
// No listeners, just waiting
try {
+ if(log.isTraceEnabled())
+ log.trace("No event listener registered, sleeping...");
Thread.sleep(timeout);
} catch (InterruptedException e) {
// silent
return null;
} else {
String selector = createSelector(descriptors);
-
if (log.isTraceEnabled())
log.debug("Selector: " + selector);
Object obj = null;
- Session session = null;
- TopicSubscriber topicSubscriber = null;
- // MessageConsumer messageConsumer = null;
- try {
- // Connection connection = getClient(clientId).getConnection();
- // session = connection.createSession(false,
- // Session.AUTO_ACKNOWLEDGE);
- session = getClient(clientId).getSession();
- topicSubscriber = session.createDurableSubscriber(
- eventsDestination, clientId,
- createSelector(descriptors), true);
- Message message = topicSubscriber.receive(timeout);
- // messageConsumer = session.createConsumer(eventsDestination,
- // createSelector(descriptors));
- // Message message = messageConsumer.receive(timeout);
- obj = messageConverter.fromMessage(message);
- } catch (JMSException e) {
- throw new SlcException("Cannot poll events for client "
- + clientId, e);
- } finally {
- // JmsUtils.closeMessageConsumer(messageConsumer);
- JmsUtils.closeMessageConsumer(topicSubscriber);
- // JmsUtils.closeSession(session);
+ synchronized (subscriberIds) {
+ while (subscriberIds.contains(subscriberId)) {
+ try {
+ subscriberIds.wait(500);
+ if (isClosed)
+ return null;
+ } catch (InterruptedException e) {
+ // silent
+ }
+ }
+
+ subscriberIds.add(subscriberId);
+ Session session = null;
+ TopicSubscriber topicSubscriber = null;
+ try {
+ // ListeningClient client = (ListeningClient)
+ // getClient(clientId);
+ session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ topicSubscriber = session.createDurableSubscriber(
+ eventsDestination, subscriberId,
+ createSelector(descriptors), true);
+ Message message = topicSubscriber.receive(timeout);
+ obj = messageConverter.fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot poll events for subscriber "
+ + subscriberId, e);
+ } finally {
+ JmsUtils.closeMessageConsumer(topicSubscriber);
+ JmsUtils.closeSession(session);
+ subscriberIds.remove(subscriberId);
+ subscriberIds.notifyAll();
+ }
+
}
if (obj == null)
}
}
- /*
- * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
- *
- * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
- * jmsTemplate.setMessageConverter(messageConverter);
- * jmsTemplate.setReceiveTimeout(timeout);
- *
- * List<SlcEventListenerDescriptor> descriptors = register
- * .getDescriptorsCopy();
- *
- * if (descriptors.size() == 0) { // No listeners, just waiting try {
- * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
- * return null; } else { String selector = createSelector(descriptors);
- *
- * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
- *
- * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
- * selector);
- *
- * if (obj == null) return null; else return (SlcEvent) obj; } }
- */
-
/** Returns null if no filter */
protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
if (descriptors.size() == 0)
this.messageConverter = messageConverter;
}
- public ListeningClient init(String clientId) {
- Connection connection = null;
+ public void init() {
try {
connection = jmsConnectionFactory.createConnection();
- connection.setClientID(clientId);
+ connection.setClientID(connectionClientId);
connection.start();
- ListeningClient client = new ListeningClient(connection);
- return client;
} catch (JMSException e) {
- throw new SlcException("Could not init listening client "
- + clientId, e);
- } finally {
+ throw new SlcException("Could not init connection", e);
}
}
- public void close(String clientId) {
- Session session = null;
- ListeningClient client = getClient(clientId);
- Connection connection = client.getConnection();
- try {
- session = client.getSession();
- session.unsubscribe(clientId);
- } catch (JMSException e) {
- log.warn("Could not unsubscribe client " + clientId, e);
- } finally {
- JmsUtils.closeSession(session);
- }
-
- // JmsUtils.closeSession(client.getSession());
- clients.remove(clientId);
-
- try {
- connection.stop();
- connection.close();
- } catch (JMSException e) {
- throw new SlcException("Could not close JMS connection for client "
- + clientId, e);
- } finally {
- clients.remove(clientId);
+ public void close() {
+ ConnectionFactoryUtils.releaseConnection(connection,
+ jmsConnectionFactory, true);
+ isClosed = true;
+ synchronized (subscriberIds) {
+ subscriberIds.notifyAll();
}
}
- protected ListeningClient getClient(String clientId) {
- ListeningClient client = clients.get(clientId);
- if (client == null) {
- // Lazy init
- client = init(clientId);
- clients.put(clientId, client);
- }
- return client;
+ public boolean isClosed() {
+ return isClosed;
}
- protected class ListeningClient {
- private final Connection connection;
- private final Session session;
-
- public ListeningClient(Connection connection) {
- super();
- this.connection = connection;
- try {
- session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- } catch (JMSException e) {
- throw new SlcException("Cannot create session");
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public Session getSession() {
- return session;
- }
-
- }
+ // public void close(String clientId) {
+ // // Session session = null;
+ // // // ListeningClient client = getClient(clientId);
+ // // // Connection connection = client.getConnection();
+ // // try {
+ // // session = client.getSession();
+ // // session.unsubscribe(clientId);
+ // // } catch (JMSException e) {
+ // // log.warn("Could not unsubscribe client " + clientId, e);
+ // // } finally {
+ // // JmsUtils.closeSession(session);
+ // // }
+ // //
+ // // // synchronized (client) {
+ // // // clients.remove(clientId);
+ // // // client.notify();
+ // // // }
+ // }
+
+ // protected ListeningClient getClient(String clientId) {
+ // ListeningClient client = clients.get(clientId);
+ // if (client == null) {
+ // // Lazy init
+ // client = new ListeningClient(connection);
+ // clients.put(clientId, client);
+ // }
+ // return client;
+ // }
+
+ // protected class ListeningClient {
+ // private final Connection connection;
+ // private final Session session;
+ //
+ // public ListeningClient(Connection connection) {
+ // super();
+ // this.connection = connection;
+ // try {
+ // session = connection.createSession(false,
+ // Session.AUTO_ACKNOWLEDGE);
+ // } catch (JMSException e) {
+ // throw new SlcException("Cannot create session");
+ // }
+ // }
+ //
+ // public Connection getConnection() {
+ // return connection;
+ // }
+ //
+ // public Session getSession() {
+ // return session;
+ // }
+ //
+ // }
}