Improve events and web sockets
[lgpl/argeo-commons.git] / org.argeo.cms / src / org / argeo / cms / internal / runtime / CmsContextImpl.java
index e14b21e7073cd39241250fa0ec337a2a795a83e8..ea9a401a4d3fc49ee2f617558432dbc11f170024 100644 (file)
@@ -1,34 +1,37 @@
 package org.argeo.cms.internal.runtime;
 
-import static java.util.Locale.ENGLISH;
-
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.SubmissionPublisher;
 
 import javax.security.auth.Subject;
 
-import org.argeo.api.cms.CmsConstants;
 import org.argeo.api.cms.CmsContext;
 import org.argeo.api.cms.CmsDeployment;
+import org.argeo.api.cms.CmsEventSubscriber;
 import org.argeo.api.cms.CmsLog;
 import org.argeo.api.cms.CmsSession;
 import org.argeo.api.cms.CmsSessionId;
 import org.argeo.api.cms.CmsState;
 import org.argeo.api.uuid.UuidFactory;
 import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.LocaleUtils;
 import org.argeo.cms.internal.auth.CmsSessionImpl;
 import org.ietf.jgss.GSSCredential;
 import org.osgi.service.useradmin.UserAdmin;
 
 public class CmsContextImpl implements CmsContext {
+
        private final CmsLog log = CmsLog.getLog(getClass());
 //     private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext();
 
@@ -51,6 +54,10 @@ public class CmsContextImpl implements CmsContext {
        private Map<UUID, CmsSessionImpl> cmsSessionsByUuid = new HashMap<>();
        private Map<String, CmsSessionImpl> cmsSessionsByLocalId = new HashMap<>();
 
+       // CMS events
+       private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
+//     private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
+
 //     public CmsContextImpl() {
 //             initTrackers();
 //     }
@@ -311,4 +318,86 @@ public class CmsContextImpl implements CmsContext {
                return cmsSessionsByLocalId.get(localId);
        }
 
+       /*
+        * CMS Events
+        */
+       public void sendEvent(String topic, Map<String, Object> event) {
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               if (publisher == null)
+                       return; // no one is interested
+               publisher.submit(event);
+       }
+
+       public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+               synchronized (topics) {
+                       if (!topics.containsKey(topic))
+                               topics.put(topic, new SubmissionPublisher<>());
+               }
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
+               publisher.subscribe(flowSubscriber);
+       }
+
+       public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               if (publisher == null) {
+                       log.error("There should be an event topic " + topic);
+                       return;
+               }
+               for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
+                       if (flowSubscriber instanceof CmsEventFlowSubscriber)
+                               ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
+               }
+               synchronized (topics) {
+                       if (!publisher.hasSubscribers()) {
+                               publisher.close();
+                               topics.remove(topic);
+                       }
+               }
+       }
+
+       static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+               private String topic;
+               private CmsEventSubscriber eventSubscriber;
+
+               private Subscription subscription;
+
+               public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+                       this.topic = topic;
+                       this.eventSubscriber = eventSubscriber;
+               }
+
+               @Override
+               public void onSubscribe(Subscription subscription) {
+                       this.subscription = subscription;
+                       subscription.request(Long.MAX_VALUE);
+               }
+
+               @Override
+               public void onNext(Map<String, Object> item) {
+                       eventSubscriber.onEvent(topic, item);
+
+               }
+
+               @Override
+               public void onError(Throwable throwable) {
+                       // TODO Auto-generated method stub
+
+               }
+
+               @Override
+               public void onComplete() {
+                       // TODO Auto-generated method stub
+
+               }
+
+               void unsubscribe() {
+                       if (subscription != null)
+                               subscription.cancel();
+                       else
+                               throw new IllegalStateException("No subscription to cancel");
+               }
+
+       }
+
 }