import org.argeo.api.cms.CmsEventSubscriber;
import org.argeo.api.cms.CmsLog;
+/** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */
public class CmsEventBusImpl implements CmsEventBus {
- private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
+ private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
- // CMS events
private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
-// private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
- /*
- * CMS Events
- */
@Override
public void sendEvent(String topic, Map<String, Object> event) {
SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
}
}
- static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+ /** A subscriber to a topic. */
+ class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
private String topic;
private CmsEventSubscriber eventSubscriber;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
- subscription.request(1);
+ this.subscription.request(1);
}
@Override
public void onNext(Map<String, Object> item) {
eventSubscriber.onEvent(topic, item);
- subscription.request(1);
+ this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
- // TODO Auto-generated method stub
-
+ if (throwable instanceof Error) {
+ log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic
+ + ", not trying to resubscribe.", throwable);
+ } else {
+ log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+ + ", resubscribing...", throwable);
+ addEventSubscriber(topic, eventSubscriber);
+ }
}
@Override
public void onComplete() {
- // TODO Auto-generated method stub
-
+ if (log.isTraceEnabled())
+ log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+ + " is completed");
}
void unsubscribe() {