Resubscribe event publishers after unexpected exceptions
authorMathieu <mbaudier@argeo.org>
Tue, 13 Dec 2022 04:51:49 +0000 (05:51 +0100)
committerMathieu <mbaudier@argeo.org>
Tue, 13 Dec 2022 04:51:49 +0000 (05:51 +0100)
org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsEventBusImpl.java

index 99f6c1d8d18a548d6ecb6adfff5b6fd91abf5325..9d9e9bcfa58c6e662db3bc4065b67a813c3d2412 100644 (file)
@@ -12,7 +12,7 @@ import org.argeo.api.cms.CmsLog;
 
 /** {@link CmsEventBus} implementation based on {@link Flow}. */
 public class CmsEventBusImpl implements CmsEventBus {
 
 /** {@link CmsEventBus} implementation based on {@link Flow}. */
 public class CmsEventBusImpl implements CmsEventBus {
-       private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
+       private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
 
        private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
 
 
        private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
 
@@ -55,7 +55,7 @@ public class CmsEventBusImpl implements CmsEventBus {
        }
 
        /** A subscriber to a topic. */
        }
 
        /** A subscriber to a topic. */
-       static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+       class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
                private String topic;
                private CmsEventSubscriber eventSubscriber;
 
                private String topic;
                private CmsEventSubscriber eventSubscriber;
 
@@ -69,24 +69,32 @@ public class CmsEventBusImpl implements CmsEventBus {
                @Override
                public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                @Override
                public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
-                       this.subscription.request(Long.MAX_VALUE);
+                       this.subscription.request(1);
                }
 
                @Override
                public void onNext(Map<String, Object> item) {
                        eventSubscriber.onEvent(topic, item);
                }
 
                @Override
                public void onNext(Map<String, Object> item) {
                        eventSubscriber.onEvent(topic, item);
+                       this.subscription.request(1);
                }
 
                @Override
                public void onError(Throwable throwable) {
                }
 
                @Override
                public void onError(Throwable throwable) {
-                       // TODO Auto-generated method stub
-
+                       if (throwable instanceof Error) {
+                               log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + ", not trying to resubscribe.", throwable);
+                       } else {
+                               log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + ", resubscribing...", throwable);
+                               addEventSubscriber(topic, eventSubscriber);
+                       }
                }
 
                @Override
                public void onComplete() {
                }
 
                @Override
                public void onComplete() {
-                       // TODO Auto-generated method stub
-
+                       if (log.isTraceEnabled())
+                               log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + " is completed");
                }
 
                void unsubscribe() {
                }
 
                void unsubscribe() {