import org.argeo.api.cms.CmsEventSubscriber;
import org.argeo.api.cms.CmsLog;
-/** {@link CmsEventBus} implementation based on {@link Flow}. */
+/** An asynchronous {@link CmsEventBus} implementation based on {@link Flow}. */
public class CmsEventBusImpl implements CmsEventBus {
- private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
+ private final static CmsLog log = CmsLog.getLog(CmsEventBus.class);
private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
}
/** A subscriber to a topic. */
- static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+ class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
private String topic;
private CmsEventSubscriber eventSubscriber;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
- this.subscription.request(Long.MAX_VALUE);
+ this.subscription.request(1);
}
@Override
public void onNext(Map<String, Object> item) {
eventSubscriber.onEvent(topic, item);
+ this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
- // TODO Auto-generated method stub
-
+ if (throwable instanceof Error) {
+ log.error("Unexpected error in event subscriber " + eventSubscriber + " for topic " + topic
+ + ", not trying to resubscribe.", throwable);
+ } else {
+ log.error("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+ + ", resubscribing...", throwable);
+ addEventSubscriber(topic, eventSubscriber);
+ }
}
@Override
public void onComplete() {
- // TODO Auto-generated method stub
-
+ if (log.isTraceEnabled())
+ log.trace("Unexpected exception in event subscriber " + eventSubscriber + " for topic " + topic
+ + " is completed");
}
void unsubscribe() {