1 package org
.argeo
.cms
.internal
.runtime
;
4 import java
.util
.TreeMap
;
5 import java
.util
.concurrent
.Flow
;
6 import java
.util
.concurrent
.Flow
.Subscription
;
7 import java
.util
.concurrent
.SubmissionPublisher
;
9 import org
.argeo
.api
.cms
.CmsEventBus
;
10 import org
.argeo
.api
.cms
.CmsEventSubscriber
;
11 import org
.argeo
.api
.cms
.CmsLog
;
13 /** {@link CmsEventBus} implementation based on {@link Flow}. */
14 public class CmsEventBusImpl
implements CmsEventBus
{
15 private final CmsLog log
= CmsLog
.getLog(CmsEventBus
.class);
17 private Map
<String
, SubmissionPublisher
<Map
<String
, Object
>>> topics
= new TreeMap
<>();
20 public void sendEvent(String topic
, Map
<String
, Object
> event
) {
21 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
22 if (publisher
== null)
23 return; // no one is interested
24 publisher
.submit(event
);
28 public void addEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
29 synchronized (topics
) {
30 if (!topics
.containsKey(topic
))
31 topics
.put(topic
, new SubmissionPublisher
<>());
33 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
34 CmsEventFlowSubscriber flowSubscriber
= new CmsEventFlowSubscriber(topic
, subscriber
);
35 publisher
.subscribe(flowSubscriber
);
39 public void removeEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
40 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
41 if (publisher
== null) {
42 log
.error("There should be an event topic " + topic
);
45 for (Flow
.Subscriber
<?
super Map
<String
, Object
>> flowSubscriber
: publisher
.getSubscribers()) {
46 if (flowSubscriber
instanceof CmsEventFlowSubscriber
)
47 ((CmsEventFlowSubscriber
) flowSubscriber
).unsubscribe();
49 synchronized (topics
) {
50 if (!publisher
.hasSubscribers()) {
57 /** A subscriber to a topic. */
58 static class CmsEventFlowSubscriber
implements Flow
.Subscriber
<Map
<String
, Object
>> {
60 private CmsEventSubscriber eventSubscriber
;
62 private Subscription subscription
;
64 public CmsEventFlowSubscriber(String topic
, CmsEventSubscriber eventSubscriber
) {
66 this.eventSubscriber
= eventSubscriber
;
70 public void onSubscribe(Subscription subscription
) {
71 this.subscription
= subscription
;
72 this.subscription
.request(Long
.MAX_VALUE
);
76 public void onNext(Map
<String
, Object
> item
) {
77 eventSubscriber
.onEvent(topic
, item
);
81 public void onError(Throwable throwable
) {
82 // TODO Auto-generated method stub
87 public void onComplete() {
88 // TODO Auto-generated method stub
93 if (subscription
!= null)
94 subscription
.cancel();
96 throw new IllegalStateException("No subscription to cancel");