--- /dev/null
+package org.argeo.slc.web.mvc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.msg.event.SlcEvent;
+import org.argeo.slc.msg.event.SlcEventListener;
+import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
+import org.argeo.slc.msg.event.SlcEventListenerRegister;
+import org.springframework.web.context.request.RequestContextHolder;
+
+public class WebSlcEventListenerRegister implements SlcEventListenerRegister,
+ Serializable {
+ private final static Log log = LogFactory
+ .getLog(WebSlcEventListenerRegister.class);
+ public final static String ATTR_EVENT_LISTENER = "slcEventListener";
+
+ static final long serialVersionUID = 1l;
+
+ transient private SlcEventListener eventListener = null;
+
+ private String clientId = null;
+
+ /** Synchronized */
+ private List<SlcEventListenerDescriptor> descriptors = new Vector<SlcEventListenerDescriptor>();
+
+ public synchronized void addEventListenerDescriptor(
+ SlcEventListenerDescriptor eventListenerDescriptor) {
+ if (descriptors.contains(eventListenerDescriptor))
+ descriptors.remove(eventListenerDescriptor);
+ descriptors.add(eventListenerDescriptor);
+ }
+
+ public synchronized void removeEventListenerDescriptor(
+ SlcEventListenerDescriptor eventListenerDescriptor) {
+ descriptors.remove(eventListenerDescriptor);
+ }
+
+ // public synchronized List<SlcEventListenerDescriptor> getDescriptorsCopy()
+ // {
+ // return new ArrayList<SlcEventListenerDescriptor>(descriptors);
+ // }
+
+ public SlcEvent listen(SlcEventListener eventListener, Long timeout) {
+ checkClientId();
+ this.eventListener = eventListener;
+ return this.eventListener.listen(clientId, descriptors, timeout);
+ }
+
+ public void init() {
+ clientId = getSessionId();
+ checkClientId();
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized web event listener " + clientId);
+ }
+
+ public void close() {
+ checkClientId();
+ if (eventListener != null)
+ eventListener.close(clientId);
+
+ if (log.isDebugEnabled())
+ log.debug("Closed web event listener " + clientId);
+ }
+
+ protected void checkClientId() {
+ String sessionId = getSessionId();
+ if (clientId == null || !clientId.equals(sessionId))
+ throw new SlcException("Client id " + clientId
+ + " not consistent with web session id " + sessionId);
+ }
+
+ protected String getSessionId() {
+ return RequestContextHolder.currentRequestAttributes().getSessionId();
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+}
public class PollEventController extends AbstractServiceController {
private final static Log log = LogFactory.getLog(PollEventController.class);
- private SlcEventListener eventListener;
+ private SlcEventListener eventListener = null;
+
private SlcEventListenerRegister eventListenerRegister;
private Long defaultTimeout = 10000l;
else
timeout = defaultTimeout;
- SlcEvent event = eventListener.listen(eventListenerRegister, timeout);
+ SlcEvent event = eventListenerRegister.listen(eventListener, timeout);
if (event != null) {
modelAndView.addObject("event", event);
}
}
- public void setEventListener(SlcEventListener slcEventListener) {
- this.eventListener = slcEventListener;
- }
-
public void setEventListenerRegister(
SlcEventListenerRegister eventListenerRegister) {
this.eventListenerRegister = eventListenerRegister;
this.defaultTimeout = defaultTimeout;
}
+ public void setEventListener(SlcEventListener eventListener) {
+ this.eventListener = eventListener;
+ }
+
}
package org.argeo.slc.jms;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
import org.argeo.slc.msg.event.SlcEventListenerRegister;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
public class JmsSlcEventListener implements SlcEventListener {
private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
- private Destination eventsDestination;
+ private Topic eventsDestination;
private ConnectionFactory jmsConnectionFactory;
private MessageConverter messageConverter;
- public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
- JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
- jmsTemplate.setMessageConverter(messageConverter);
- jmsTemplate.setReceiveTimeout(timeout);
-
- List<SlcEventListenerDescriptor> descriptors = register
- .getDescriptorsCopy();
+ private Map<String, ListeningClient> clients = Collections
+ .synchronizedMap(new HashMap<String, ListeningClient>());
+ public SlcEvent listen(String clientId,
+ List<SlcEventListenerDescriptor> descriptors, Long timeout) {
if (descriptors.size() == 0) {
// No listeners, just waiting
try {
if (log.isTraceEnabled())
log.debug("Selector: " + selector);
- Object obj = jmsTemplate.receiveSelectedAndConvert(
- eventsDestination, selector);
+ Object obj = null;
+ Session session = null;
+ TopicSubscriber topicSubscriber = null;
+ // MessageConsumer messageConsumer = null;
+ try {
+ // Connection connection = getClient(clientId).getConnection();
+ // session = connection.createSession(false,
+ // Session.AUTO_ACKNOWLEDGE);
+ session = getClient(clientId).getSession();
+ topicSubscriber = session.createDurableSubscriber(
+ eventsDestination, clientId,
+ createSelector(descriptors), true);
+ Message message = topicSubscriber.receive(timeout);
+ // messageConsumer = session.createConsumer(eventsDestination,
+ // createSelector(descriptors));
+ // Message message = messageConsumer.receive(timeout);
+ obj = messageConverter.fromMessage(message);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot poll events for client "
+ + clientId, e);
+ } finally {
+ // JmsUtils.closeMessageConsumer(messageConsumer);
+ JmsUtils.closeMessageConsumer(topicSubscriber);
+ // JmsUtils.closeSession(session);
+ }
if (obj == null)
return null;
}
}
+ /*
+ * public SlcEvent listen(SlcEventListenerRegister register, Long timeout) {
+ *
+ * JmsTemplate jmsTemplate = new JmsTemplate(jmsConnectionFactory);
+ * jmsTemplate.setMessageConverter(messageConverter);
+ * jmsTemplate.setReceiveTimeout(timeout);
+ *
+ * List<SlcEventListenerDescriptor> descriptors = register
+ * .getDescriptorsCopy();
+ *
+ * if (descriptors.size() == 0) { // No listeners, just waiting try {
+ * Thread.sleep(timeout); } catch (InterruptedException e) { // silent }
+ * return null; } else { String selector = createSelector(descriptors);
+ *
+ * if (log.isTraceEnabled()) log.debug("Selector: " + selector);
+ *
+ * Object obj = jmsTemplate.receiveSelectedAndConvert( eventsDestination,
+ * selector);
+ *
+ * if (obj == null) return null; else return (SlcEvent) obj; } }
+ */
+
/** Returns null if no filter */
protected String createSelector(List<SlcEventListenerDescriptor> descriptors) {
if (descriptors.size() == 0)
return buf.toString();
}
- public void setEventsDestination(Destination eventsDestination) {
+ public void setEventsDestination(Topic eventsDestination) {
this.eventsDestination = eventsDestination;
}
this.messageConverter = messageConverter;
}
+ public ListeningClient init(String clientId) {
+ Connection connection = null;
+ try {
+ connection = jmsConnectionFactory.createConnection();
+ connection.setClientID(clientId);
+ connection.start();
+ ListeningClient client = new ListeningClient(connection);
+ return client;
+ } catch (JMSException e) {
+ throw new SlcException("Could not init listening client "
+ + clientId, e);
+ } finally {
+ }
+ }
+
+ public void close(String clientId) {
+ Session session = null;
+ ListeningClient client = getClient(clientId);
+ Connection connection = client.getConnection();
+ try {
+ session = client.getSession();
+ session.unsubscribe(clientId);
+ } catch (JMSException e) {
+ log.warn("Could not unsubscribe client " + clientId, e);
+ } finally {
+ JmsUtils.closeSession(session);
+ }
+
+ // JmsUtils.closeSession(client.getSession());
+ clients.remove(clientId);
+
+ try {
+ connection.stop();
+ connection.close();
+ } catch (JMSException e) {
+ throw new SlcException("Could not close JMS connection for client "
+ + clientId, e);
+ } finally {
+ clients.remove(clientId);
+ }
+ }
+
+ protected ListeningClient getClient(String clientId) {
+ ListeningClient client = clients.get(clientId);
+ if (client == null) {
+ // Lazy init
+ client = init(clientId);
+ clients.put(clientId, client);
+ }
+ return client;
+ }
+
+ protected class ListeningClient {
+ private final Connection connection;
+ private final Session session;
+
+ public ListeningClient(Connection connection) {
+ super();
+ this.connection = connection;
+ try {
+ session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ } catch (JMSException e) {
+ throw new SlcException("Cannot create session");
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ }
}
import java.util.Map;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import org.argeo.slc.SlcException;
import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.msg.event.SlcEventPublisher;
import org.springframework.jms.core.JmsTemplate;
private JmsTemplate jmsTemplate;
public void publish(final SlcEvent event) {
+ if (jmsTemplate.getDeliveryMode() != DeliveryMode.PERSISTENT)
+ throw new SlcException(
+ "Delivery mode has to be persistent in order to have durable subscription");
+
jmsTemplate.convertAndSend(eventsDestination, event,
new MessagePostProcessor() {
return message;
}
});
- // jmsTemplate.send(eventsDestination, new MessageCreator() {
- // public Message createMessage(Session session) throws JMSException {
- // TextMessage msg = session.createTextMessage();
- // // TODO: remove workaround when upgrading to ActiveMQ 5.3
- // // Workaround for
- // // https://issues.apache.org/activemq/browse/AMQ-2046
- // msg.setText("");
- //
- // return msg;
- // }
- // });
}
public void setEventsDestination(Destination eventsDestination) {
package org.argeo.slc.msg.event;
+import java.util.List;
+
public interface SlcEventListener {
/**
* Blocks until an event is received or timeout is reached
* @return the event received or null if timeout was reached before
* receiving one
*/
- public SlcEvent listen(SlcEventListenerRegister register, Long timeout);
+ public SlcEvent listen(String clientId,
+ List<SlcEventListenerDescriptor> descriptors, Long timeout);
+
+ public void close(String clientId);
}
package org.argeo.slc.msg.event;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-public class SlcEventListenerRegister implements Serializable {
- static final long serialVersionUID = 1l;
+public interface SlcEventListenerRegister {
+ public void addEventListenerDescriptor(
+ SlcEventListenerDescriptor eventListenerDescriptor);
- /** Synchronized */
- private List<SlcEventListenerDescriptor> descriptors = new Vector<SlcEventListenerDescriptor>();
+ public void removeEventListenerDescriptor(
+ SlcEventListenerDescriptor eventListenerDescriptor);
- public synchronized void addEventListenerDescriptor(
- SlcEventListenerDescriptor eventListenerDescriptor) {
- if (descriptors.contains(eventListenerDescriptor))
- descriptors.remove(eventListenerDescriptor);
- descriptors.add(eventListenerDescriptor);
- }
-
- public synchronized void removeEventListenerDescriptor(
- SlcEventListenerDescriptor eventListenerDescriptor) {
- descriptors.remove(eventListenerDescriptor);
- }
-
- public synchronized List<SlcEventListenerDescriptor> getDescriptorsCopy() {
- return new ArrayList<SlcEventListenerDescriptor>(descriptors);
- }
+ public SlcEvent listen(SlcEventListener eventListener, Long timeout);
}
</amq:broker>
<!-- Connection Factory -->
- <!-- -->
- <bean id="jmsConnectionFactory"
+ <!--
+ <bean id="jmsConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
- <property name="targetConnectionFactory">
+ <property name="targetConnectionFactory"> <bean
+ class="org.apache.activemq.ActiveMQConnectionFactory"> <property
+ name="brokerURL"> <value>vm://localhost</value> </property> </bean>
+ </property> </bean>
+ -->
+
+ <!---->
+ <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
+ destroy-method="stop" depends-on="broker">
+ <property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>vm://localhost</value>
</property>
</bean>
- <!--
- <bean id="jmsConnectionFactory"
- class="org.apache.activemq.pool.PooledConnectionFactory"
- destroy-method="stop" depends-on="broker"> <property
- name="connectionFactory"> <bean
- class="org.apache.activemq.ActiveMQConnectionFactory"> <property
- name="brokerURL"> <value>vm://localhost</value> </property> </bean>
- </property> </bean>
- -->
<bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<!-- Events -->
<bean id="jmsEventListener" class="org.argeo.slc.jms.JmsSlcEventListener">
- <property name="jmsConnectionFactory" ref="jmsConnectionFactory" />
+ <property name="jmsConnectionFactory">
+ <!--
+ Need its own connection factory in order to set client ids (not
+ possible on pool)
+ -->
+ <bean class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL">
+ <value>vm://localhost</value>
+ </property>
+ </bean>
+ </property>
<property name="eventsDestination" ref="slcJms.destination.events" />
<property name="messageConverter" ref="slcDefault.jms.castorMessageConverter" />
</bean>
<property name="eventListener" ref="eventListener" />
</bean>
- <bean name="eventListenerRegister" class="org.argeo.slc.msg.event.SlcEventListenerRegister"
- scope="session">
+ <bean name="eventListenerRegister" class="org.argeo.slc.web.mvc.WebSlcEventListenerRegister"
+ scope="session" init-method="init" destroy-method="close">
<aop:scoped-proxy />
</bean>