From 691171dad89a839d8c1aff859fcd4242129ce57b Mon Sep 17 00:00:00 2001 From: Mathieu Date: Thu, 12 Jan 2023 07:06:47 +0100 Subject: [PATCH] Introduce and activate synchronous CMS event bus --- org.argeo.cms/OSGI-INF/cmsEventBus.xml | 2 +- .../cms/internal/runtime/CmsEventBusImpl.java | 2 +- .../runtime/CmsSynchronousEventBusImpl.java | 76 +++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java diff --git a/org.argeo.cms/OSGI-INF/cmsEventBus.xml b/org.argeo.cms/OSGI-INF/cmsEventBus.xml index 6bb67ceed..2c4a76698 100644 --- a/org.argeo.cms/OSGI-INF/cmsEventBus.xml +++ b/org.argeo.cms/OSGI-INF/cmsEventBus.xml @@ -1,6 +1,6 @@ - + diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java index 9d9e9bcfa..07af1b849 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java @@ -10,7 +10,7 @@ import org.argeo.api.cms.CmsEventBus; import org.argeo.api.cms.CmsEventSubscriber; import org.argeo.api.cms.CmsLog; -/** {@link CmsEventBus} implementation based on {@link Flow}. */ +/** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */ public class CmsEventBusImpl implements CmsEventBus { private final static CmsLog log = CmsLog.getLog(CmsEventBus.class); diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java new file mode 100644 index 000000000..f7db37433 --- /dev/null +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java @@ -0,0 +1,76 @@ +package org.argeo.cms.internal.runtime; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.argeo.api.cms.CmsEventBus; +import org.argeo.api.cms.CmsEventSubscriber; +import org.argeo.api.cms.CmsLog; + +/** A simple synchronous {@link CmsEventBus} implementation. */ +public class CmsSynchronousEventBusImpl implements CmsEventBus { + private final static CmsLog log = CmsLog.getLog(CmsSynchronousEventBusImpl.class); + + private final Map> subscribers; + + public CmsSynchronousEventBusImpl() { + subscribers = Collections.synchronizedMap(new HashMap<>()); + } + + @Override + public void sendEvent(String topic, Map event) { + List subscribersOfTopic = subscribers.get(topic); + if (subscribersOfTopic == null) // no one cares + return; + synchronized (subscribersOfTopic) { + for (Iterator it = subscribersOfTopic.iterator(); it.hasNext();) { + CmsEventSubscriber subscriber = it.next(); + try { + subscriber.onEvent(topic, event); + } catch (Throwable e) { + log.error("Cannot process in topic " + topic + " the event " + event + " for subscriber " + + subscriber, e); + } + } + } + log.trace(() -> "Dispatched event in topic " + topic + ": " + event); + } + + @Override + public synchronized void addEventSubscriber(String topic, CmsEventSubscriber eventSubscriber) { + if (!subscribers.containsKey(topic)) { + subscribers.put(topic, new ArrayList<>()); + } + subscribers.get(topic).add(eventSubscriber); + log.debug(() -> "Added subscriber " + eventSubscriber + " to topic " + topic); + } + + @Override + public synchronized void removeEventSubscriber(String topic, CmsEventSubscriber eventSubscriber) { + List subscribersOfTopic = subscribers.get(topic); + assert subscribersOfTopic != null; + if (subscribersOfTopic != null) { + CmsEventSubscriber removedSubscriber = null; + synchronized (subscribersOfTopic) { + subscribersOfTopic: for (Iterator it = subscribersOfTopic.iterator(); it + .hasNext();) { + CmsEventSubscriber subscriber = it.next(); + if (subscriber == eventSubscriber) { + it.remove(); + removedSubscriber = subscriber; + log.debug(() -> "Removed subscriber " + eventSubscriber + " from topic " + topic); + break subscribersOfTopic; + } + } + } + if (removedSubscriber == null) + log.warn(() -> "Subscriber " + eventSubscriber + " not found (and therefore not removed) in topic " + + topic); + } + } + +} -- 2.30.2