1 package org
.argeo
.cms
.internal
.runtime
;
4 import java
.util
.TreeMap
;
5 import java
.util
.concurrent
.Flow
;
6 import java
.util
.concurrent
.Flow
.Subscription
;
7 import java
.util
.concurrent
.SubmissionPublisher
;
9 import org
.argeo
.api
.cms
.CmsEventBus
;
10 import org
.argeo
.api
.cms
.CmsEventSubscriber
;
11 import org
.argeo
.api
.cms
.CmsLog
;
13 public class CmsEventBusImpl
implements CmsEventBus
{
14 private final CmsLog log
= CmsLog
.getLog(CmsEventBus
.class);
17 private Map
<String
, SubmissionPublisher
<Map
<String
, Object
>>> topics
= new TreeMap
<>();
18 // private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
24 public void sendEvent(String topic
, Map
<String
, Object
> event
) {
25 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
26 if (publisher
== null)
27 return; // no one is interested
28 publisher
.submit(event
);
32 public void addEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
33 synchronized (topics
) {
34 if (!topics
.containsKey(topic
))
35 topics
.put(topic
, new SubmissionPublisher
<>());
37 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
38 CmsEventFlowSubscriber flowSubscriber
= new CmsEventFlowSubscriber(topic
, subscriber
);
39 publisher
.subscribe(flowSubscriber
);
43 public void removeEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
44 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
45 if (publisher
== null) {
46 log
.error("There should be an event topic " + topic
);
49 for (Flow
.Subscriber
<?
super Map
<String
, Object
>> flowSubscriber
: publisher
.getSubscribers()) {
50 if (flowSubscriber
instanceof CmsEventFlowSubscriber
)
51 ((CmsEventFlowSubscriber
) flowSubscriber
).unsubscribe();
53 synchronized (topics
) {
54 if (!publisher
.hasSubscribers()) {
61 static class CmsEventFlowSubscriber
implements Flow
.Subscriber
<Map
<String
, Object
>> {
63 private CmsEventSubscriber eventSubscriber
;
65 private Subscription subscription
;
67 public CmsEventFlowSubscriber(String topic
, CmsEventSubscriber eventSubscriber
) {
69 this.eventSubscriber
= eventSubscriber
;
73 public void onSubscribe(Subscription subscription
) {
74 this.subscription
= subscription
;
75 this.subscription
.request(Long
.MAX_VALUE
);
79 public void onNext(Map
<String
, Object
> item
) {
80 eventSubscriber
.onEvent(topic
, item
);
84 public void onError(Throwable throwable
) {
85 // TODO Auto-generated method stub
90 public void onComplete() {
91 // TODO Auto-generated method stub
96 if (subscription
!= null)
97 subscription
.cancel();
99 throw new IllegalStateException("No subscription to cancel");