Introduce and activate synchronous CMS event bus
authorMathieu <mbaudier@argeo.org>
Thu, 12 Jan 2023 06:06:47 +0000 (07:06 +0100)
committerMathieu <mbaudier@argeo.org>
Thu, 12 Jan 2023 06:06:47 +0000 (07:06 +0100)
org.argeo.cms/OSGI-INF/cmsEventBus.xml
org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java
org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java [new file with mode: 0644]

index 6bb67ceedc3603b6274cb0b02b63a42f19d23589..2c4a76698937a2892f7a2a504379dff6bd17a23d 100644 (file)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="CMS Event Bus">
-   <implementation class="org.argeo.cms.internal.runtime.CmsEventBusImpl"/>
+   <implementation class="org.argeo.cms.internal.runtime.CmsSynchronousEventBusImpl"/>
    <service>
       <provide interface="org.argeo.api.cms.CmsEventBus"/>
    </service>
index 9d9e9bcfa58c6e662db3bc4065b67a813c3d2412..07af1b8494f52b41c9e5c705a8e1c3cef90e5324 100644 (file)
@@ -10,7 +10,7 @@ import org.argeo.api.cms.CmsEventBus;
 import org.argeo.api.cms.CmsEventSubscriber;
 import org.argeo.api.cms.CmsLog;
 
-/** {@link CmsEventBus} implementation based on {@link Flow}. */
+/** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */
 public class CmsEventBusImpl implements CmsEventBus {
        private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
 
diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsSynchronousEventBusImpl.java
new file mode 100644 (file)
index 0000000..f7db374
--- /dev/null
@@ -0,0 +1,76 @@
+package org.argeo.cms.internal.runtime;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.argeo.api.cms.CmsEventBus;
+import org.argeo.api.cms.CmsEventSubscriber;
+import org.argeo.api.cms.CmsLog;
+
+/** A simple synchronous {@link CmsEventBus} implementation. */
+public class CmsSynchronousEventBusImpl implements CmsEventBus {
+       private final static CmsLog log = CmsLog.getLog(CmsSynchronousEventBusImpl.class);
+
+       private final Map<String, List<CmsEventSubscriber>> subscribers;
+
+       public CmsSynchronousEventBusImpl() {
+               subscribers = Collections.synchronizedMap(new HashMap<>());
+       }
+
+       @Override
+       public void sendEvent(String topic, Map<String, Object> event) {
+               List<CmsEventSubscriber> subscribersOfTopic = subscribers.get(topic);
+               if (subscribersOfTopic == null) // no one cares
+                       return;
+               synchronized (subscribersOfTopic) {
+                       for (Iterator<CmsEventSubscriber> it = subscribersOfTopic.iterator(); it.hasNext();) {
+                               CmsEventSubscriber subscriber = it.next();
+                               try {
+                                       subscriber.onEvent(topic, event);
+                               } catch (Throwable e) {
+                                       log.error("Cannot process in topic " + topic + " the event " + event + " for subscriber "
+                                                       + subscriber, e);
+                               }
+                       }
+               }
+               log.trace(() -> "Dispatched event in topic " + topic + ": " + event);
+       }
+
+       @Override
+       public synchronized void addEventSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+               if (!subscribers.containsKey(topic)) {
+                       subscribers.put(topic, new ArrayList<>());
+               }
+               subscribers.get(topic).add(eventSubscriber);
+               log.debug(() -> "Added subscriber " + eventSubscriber + " to topic " + topic);
+       }
+
+       @Override
+       public synchronized void removeEventSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+               List<CmsEventSubscriber> subscribersOfTopic = subscribers.get(topic);
+               assert subscribersOfTopic != null;
+               if (subscribersOfTopic != null) {
+                       CmsEventSubscriber removedSubscriber = null;
+                       synchronized (subscribersOfTopic) {
+                               subscribersOfTopic: for (Iterator<CmsEventSubscriber> it = subscribersOfTopic.iterator(); it
+                                               .hasNext();) {
+                                       CmsEventSubscriber subscriber = it.next();
+                                       if (subscriber == eventSubscriber) {
+                                               it.remove();
+                                               removedSubscriber = subscriber;
+                                               log.debug(() -> "Removed subscriber " + eventSubscriber + " from topic " + topic);
+                                               break subscribersOfTopic;
+                                       }
+                               }
+                       }
+                       if (removedSubscriber == null)
+                               log.warn(() -> "Subscriber " + eventSubscriber + " not found (and therefore not removed) in topic "
+                                               + topic);
+               }
+       }
+
+}