X-Git-Url: https://git.argeo.org/?a=blobdiff_plain;f=org.argeo.cms%2Fsrc%2Forg%2Fargeo%2Fcms%2Finternal%2Fruntime%2FCmsEventBusImpl.java;h=07af1b8494f52b41c9e5c705a8e1c3cef90e5324;hb=691171dad89a839d8c1aff859fcd4242129ce57b;hp=eaa63756dc05e61800ffdb8668bf4ba200907ff3;hpb=a81a19a9a3e45a89ed3b7c783bd5747cc27f6aa1;p=lgpl%2Fargeo-commons.git diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java index eaa63756d..07af1b849 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java @@ -10,16 +10,12 @@ import org.argeo.api.cms.CmsEventBus; import org.argeo.api.cms.CmsEventSubscriber; import org.argeo.api.cms.CmsLog; +/** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */ public class CmsEventBusImpl implements CmsEventBus { - private final CmsLog log = CmsLog.getLog(CmsEventBus.class); + private final static CmsLog log = CmsLog.getLog(CmsEventBus.class); - // CMS events private Map>> topics = new TreeMap<>(); -// private IdentityHashMap> subscriptions = new IdentityHashMap<>(); - /* - * CMS Events - */ @Override public void sendEvent(String topic, Map event) { SubmissionPublisher> publisher = topics.get(topic); @@ -58,7 +54,8 @@ public class CmsEventBusImpl implements CmsEventBus { } } - static class CmsEventFlowSubscriber implements Flow.Subscriber> { + /** A subscriber to a topic. */ + class CmsEventFlowSubscriber implements Flow.Subscriber> { private String topic; private CmsEventSubscriber eventSubscriber; @@ -72,24 +69,32 @@ public class CmsEventBusImpl implements CmsEventBus { @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; - this.subscription.request(Long.MAX_VALUE); + this.subscription.request(1); } @Override public void onNext(Map item) { eventSubscriber.onEvent(topic, item); + this.subscription.request(1); } @Override public void onError(Throwable throwable) { - // TODO Auto-generated method stub - + if (throwable instanceof Error) { + log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic + + ", not trying to resubscribe.", throwable); + } else { + log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + ", resubscribing...", throwable); + addEventSubscriber(topic, eventSubscriber); + } } @Override public void onComplete() { - // TODO Auto-generated method stub - + if (log.isTraceEnabled()) + log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + " is completed"); } void unsubscribe() {