Resubscribe event publishers after unexpected exceptions
[lgpl/argeo-commons.git] / org.argeo.cms / src / org / argeo / cms / internal / runtime / CmsEventBusImpl.java
index 7fca23c991ee75fb3cce34bd7b85f3d216ed5be7..9d9e9bcfa58c6e662db3bc4065b67a813c3d2412 100644 (file)
@@ -10,16 +10,12 @@ import org.argeo.api.cms.CmsEventBus;
 import org.argeo.api.cms.CmsEventSubscriber;
 import org.argeo.api.cms.CmsLog;
 
+/** {@link CmsEventBus} implementation based on {@link Flow}. */
 public class CmsEventBusImpl implements CmsEventBus {
-       private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
+       private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
 
-       // CMS events
        private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
-//     private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
 
-       /*
-        * CMS Events
-        */
        @Override
        public void sendEvent(String topic, Map<String, Object> event) {
                SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
@@ -58,7 +54,8 @@ public class CmsEventBusImpl implements CmsEventBus {
                }
        }
 
-       static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+       /** A subscriber to a topic. */
+       class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
                private String topic;
                private CmsEventSubscriber eventSubscriber;
 
@@ -72,25 +69,32 @@ public class CmsEventBusImpl implements CmsEventBus {
                @Override
                public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
-                       subscription.request(1);
+                       this.subscription.request(1);
                }
 
                @Override
                public void onNext(Map<String, Object> item) {
                        eventSubscriber.onEvent(topic, item);
-                       subscription.request(1);
+                       this.subscription.request(1);
                }
 
                @Override
                public void onError(Throwable throwable) {
-                       // TODO Auto-generated method stub
-
+                       if (throwable instanceof Error) {
+                               log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + ", not trying to resubscribe.", throwable);
+                       } else {
+                               log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + ", resubscribing...", throwable);
+                               addEventSubscriber(topic, eventSubscriber);
+                       }
                }
 
                @Override
                public void onComplete() {
-                       // TODO Auto-generated method stub
-
+                       if (log.isTraceEnabled())
+                               log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+                                               + " is completed");
                }
 
                void unsubscribe() {