]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
@update:79; INtroduce event polling (not working yet)
[gpl/argeo-slc.git] / runtime / org.argeo.slc.server / src / main / java / org / argeo / slc / server / client / impl / SlcServerHttpClientImpl.java
index 20c938301dd0de2f3d1605a58553f144305eb0e2..301afb5b1007327d1f440bc07d2e30ec586e4f31 100644 (file)
@@ -1,5 +1,6 @@
 package org.argeo.slc.server.client.impl;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -15,10 +16,12 @@ import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.msg.ExecutionAnswer;
 import org.argeo.slc.msg.MsgConstants;
 import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.services.EventPublisherAspect;
 
 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                implements SlcServerHttpClient {
@@ -28,10 +31,63 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
        private final static Log log = LogFactory
                        .getLog(SlcServerHttpClientImpl.class);
 
-       private Long retryTimeout = 60 * 1000l;
        private Long serverReadyTimeout = 120 * 1000l;
 
-       public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) {
+       public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+                       Long timeout) {
+               if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
+                       return;
+
+               long begin = System.currentTimeMillis();
+               while (System.currentTimeMillis() - begin < timeout(timeout)) {
+                       SlcEvent event = pollEvent(timeout);
+                       String slcExecutionId = event.getHeaders().get(
+                                       MsgConstants.PROPERTY_SLC_EXECUTION_ID);
+                       String status = event.getHeaders().get(
+                                       MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
+                       if (slcExecutionId.equals(slcExecution.getUuid())
+                                       && status.equals(SlcExecution.STATUS_FINISHED)) {
+                               return;
+                       }
+               }
+               throw new SlcException("SLC Execution not completed after timeout "
+                               + timeout(timeout) + " elapsed.");
+       }
+
+       public SlcEvent pollEvent(Long timeout) {
+               long begin = System.currentTimeMillis();
+               while (System.currentTimeMillis() - begin < timeout(timeout)) {
+                       Object obj = callService(POLL_EVENT, null);
+                       if (obj instanceof ExecutionAnswer) {
+                               ExecutionAnswer answer = (ExecutionAnswer) obj;
+                               if (answer.isError())
+                                       throw new SlcException(
+                                                       "Unexpected exception when pollign event: "
+                                                                       + answer.getMessage());
+                       } else {
+                               return (SlcEvent) obj;
+                       }
+               }
+               throw new SlcException("No event received after timeout "
+                               + timeout(timeout) + " elapsed.");
+       }
+
+       public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
+               Map<String, String> parameters = new HashMap<String, String>();
+               parameters.put(SlcEvent.EVENT_TYPE, eventType);
+               parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+               return callService(ADD_EVENT_LISTENER, parameters);
+       }
+
+       public ExecutionAnswer removeEventListener(String eventType,
+                       String eventFilter) {
+               Map<String, String> parameters = new HashMap<String, String>();
+               parameters.put(SlcEvent.EVENT_TYPE, eventType);
+               parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+               return callService(REMOVE_EVENT_LISTENER, parameters);
+       }
+
+       public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
                SlcExecution slcExecution = new SlcExecution();
                slcExecution.setUuid(UUID.randomUUID().toString());
 
@@ -41,10 +97,13 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
                ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
                                slcExecution);
-               return answer;
+               if (!answer.isOk())
+                       throw new SlcException("Could not start flow on agent " + agentId
+                                       + ": " + answer.getMessage());
+               return slcExecution;
        }
 
-       public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+       public SlcExecution startFlowDefault(String moduleName, String flowName,
                        Map<String, Object> args) {
                SlcAgentDescriptor agentDescriptor = waitForOneAgent();
                List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
@@ -71,6 +130,35 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                realizedFlow.setFlowDescriptor(flowDescriptor);
 
                return startFlow(agentDescriptor.getUuid(), realizedFlow);
+
+               // FIXME: polling not working when called from test: no unique
+               // session is created on server side
+               // SlcExecution slcExecutionFinished = null;
+               // try {
+               // addEventListener(
+               // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+               // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
+               // realizedFlow);
+               //
+               // waitForSlcExecutionFinished(slcExecution, null);
+               //
+               // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
+               // for (Serializable sr : ol.getObjects()) {
+               // SlcExecution se = (SlcExecution) sr;
+               // if (se.getUuid().equals(slcExecution.getUuid())) {
+               // slcExecutionFinished = se;
+               // break;
+               // }
+               // }
+               //
+               // } finally {
+               // removeEventListener(
+               // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+               // }
+               //
+               // if (slcExecutionFinished == null)
+               // throw new SlcException("No finished SLC Execution.");
+               // return slcExecutionFinished;
        }
 
        public static ExecutionModuleDescriptor findModule(
@@ -130,7 +218,7 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                                                        log.trace("Object list size: " + size);
                                                return size == 1;
                                        }
-                               }, retryTimeout);
+                               }, null);
                return (SlcAgentDescriptor) objectList.getObjects().get(0);
        }
 
@@ -141,14 +229,6 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                        throw new SlcException("Server is not ready: " + answer);
        }
 
-       /**
-        * Timeout in ms after which a safe call will throw an exception. Default is
-        * 60s.
-        */
-       public void setRetryTimeout(Long retryTimeout) {
-               this.retryTimeout = retryTimeout;
-       }
-
        /**
         * Timeout in ms after which the client will stop waiting for the server to
         * be ready and throw an exception. Default is 120s.