import org.argeo.api.cms.CmsEventSubscriber;
import org.argeo.api.cms.CmsLog;
+/** {@link CmsEventBus} implementation based on {@link Flow}. */
public class CmsEventBusImpl implements CmsEventBus {
private final CmsLog log = CmsLog.getLog(CmsEventBus.class);
- // CMS events
private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
-// private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
- /*
- * CMS Events
- */
@Override
public void sendEvent(String topic, Map<String, Object> event) {
SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
}
}
+ /** A subscriber to a topic. */
static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
private String topic;
private CmsEventSubscriber eventSubscriber;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
- subscription.request(1);
+ this.subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Map<String, Object> item) {
eventSubscriber.onEvent(topic, item);
- subscription.request(1);
}
@Override