From: Mathieu Date: Tue, 13 Dec 2022 04:53:26 +0000 (+0100) Subject: Merge remote-tracking branch 'origin/unstable' into testing X-Git-Tag: v2.1.110~1 X-Git-Url: https://git.argeo.org/?p=lgpl%2Fargeo-commons.git;a=commitdiff_plain;h=4e36f6abd646130818f63c3db01e5148663b56b4;hp=0e265f172767a49a0ea9681dad1730d55a7a9d6d Merge remote-tracking branch 'origin/unstable' into testing --- diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java index 99f6c1d8d..9d9e9bcfa 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java @@ -12,7 +12,7 @@ import org.argeo.api.cms.CmsLog; /** {@link CmsEventBus} implementation based on {@link Flow}. */ public class CmsEventBusImpl implements CmsEventBus { - private final CmsLog log = CmsLog.getLog(CmsEventBus.class); + private final static CmsLog log = CmsLog.getLog(CmsEventBus.class); private Map>> topics = new TreeMap<>(); @@ -55,7 +55,7 @@ public class CmsEventBusImpl implements CmsEventBus { } /** A subscriber to a topic. */ - static class CmsEventFlowSubscriber implements Flow.Subscriber> { + class CmsEventFlowSubscriber implements Flow.Subscriber> { private String topic; private CmsEventSubscriber eventSubscriber; @@ -69,24 +69,32 @@ public class CmsEventBusImpl implements CmsEventBus { @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; - this.subscription.request(Long.MAX_VALUE); + this.subscription.request(1); } @Override public void onNext(Map item) { eventSubscriber.onEvent(topic, item); + this.subscription.request(1); } @Override public void onError(Throwable throwable) { - // TODO Auto-generated method stub - + if (throwable instanceof Error) { + log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic + + ", not trying to resubscribe.", throwable); + } else { + log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + ", resubscribing...", throwable); + addEventSubscriber(topic, eventSubscriber); + } } @Override public void onComplete() { - // TODO Auto-generated method stub - + if (log.isTraceEnabled()) + log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic + + " is completed"); } void unsubscribe() {