+ public UserAdmin getUserAdmin() {
+ return userAdmin;
+ }
+
+ /*
+ * CMS Sessions
+ */
+
+ @Override
+ public synchronized CmsSession getCmsSession(Subject subject) {
+ if (subject.getPrivateCredentials(CmsSessionId.class).isEmpty())
+ return null;
+ CmsSessionId cmsSessionId = subject.getPrivateCredentials(CmsSessionId.class).iterator().next();
+ return getCmsSessionByUuid(cmsSessionId.getUuid());
+ }
+
+ public synchronized void registerCmsSession(CmsSessionImpl cmsSession) {
+ if (cmsSessionsByUuid.containsKey(cmsSession.getUuid())
+ || cmsSessionsByLocalId.containsKey(cmsSession.getLocalId()))
+ throw new IllegalStateException("CMS session " + cmsSession + " is already registered.");
+ cmsSessionsByUuid.put(cmsSession.getUuid(), cmsSession);
+ cmsSessionsByLocalId.put(cmsSession.getLocalId(), cmsSession);
+ }
+
+ public synchronized void unregisterCmsSession(CmsSessionImpl cmsSession) {
+ if (!cmsSessionsByUuid.containsKey(cmsSession.getUuid())
+ || !cmsSessionsByLocalId.containsKey(cmsSession.getLocalId()))
+ throw new IllegalStateException("CMS session " + cmsSession + " is not registered.");
+ CmsSession removed = cmsSessionsByUuid.remove(cmsSession.getUuid());
+ assert removed == cmsSession;
+ cmsSessionsByLocalId.remove(cmsSession.getLocalId());
+ }
+
+ /**
+ * The {@link CmsSession} related to this UUID, or <code>null</null> if not
+ * registered.
+ */
+ public synchronized CmsSessionImpl getCmsSessionByUuid(UUID uuid) {
+ return cmsSessionsByUuid.get(uuid);
+ }
+
+ /**
+ * The {@link CmsSession} related to this local id, or <code>null</null> if not
+ * registered.
+ */
+ public synchronized CmsSessionImpl getCmsSessionByLocalId(String localId) {
+ return cmsSessionsByLocalId.get(localId);
+ }
+
+ /*
+ * CMS Events
+ */
+ public void sendEvent(String topic, Map<String, Object> event) {
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ if (publisher == null)
+ return; // no one is interested
+ publisher.submit(event);
+ }
+
+ public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+ synchronized (topics) {
+ if (!topics.containsKey(topic))
+ topics.put(topic, new SubmissionPublisher<>());
+ }
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
+ publisher.subscribe(flowSubscriber);
+ }
+
+ public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ if (publisher == null) {
+ log.error("There should be an event topic " + topic);
+ return;
+ }
+ for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
+ if (flowSubscriber instanceof CmsEventFlowSubscriber)
+ ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
+ }
+ synchronized (topics) {
+ if (!publisher.hasSubscribers()) {
+ publisher.close();
+ topics.remove(topic);
+ }
+ }
+ }
+
+ static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+ private String topic;
+ private CmsEventSubscriber eventSubscriber;
+
+ private Subscription subscription;
+
+ public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+ this.topic = topic;
+ this.eventSubscriber = eventSubscriber;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Map<String, Object> item) {
+ eventSubscriber.onEvent(topic, item);
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void onComplete() {
+ // TODO Auto-generated method stub
+
+ }
+
+ void unsubscribe() {
+ if (subscription != null)
+ subscription.cancel();
+ else
+ throw new IllegalStateException("No subscription to cancel");
+ }
+
+ }
+