1 package org
.argeo
.cms
.internal
.runtime
;
4 import java
.util
.TreeMap
;
5 import java
.util
.concurrent
.Flow
;
6 import java
.util
.concurrent
.Flow
.Subscription
;
7 import java
.util
.concurrent
.SubmissionPublisher
;
9 import org
.argeo
.api
.cms
.CmsEventBus
;
10 import org
.argeo
.api
.cms
.CmsEventSubscriber
;
11 import org
.argeo
.api
.cms
.CmsLog
;
13 /** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */
14 public class CmsEventBusImpl
implements CmsEventBus
{
15 private final static CmsLog log
= CmsLog
.getLog(CmsEventBus
.class);
17 private Map
<String
, SubmissionPublisher
<Map
<String
, Object
>>> topics
= new TreeMap
<>();
20 public void sendEvent(String topic
, Map
<String
, Object
> event
) {
21 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
22 if (publisher
== null)
23 return; // no one is interested
24 publisher
.submit(event
);
28 public void addEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
29 synchronized (topics
) {
30 if (!topics
.containsKey(topic
))
31 topics
.put(topic
, new SubmissionPublisher
<>());
33 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
34 CmsEventFlowSubscriber flowSubscriber
= new CmsEventFlowSubscriber(topic
, subscriber
);
35 publisher
.subscribe(flowSubscriber
);
39 public void removeEventSubscriber(String topic
, CmsEventSubscriber subscriber
) {
40 SubmissionPublisher
<Map
<String
, Object
>> publisher
= topics
.get(topic
);
41 if (publisher
== null) {
42 log
.error("There should be an event topic " + topic
);
45 for (Flow
.Subscriber
<?
super Map
<String
, Object
>> flowSubscriber
: publisher
.getSubscribers()) {
46 if (flowSubscriber
instanceof CmsEventFlowSubscriber
)
47 ((CmsEventFlowSubscriber
) flowSubscriber
).unsubscribe();
49 synchronized (topics
) {
50 if (!publisher
.hasSubscribers()) {
57 /** A subscriber to a topic. */
58 class CmsEventFlowSubscriber
implements Flow
.Subscriber
<Map
<String
, Object
>> {
60 private CmsEventSubscriber eventSubscriber
;
62 private Subscription subscription
;
64 public CmsEventFlowSubscriber(String topic
, CmsEventSubscriber eventSubscriber
) {
66 this.eventSubscriber
= eventSubscriber
;
70 public void onSubscribe(Subscription subscription
) {
71 this.subscription
= subscription
;
72 this.subscription
.request(1);
76 public void onNext(Map
<String
, Object
> item
) {
77 eventSubscriber
.onEvent(topic
, item
);
78 this.subscription
.request(1);
82 public void onError(Throwable throwable
) {
83 if (throwable
instanceof Error
) {
84 log
.error("Unexpected error in event subscriber " + eventSubscriber
+ " for topic " + topic
85 + ", not trying to resubscribe.", throwable
);
87 log
.error("Unexpected exception in event subscriber " + eventSubscriber
+ " for topic " + topic
88 + ", resubscribing...", throwable
);
89 addEventSubscriber(topic
, eventSubscriber
);
94 public void onComplete() {
95 if (log
.isTraceEnabled())
96 log
.trace("Unexpected exception in event subscriber " + eventSubscriber
+ " for topic " + topic
101 if (subscription
!= null)
102 subscription
.cancel();
104 throw new IllegalStateException("No subscription to cancel");