+/*
+ * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.argeo.slc.jms;
import java.util.ArrayList;
public class JmsSlcEventListener implements SlcEventListener {
private final static Log log = LogFactory.getLog(JmsSlcEventListener.class);
+ // IoC
private Topic eventsDestination;
private ConnectionFactory jmsConnectionFactory;
private MessageConverter messageConverter;
+ // Initialized with init() method, released with close()
private Connection connection = null;
+
+ // One by instance
private String connectionClientId = getClass() + "#"
+ UUID.randomUUID().toString();
+ private Boolean isClosed = false;
private List<String> subscriberIds = new ArrayList<String>();
- private Boolean isClosed = false;
-
// private Map<String, ListeningClient> clients = Collections
// .synchronizedMap(new HashMap<String, ListeningClient>());
public SlcEvent listen(String subscriberId,
List<SlcEventListenerDescriptor> descriptors, Long timeout) {
if (descriptors.size() == 0) {
- // No listeners, just waiting
+ // No listener, just waiting
try {
- if(log.isTraceEnabled())
+ if (log.isTraceEnabled())
log.trace("No event listener registered, sleeping...");
Thread.sleep(timeout);
} catch (InterruptedException e) {
}
return null;
} else {
- String selector = createSelector(descriptors);
- if (log.isTraceEnabled())
- log.debug("Selector: " + selector);
-
Object obj = null;
synchronized (subscriberIds) {
while (subscriberIds.contains(subscriberId)) {
// silent
}
}
-
subscriberIds.add(subscriberId);
Session session = null;
TopicSubscriber topicSubscriber = null;
}
buf.append(')');
}
+ if (log.isTraceEnabled())
+ log.trace("selector created : " + buf.toString());
return buf.toString();
}
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ // Ioc
public void setEventsDestination(Topic eventsDestination) {
this.eventsDestination = eventsDestination;
}
this.messageConverter = messageConverter;
}
+ // Life Cycle
public void init() {
try {
connection = jmsConnectionFactory.createConnection();
}
}
- public boolean isClosed() {
- return isClosed;
- }
-
// public void close(String clientId) {
// // Session session = null;
// // // ListeningClient client = getClient(clientId);