X-Git-Url: https://git.argeo.org/?a=blobdiff_plain;f=org.argeo.cms%2Fsrc%2Forg%2Fargeo%2Fcms%2Finternal%2Fruntime%2FCmsContextImpl.java;h=7a0f3388c07a4f79cabdd6928de9fa210e3bc511;hb=f3ea14abccc33b1c3326417a87c91145be776c72;hp=e14b21e7073cd39241250fa0ec337a2a795a83e8;hpb=dca2b13e0e3ca3e7a9469e089b980c48c880ad1a;p=lgpl%2Fargeo-commons.git diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java index e14b21e70..7a0f3388c 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java @@ -1,34 +1,36 @@ package org.argeo.cms.internal.runtime; -import static java.util.Locale.ENGLISH; - import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.SubmissionPublisher; import javax.security.auth.Subject; -import org.argeo.api.cms.CmsConstants; import org.argeo.api.cms.CmsContext; import org.argeo.api.cms.CmsDeployment; +import org.argeo.api.cms.CmsEventSubscriber; import org.argeo.api.cms.CmsLog; import org.argeo.api.cms.CmsSession; import org.argeo.api.cms.CmsSessionId; import org.argeo.api.cms.CmsState; import org.argeo.api.uuid.UuidFactory; import org.argeo.cms.CmsDeployProperty; -import org.argeo.cms.LocaleUtils; import org.argeo.cms.internal.auth.CmsSessionImpl; import org.ietf.jgss.GSSCredential; import org.osgi.service.useradmin.UserAdmin; public class CmsContextImpl implements CmsContext { + private final CmsLog log = CmsLog.getLog(getClass()); // private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext(); @@ -51,6 +53,10 @@ public class CmsContextImpl implements CmsContext { private Map cmsSessionsByUuid = new HashMap<>(); private Map cmsSessionsByLocalId = new HashMap<>(); + // CMS events + private Map>> topics = new TreeMap<>(); +// private IdentityHashMap> subscriptions = new IdentityHashMap<>(); + // public CmsContextImpl() { // initTrackers(); // } @@ -311,4 +317,86 @@ public class CmsContextImpl implements CmsContext { return cmsSessionsByLocalId.get(localId); } + /* + * CMS Events + */ + public void sendEvent(String topic, Map event) { + SubmissionPublisher> publisher = topics.get(topic); + if (publisher == null) + return; // no one is interested + publisher.submit(event); + } + + public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) { + synchronized (topics) { + if (!topics.containsKey(topic)) + topics.put(topic, new SubmissionPublisher<>()); + } + SubmissionPublisher> publisher = topics.get(topic); + CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber); + publisher.subscribe(flowSubscriber); + } + + public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) { + SubmissionPublisher> publisher = topics.get(topic); + if (publisher == null) { + log.error("There should be an event topic " + topic); + return; + } + for (Flow.Subscriber> flowSubscriber : publisher.getSubscribers()) { + if (flowSubscriber instanceof CmsEventFlowSubscriber) + ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe(); + } + synchronized (topics) { + if (!publisher.hasSubscribers()) { + publisher.close(); + topics.remove(topic); + } + } + } + + static class CmsEventFlowSubscriber implements Flow.Subscriber> { + private String topic; + private CmsEventSubscriber eventSubscriber; + + private Subscription subscription; + + public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) { + this.topic = topic; + this.eventSubscriber = eventSubscriber; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Map item) { + eventSubscriber.onEvent(topic, item); + + } + + @Override + public void onError(Throwable throwable) { + // TODO Auto-generated method stub + + } + + @Override + public void onComplete() { + // TODO Auto-generated method stub + + } + + void unsubscribe() { + if (subscription != null) + subscription.cancel(); + else + throw new IllegalStateException("No subscription to cancel"); + } + + } + }