From: Mathieu Date: Tue, 13 Dec 2022 04:51:49 +0000 (+0100) Subject: Resubscribe event publishers after unexpected exceptions X-Git-Tag: v2.1.110~1^2 X-Git-Url: https://git.argeo.org/?p=lgpl%2Fargeo-commons.git;a=commitdiff_plain;h=81eb40f8a68ee85349f26f3468ee52b9af121732 Resubscribe event publishers after unexpected exceptions --- diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java index 99f6c1d8d..9d9e9bcfa 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java @@ -12,7 +12,7 @@ import org.argeo.api.cms.CmsLog; /** {@link CmsEventBus} implementation based on {@link Flow}. */ public class CmsEventBusImpl implements CmsEventBus { - private final CmsLog log = CmsLog.getLog(CmsEventBus.class); + private final static CmsLog log = CmsLog.getLog(CmsEventBus.class); private Map>> topics = new TreeMap<>(); @@ -55,7 +55,7 @@ public class CmsEventBusImpl implements CmsEventBus { } /** A subscriber to a topic. */ - static class CmsEventFlowSubscriber implements Flow.Subscriber> { + class CmsEventFlowSubscriber implements Flow.Subscriber> { private String topic; private CmsEventSubscriber eventSubscriber; @@ -69,24 +69,32 @@ public class CmsEventBusImpl implements CmsEventBus { @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; - this.subscription.request(Long.MAX_VALUE); + this.subscription.request(1); } @Override public void onNext(Map item) { eventSubscriber.onEvent(topic, item); + this.subscription.request(1); } @Override public void onError(Throwable throwable) { - // TODO Auto-generated method stub - + if (throwable instanceof Error) { + log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic + + ", not trying to resubscribe.", throwable); + } else { + log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + ", resubscribing...", throwable); + addEventSubscriber(topic, eventSubscriber); + } } @Override public void onComplete() { - // TODO Auto-generated method stub - + if (log.isTraceEnabled()) + log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + " is completed"); } void unsubscribe() {