package org.argeo.slc.server.client.impl;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.argeo.slc.msg.ExecutionAnswer;
import org.argeo.slc.msg.MsgConstants;
import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.services.EventPublisherAspect;
public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
implements SlcServerHttpClient {
private final static Log log = LogFactory
.getLog(SlcServerHttpClientImpl.class);
- private Long retryTimeout = 60 * 1000l;
private Long serverReadyTimeout = 120 * 1000l;
- public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) {
+ public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+ Long timeout) {
+ if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
+ return;
+
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ SlcEvent event = pollEvent(timeout);
+ String slcExecutionId = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_ID);
+ String status = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
+ if (slcExecutionId.equals(slcExecution.getUuid())
+ && status.equals(SlcExecution.STATUS_FINISHED)) {
+ return;
+ }
+ }
+ throw new SlcException("SLC Execution not completed after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public SlcEvent pollEvent(Long timeout) {
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ Object obj = callService(POLL_EVENT, null);
+ if (obj instanceof ExecutionAnswer) {
+ ExecutionAnswer answer = (ExecutionAnswer) obj;
+ if (answer.isError())
+ throw new SlcException(
+ "Unexpected exception when pollign event: "
+ + answer.getMessage());
+ } else {
+ return (SlcEvent) obj;
+ }
+ }
+ throw new SlcException("No event received after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(ADD_EVENT_LISTENER, parameters);
+ }
+
+ public ExecutionAnswer removeEventListener(String eventType,
+ String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(REMOVE_EVENT_LISTENER, parameters);
+ }
+
+ public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
SlcExecution slcExecution = new SlcExecution();
slcExecution.setUuid(UUID.randomUUID().toString());
parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
slcExecution);
- return answer;
+ if (!answer.isOk())
+ throw new SlcException("Could not start flow on agent " + agentId
+ + ": " + answer.getMessage());
+ return slcExecution;
}
- public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+ public SlcExecution startFlowDefault(String moduleName, String flowName,
Map<String, Object> args) {
SlcAgentDescriptor agentDescriptor = waitForOneAgent();
List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
realizedFlow.setFlowDescriptor(flowDescriptor);
return startFlow(agentDescriptor.getUuid(), realizedFlow);
+
+ // FIXME: polling not working when called from test: no unique
+ // session is created on server side
+ // SlcExecution slcExecutionFinished = null;
+ // try {
+ // addEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
+ // realizedFlow);
+ //
+ // waitForSlcExecutionFinished(slcExecution, null);
+ //
+ // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
+ // for (Serializable sr : ol.getObjects()) {
+ // SlcExecution se = (SlcExecution) sr;
+ // if (se.getUuid().equals(slcExecution.getUuid())) {
+ // slcExecutionFinished = se;
+ // break;
+ // }
+ // }
+ //
+ // } finally {
+ // removeEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // }
+ //
+ // if (slcExecutionFinished == null)
+ // throw new SlcException("No finished SLC Execution.");
+ // return slcExecutionFinished;
}
public static ExecutionModuleDescriptor findModule(
log.trace("Object list size: " + size);
return size == 1;
}
- }, retryTimeout);
+ }, null);
return (SlcAgentDescriptor) objectList.getObjects().get(0);
}
throw new SlcException("Server is not ready: " + answer);
}
- /**
- * Timeout in ms after which a safe call will throw an exception. Default is
- * 60s.
- */
- public void setRetryTimeout(Long retryTimeout) {
- this.retryTimeout = retryTimeout;
- }
-
/**
* Timeout in ms after which the client will stop waiting for the server to
* be ready and throw an exception. Default is 120s.