git.argeo.org Git - lgpl/argeo-commons.git/blob - org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java
Remove static default UUID factory
[lgpl/argeo-commons.git] / org.argeo.cms / src / org / argeo / cms / internal / runtime / CmsEventBusImpl.java
1 package org.argeo.cms.internal.runtime;
2
3 import java.util.Map;
4 import java.util.TreeMap;
5 import java.util.concurrent.Flow;
6 import java.util.concurrent.Flow.Subscription;
7 import java.util.concurrent.SubmissionPublisher;
8
9 import org.argeo.api.cms.CmsEventBus;
10 import org.argeo.api.cms.CmsEventSubscriber;
11 import org.argeo.api.cms.CmsLog;
12
13 public class CmsEventBusImpl implements CmsEventBus {
14 private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
15
16 // CMS events
17 private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
18 // private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
19
20 /*
21 * CMS Events
22 */
23 @Override
24 public void sendEvent(String topic, Map<String, Object> event) {
25 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
26 if (publisher == null)
27 return; // no one is interested
28 publisher.submit(event);
29 }
30
31 @Override
32 public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
33 synchronized (topics) {
34 if (!topics.containsKey(topic))
35 topics.put(topic, new SubmissionPublisher<>());
36 }
37 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
38 CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
39 publisher.subscribe(flowSubscriber);
40 }
41
42 @Override
43 public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
44 SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
45 if (publisher == null) {
46 log.error("There should be an event topic " + topic);
47 return;
48 }
49 for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
50 if (flowSubscriber instanceof CmsEventFlowSubscriber)
51 ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
52 }
53 synchronized (topics) {
54 if (!publisher.hasSubscribers()) {
55 publisher.close();
56 topics.remove(topic);
57 }
58 }
59 }
60
61 static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
62 private String topic;
63 private CmsEventSubscriber eventSubscriber;
64
65 private Subscription subscription;
66
67 public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
68 this.topic = topic;
69 this.eventSubscriber = eventSubscriber;
70 }
71
72 @Override
73 public void onSubscribe(Subscription subscription) {
74 this.subscription = subscription;
75 this.subscription.request(Long.MAX_VALUE);
76 }
77
78 @Override
79 public void onNext(Map<String, Object> item) {
80 eventSubscriber.onEvent(topic, item);
81 }
82
83 @Override
84 public void onError(Throwable throwable) {
85 // TODO Auto-generated method stub
86
87 }
88
89 @Override
90 public void onComplete() {
91 // TODO Auto-generated method stub
92
93 }
94
95 void unsubscribe() {
96 if (subscription != null)
97 subscription.cancel();
98 else
99 throw new IllegalStateException("No subscription to cancel");
100 }
101
102 }
103
104 }