]> git.argeo.org Git - lgpl/argeo-commons.git/blob - org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java
Improve ACR attribute typing.
[lgpl/argeo-commons.git] / org.argeo.cms / src / org / argeo / cms / internal / runtime / CmsEventBusImpl.java
1 package org.argeo.cms.internal.runtime;
2
3 import java.util.Map;
4 import java.util.TreeMap;
5 import java.util.concurrent.Flow;
6 import java.util.concurrent.Flow.Subscription;
7 import java.util.concurrent.SubmissionPublisher;
8
9 import org.argeo.api.cms.CmsEventBus;
10 import org.argeo.api.cms.CmsEventSubscriber;
11 import org.argeo.api.cms.CmsLog;
12
13 /** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */
14 public class CmsEventBusImpl implements CmsEventBus {
15 private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
16
17 private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
18
19 @Override
20 public void sendEvent(String topic, Map<String, Object> event) {
21 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
22 if (publisher == null)
23 return; // no one is interested
24 publisher.submit(event);
25 }
26
27 @Override
28 public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
29 synchronized (topics) {
30 if (!topics.containsKey(topic))
31 topics.put(topic, new SubmissionPublisher<>());
32 }
33 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
34 CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
35 publisher.subscribe(flowSubscriber);
36 }
37
38 @Override
39 public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
40 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
41 if (publisher == null) {
42 log.error("There should be an event topic " + topic);
43 return;
44 }
45 for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
46 if (flowSubscriber instanceof CmsEventFlowSubscriber)
47 ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
48 }
49 synchronized (topics) {
50 if (!publisher.hasSubscribers()) {
51 publisher.close();
52 topics.remove(topic);
53 }
54 }
55 }
56
57 /** A subscriber to a topic. */
58 class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
59 private String topic;
60 private CmsEventSubscriber eventSubscriber;
61
62 private Subscription subscription;
63
64 public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
65 this.topic = topic;
66 this.eventSubscriber = eventSubscriber;
67 }
68
69 @Override
70 public void onSubscribe(Subscription subscription) {
71 this.subscription = subscription;
72 this.subscription.request(1);
73 }
74
75 @Override
76 public void onNext(Map<String, Object> item) {
77 eventSubscriber.onEvent(topic, item);
78 this.subscription.request(1);
79 }
80
81 @Override
82 public void onError(Throwable throwable) {
83 if (throwable instanceof Error) {
84 log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic
85 + ", not trying to resubscribe.", throwable);
86 } else {
87 log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
88 + ", resubscribing...", throwable);
89 addEventSubscriber(topic, eventSubscriber);
90 }
91 }
92
93 @Override
94 public void onComplete() {
95 if (log.isTraceEnabled())
96 log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
97 + " is completed");
98 }
99
100 void unsubscribe() {
101 if (subscription != null)
102 subscription.cancel();
103 else
104 throw new IllegalStateException("No subscription to cancel");
105 }
106
107 }
108
109 }