import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.msg.event.SlcEventListener;
import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
-import org.argeo.slc.msg.event.SlcEventListenerRegister;
-import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
private ConnectionFactory jmsConnectionFactory;
private MessageConverter messageConverter;
+ private Connection connection = null;
+ private String connectionClientId = getClass() + "#"
+ + UUID.randomUUID().toString();
+
private Map<String, ListeningClient> clients = Collections
.synchronizedMap(new HashMap<String, ListeningClient>());
Object obj = null;
Session session = null;
TopicSubscriber topicSubscriber = null;
- // MessageConsumer messageConsumer = null;
try {
- // Connection connection = getClient(clientId).getConnection();
- // session = connection.createSession(false,
- // Session.AUTO_ACKNOWLEDGE);
session = getClient(clientId).getSession();
topicSubscriber = session.createDurableSubscriber(
eventsDestination, clientId,
createSelector(descriptors), true);
Message message = topicSubscriber.receive(timeout);
- // messageConsumer = session.createConsumer(eventsDestination,
- // createSelector(descriptors));
- // Message message = messageConsumer.receive(timeout);
obj = messageConverter.fromMessage(message);
} catch (JMSException e) {
throw new SlcException("Cannot poll events for client "
+ clientId, e);
} finally {
- // JmsUtils.closeMessageConsumer(messageConsumer);
JmsUtils.closeMessageConsumer(topicSubscriber);
- // JmsUtils.closeSession(session);
}
if (obj == null)
}
}
- /*
- * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
- *
- * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
- * jmsTemplate.setMessageConverter(messageConverter);
- * jmsTemplate.setReceiveTimeout(timeout);
- *
- * List<SlcEventListenerDescriptor> descriptors = register
- * .getDescriptorsCopy();
- *
- * if (descriptors.size() == 0) { // No listeners, just waiting try {
- * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
- * return null; } else { String selector = createSelector(descriptors);
- *
- * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
- *
- * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
- * selector);
- *
- * if (obj == null) return null; else return (SlcEvent) obj; } }
- */
-
/** Returns null if no filter */
protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
if (descriptors.size() == 0)
this.messageConverter = messageConverter;
}
- public ListeningClient init(String clientId) {
- Connection connection = null;
+ public void init() {
try {
connection = jmsConnectionFactory.createConnection();
- connection.setClientID(clientId);
+ connection.setClientID(connectionClientId);
connection.start();
- ListeningClient client = new ListeningClient(connection);
- return client;
} catch (JMSException e) {
- throw new SlcException("Could not init listening client "
- + clientId, e);
- } finally {
+ throw new SlcException("Could not init connection", e);
}
}
+ public void close() {
+ ConnectionFactoryUtils.releaseConnection(connection,
+ jmsConnectionFactory, true);
+ }
+
public void close(String clientId) {
Session session = null;
ListeningClient client = getClient(clientId);
- Connection connection = client.getConnection();
+ // Connection connection = client.getConnection();
try {
session = client.getSession();
session.unsubscribe(clientId);
// JmsUtils.closeSession(client.getSession());
clients.remove(clientId);
- try {
- connection.stop();
- connection.close();
- } catch (JMSException e) {
- throw new SlcException("Could not close JMS connection for client "
- + clientId, e);
- } finally {
- clients.remove(clientId);
- }
+ // try {
+ // connection.stop();
+ // connection.close();
+ // } catch (JMSException e) {
+ // throw new SlcException("Could not close JMS connection for client "
+ // + clientId, e);
+ // } finally {
+ // clients.remove(clientId);
+ // }
}
protected ListeningClient getClient(String clientId) {
ListeningClient client = clients.get(clientId);
if (client == null) {
// Lazy init
- client = init(clientId);
+ client = new ListeningClient(connection);
clients.put(clientId, client);
}
return client;