X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.server%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fserver%2Fclient%2Fimpl%2FSlcServerHttpClientImpl.java;h=301afb5b1007327d1f440bc07d2e30ec586e4f31;hb=171d606f8c2ba89c9bdc518f39aa88662c4d942a;hp=20c938301dd0de2f3d1605a58553f144305eb0e2;hpb=b0c2c01573db47690afdf723e49fb7fa39561e8e;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java index 20c938301..301afb5b1 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java @@ -1,5 +1,6 @@ package org.argeo.slc.server.client.impl; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -15,10 +16,12 @@ import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.msg.ExecutionAnswer; import org.argeo.slc.msg.MsgConstants; import org.argeo.slc.msg.ObjectList; +import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.process.RealizedFlow; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.runtime.SlcAgentDescriptor; import org.argeo.slc.server.client.SlcServerHttpClient; +import org.argeo.slc.services.EventPublisherAspect; public class SlcServerHttpClientImpl extends AbstractHttpServicesClient implements SlcServerHttpClient { @@ -28,10 +31,63 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient private final static Log log = LogFactory .getLog(SlcServerHttpClientImpl.class); - private Long retryTimeout = 60 * 1000l; private Long serverReadyTimeout = 120 * 1000l; - public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) { + public void waitForSlcExecutionFinished(SlcExecution slcExecution, + Long timeout) { + if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED)) + return; + + long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < timeout(timeout)) { + SlcEvent event = pollEvent(timeout); + String slcExecutionId = event.getHeaders().get( + MsgConstants.PROPERTY_SLC_EXECUTION_ID); + String status = event.getHeaders().get( + MsgConstants.PROPERTY_SLC_EXECUTION_STATUS); + if (slcExecutionId.equals(slcExecution.getUuid()) + && status.equals(SlcExecution.STATUS_FINISHED)) { + return; + } + } + throw new SlcException("SLC Execution not completed after timeout " + + timeout(timeout) + " elapsed."); + } + + public SlcEvent pollEvent(Long timeout) { + long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < timeout(timeout)) { + Object obj = callService(POLL_EVENT, null); + if (obj instanceof ExecutionAnswer) { + ExecutionAnswer answer = (ExecutionAnswer) obj; + if (answer.isError()) + throw new SlcException( + "Unexpected exception when pollign event: " + + answer.getMessage()); + } else { + return (SlcEvent) obj; + } + } + throw new SlcException("No event received after timeout " + + timeout(timeout) + " elapsed."); + } + + public ExecutionAnswer addEventListener(String eventType, String eventFilter) { + Map parameters = new HashMap(); + parameters.put(SlcEvent.EVENT_TYPE, eventType); + parameters.put(SlcEvent.EVENT_FILTER, eventFilter); + return callService(ADD_EVENT_LISTENER, parameters); + } + + public ExecutionAnswer removeEventListener(String eventType, + String eventFilter) { + Map parameters = new HashMap(); + parameters.put(SlcEvent.EVENT_TYPE, eventType); + parameters.put(SlcEvent.EVENT_FILTER, eventFilter); + return callService(REMOVE_EVENT_LISTENER, parameters); + } + + public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) { SlcExecution slcExecution = new SlcExecution(); slcExecution.setUuid(UUID.randomUUID().toString()); @@ -41,10 +97,13 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId); ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters, slcExecution); - return answer; + if (!answer.isOk()) + throw new SlcException("Could not start flow on agent " + agentId + + ": " + answer.getMessage()); + return slcExecution; } - public ExecutionAnswer startFlowDefault(String moduleName, String flowName, + public SlcExecution startFlowDefault(String moduleName, String flowName, Map args) { SlcAgentDescriptor agentDescriptor = waitForOneAgent(); List lst = listModuleDescriptors(agentDescriptor @@ -71,6 +130,35 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient realizedFlow.setFlowDescriptor(flowDescriptor); return startFlow(agentDescriptor.getUuid(), realizedFlow); + + // FIXME: polling not working when called from test: no unique + // session is created on server side + // SlcExecution slcExecutionFinished = null; + // try { + // addEventListener( + // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null); + // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(), + // realizedFlow); + // + // waitForSlcExecutionFinished(slcExecution, null); + // + // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null); + // for (Serializable sr : ol.getObjects()) { + // SlcExecution se = (SlcExecution) sr; + // if (se.getUuid().equals(slcExecution.getUuid())) { + // slcExecutionFinished = se; + // break; + // } + // } + // + // } finally { + // removeEventListener( + // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null); + // } + // + // if (slcExecutionFinished == null) + // throw new SlcException("No finished SLC Execution."); + // return slcExecutionFinished; } public static ExecutionModuleDescriptor findModule( @@ -130,7 +218,7 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient log.trace("Object list size: " + size); return size == 1; } - }, retryTimeout); + }, null); return (SlcAgentDescriptor) objectList.getObjects().get(0); } @@ -141,14 +229,6 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient throw new SlcException("Server is not ready: " + answer); } - /** - * Timeout in ms after which a safe call will throw an exception. Default is - * 60s. - */ - public void setRetryTimeout(Long retryTimeout) { - this.retryTimeout = retryTimeout; - } - /** * Timeout in ms after which the client will stop waiting for the server to * be ready and throw an exception. Default is 120s.