]> git.argeo.org Git - gpl/argeo-slc.git/commitdiff
@update:79; INtroduce event polling (not working yet)
authorMathieu Baudier <mbaudier@argeo.org>
Mon, 8 Jun 2009 15:51:02 +0000 (15:51 +0000)
committerMathieu Baudier <mbaudier@argeo.org>
Mon, 8 Jun 2009 15:51:02 +0000 (15:51 +0000)
git-svn-id: https://svn.argeo.org/slc/trunk@2514 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

demo/site/org.argeo.slc.demo.basic/conf/basic.xml
demo/site/org.argeo.slc.demo.log4j/log4j.properties
integration-tests/org.argeo.slc.it.webapp/src/test/java/org/argeo/slc/it/webapp/SimpleScenarioTest.java
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/SlcServerHttpClient.java
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/AbstractHttpServicesClient.java
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/AddEventListenerController.java
runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/RemoveEventListenerController.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java
runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/test/tree/ResultAttributesList.java

index 0e7b93daf62275a973e115b10f92d7693b71cb6d..bc48b51d62aee93d3da2674b57171ef0f47902de 100644 (file)
        <bean id="basic.testResult" parent="slcDefault.test.basicTreeTestResult"\r
                scope="execution">\r
                <property name="listeners" ref="resultListeners" />\r
+               <property name="attributes">\r
+                       <map>\r
+                               <entry key="testedComponentId" value="@{testedComponentId}" />\r
+                       </map>\r
+               </property>\r
                <aop:scoped-proxy />\r
        </bean>\r
 \r
index bae808ee83ac232e2be4a934083826ced3f19cd6..a0ccce5a00ecc18839ac660be898d1ee4d1742ea 100644 (file)
@@ -6,7 +6,8 @@ log4j.logger.org.argeo.slc.execution.ExecutionParameterPostProcessor=TRACE
 log4j.logger.org.argeo.slc.execution.ExecutionContext=DEBUG
 log4j.logger.org.argeo.slc.execution.SimpleExecutionSpec=DEBUG
 
-log4j.logger.org.argeo.slc.web.mvc=DEBUG
+log4j.logger.org.argeo.slc.web.mvc=TRACE
+log4j.logger.org.argeo.slc.jms=TRACE
 
 log4j.logger.org.hibernate=WARN
 
index 9ab5d75847120bcd58affbce686d1b0216cacb7e..16e4239226af2cb6beaadcb6383a8a9017b1a91f 100644 (file)
@@ -2,14 +2,15 @@ package org.argeo.slc.it.webapp;
 
 import org.argeo.slc.Condition;
 import org.argeo.slc.core.test.tree.TreeTestResultList;
+import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.server.client.SlcServerHttpClient;
 import org.argeo.slc.server.unit.AbstractHttpClientTestCase;
 
 public class SimpleScenarioTest extends AbstractHttpClientTestCase {
        public void testSimpleScenario() throws Exception {
                String moduleName = "org.argeo.slc.demo.basic";
-               assertAnswerOk(getHttpClient().startFlowDefault(moduleName, "main",
-                               null));
+               SlcExecution slcExecution = getHttpClient().startFlowDefault(
+                               moduleName, "main", null);
 
                getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null,
                                new Condition<TreeTestResultList>() {
@@ -18,5 +19,14 @@ public class SimpleScenarioTest extends AbstractHttpClientTestCase {
                                                return obj.getList().size() == 3;
                                        }
                                }, null);
+               
+               
+               getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null,
+                               new Condition<TreeTestResultList>() {
+
+                                       public Boolean check(TreeTestResultList obj) {
+                                               return obj.getList().size() == 3;
+                                       }
+                               }, null);
        }
 }
index 6087d2a1b7f470d0f4e15098349892edf64f9bed..59c49b0f6742271af17ed0b6b8aa7b4e89c6e6c8 100644 (file)
@@ -5,7 +5,9 @@ import java.util.Map;
 
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 
 /** Abstraction of the access to HTTP services of an SLC Server. */
@@ -13,9 +15,27 @@ public interface SlcServerHttpClient extends HttpServicesClient {
        public final static String LIST_AGENTS = "listAgents.service";
        public final static String IS_SERVER_READY = "isServerReady.service";
        public final static String NEW_SLC_EXECUTION = "newSlcExecution.service";
+       public final static String LIST_SLC_EXECUTIONS = "listSlcExecutions.service";
        public final static String GET_MODULE_DESCRIPTOR = "getExecutionDescriptor.service";
        public final static String LIST_MODULE_DESCRIPTORS = "listModulesDescriptors.service";
        public final static String LIST_RESULTS = "listResults.service";
+       public final static String ADD_EVENT_LISTENER = "addEventListener.service";
+       public final static String REMOVE_EVENT_LISTENER = "removeEventListener.service";
+       public final static String POLL_EVENT = "pollEvent.service";
+
+       /** Wait for the provided SlcExecution to be finished. */
+       public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+                       Long timeout);
+
+       /** Block until one of the registered event is finished. */
+       public SlcEvent pollEvent(Long timeout);
+
+       /** Register an event type. */
+       public ExecutionAnswer addEventListener(String eventType, String eventFilter);
+
+       /** Unregister an event type. */
+       public ExecutionAnswer removeEventListener(String eventType,
+                       String eventFilter);
 
        /** Wait for one agent to be available. */
        public SlcAgentDescriptor waitForOneAgent();
@@ -24,16 +44,16 @@ public interface SlcServerHttpClient extends HttpServicesClient {
        public void waitForServerToBeReady();
 
        /** Start an execution flow on the given agent. */
-       public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow);
+       public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow);
 
        /** Assume one agent and one version per module. */
-       public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+       public SlcExecution startFlowDefault(String moduleName, String flowName,
                        Map<String, Object> args);
 
        /** List execution modules descriptors. */
        public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId);
 
-       /** Retrieve a single execution module descriptot. */
+       /** Retrieve a single execution module descriptor. */
        public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
                        String moduleName, String version);
 
index 44444f6f13c6c78a3399410f4d0626cbc9332253..a7936916ef110b99d8ba84a4e8f53134fbfe4770 100644 (file)
@@ -8,7 +8,6 @@ import java.io.Writer;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.Iterator;
 import java.util.Map;
 
 import javax.xml.transform.Source;
@@ -34,7 +33,7 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
        private String encoding = "UTF-8";
 
        private Long retryPeriod = 1000l;
-       private Long defaultTimeout = 10 * 1000l;
+       private Long defaultTimeout = 30 * 1000l;
 
        public <T> T callService(String path, Map<String, String> parameters) {
                return callService(path, parameters, null);
@@ -54,14 +53,11 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
        @SuppressWarnings(value = { "unchecked" })
        public <T> T callServiceSafe(String path, Map<String, String> parameters,
                        Condition<T> condition, Long timeout) {
-               if (timeout == null)
-                       timeout = defaultTimeout;
 
                long begin = System.currentTimeMillis();
                try {
                        Object obj = null;
-                       long duration = System.currentTimeMillis() - begin;
-                       while (duration < timeout) {
+                       while (System.currentTimeMillis() - begin < timeout(timeout)) {
                                try {
                                        obj = callServiceLowLevel(path, parameters, null);
                                } catch (IOException e) {
@@ -93,7 +89,7 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
                                                                + " on "
                                                                + baseUrl
                                                                + " did not return an answer after calling it safely for "
-                                                               + duration + " ms.");
+                                                               + timeout(timeout) + " ms.");
                        return (T) obj;
                } catch (Exception e) {
                        throw new SlcException(
@@ -157,16 +153,19 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
                try {
                        if (parameters != null && parameters.size() != 0) {
                                buf.append('?');
-                               Iterator<String> it = parameters.keySet().iterator();
-                               String key = null;
-                               while (it.hasNext()) {
-                                       if (key != null)
-                                               buf.append('&');
-                                       key = it.next();
-                                       String keyEncoded = URLEncoder.encode(key, urlEncoding);
-                                       String valueEncoded = URLEncoder.encode(
-                                                       parameters.get(key), urlEncoding);
-                                       buf.append(keyEncoded).append('=').append(valueEncoded);
+                               boolean first = true;
+                               for (String key : parameters.keySet()) {
+                                       String value = parameters.get(key);
+                                       if (value != null) {
+                                               if (first)
+                                                       first = false;
+                                               else
+                                                       buf.append('&');
+                                               String keyEncoded = URLEncoder.encode(key, urlEncoding);
+                                               String valueEncoded = URLEncoder.encode(value,
+                                                               urlEncoding);
+                                               buf.append(keyEncoded).append('=').append(valueEncoded);
+                                       }
                                }
                        }
 
@@ -176,6 +175,12 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
                }
        }
 
+       public Long timeout(Long timeout) {
+               if (timeout == null)
+                       timeout = getDefaultTimeout();
+               return timeout;
+       }
+
        public void setUnmarshaller(Unmarshaller unmarshaller) {
                this.unmarshaller = unmarshaller;
        }
@@ -202,9 +207,13 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient {
                this.encoding = encoding;
        }
 
-       /** Default is 30s*/
+       /** Default is 30s */
        public void setDefaultTimeout(Long defaultTimeout) {
                this.defaultTimeout = defaultTimeout;
        }
 
+       public Long getDefaultTimeout() {
+               return defaultTimeout;
+       }
+
 }
index 20c938301dd0de2f3d1605a58553f144305eb0e2..301afb5b1007327d1f440bc07d2e30ec586e4f31 100644 (file)
@@ -1,5 +1,6 @@
 package org.argeo.slc.server.client.impl;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -15,10 +16,12 @@ import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.msg.ExecutionAnswer;
 import org.argeo.slc.msg.MsgConstants;
 import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.services.EventPublisherAspect;
 
 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                implements SlcServerHttpClient {
@@ -28,10 +31,63 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
        private final static Log log = LogFactory
                        .getLog(SlcServerHttpClientImpl.class);
 
-       private Long retryTimeout = 60 * 1000l;
        private Long serverReadyTimeout = 120 * 1000l;
 
-       public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) {
+       public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+                       Long timeout) {
+               if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
+                       return;
+
+               long begin = System.currentTimeMillis();
+               while (System.currentTimeMillis() - begin < timeout(timeout)) {
+                       SlcEvent event = pollEvent(timeout);
+                       String slcExecutionId = event.getHeaders().get(
+                                       MsgConstants.PROPERTY_SLC_EXECUTION_ID);
+                       String status = event.getHeaders().get(
+                                       MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
+                       if (slcExecutionId.equals(slcExecution.getUuid())
+                                       && status.equals(SlcExecution.STATUS_FINISHED)) {
+                               return;
+                       }
+               }
+               throw new SlcException("SLC Execution not completed after timeout "
+                               + timeout(timeout) + " elapsed.");
+       }
+
+       public SlcEvent pollEvent(Long timeout) {
+               long begin = System.currentTimeMillis();
+               while (System.currentTimeMillis() - begin < timeout(timeout)) {
+                       Object obj = callService(POLL_EVENT, null);
+                       if (obj instanceof ExecutionAnswer) {
+                               ExecutionAnswer answer = (ExecutionAnswer) obj;
+                               if (answer.isError())
+                                       throw new SlcException(
+                                                       "Unexpected exception when pollign event: "
+                                                                       + answer.getMessage());
+                       } else {
+                               return (SlcEvent) obj;
+                       }
+               }
+               throw new SlcException("No event received after timeout "
+                               + timeout(timeout) + " elapsed.");
+       }
+
+       public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
+               Map<String, String> parameters = new HashMap<String, String>();
+               parameters.put(SlcEvent.EVENT_TYPE, eventType);
+               parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+               return callService(ADD_EVENT_LISTENER, parameters);
+       }
+
+       public ExecutionAnswer removeEventListener(String eventType,
+                       String eventFilter) {
+               Map<String, String> parameters = new HashMap<String, String>();
+               parameters.put(SlcEvent.EVENT_TYPE, eventType);
+               parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+               return callService(REMOVE_EVENT_LISTENER, parameters);
+       }
+
+       public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
                SlcExecution slcExecution = new SlcExecution();
                slcExecution.setUuid(UUID.randomUUID().toString());
 
@@ -41,10 +97,13 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
                ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
                                slcExecution);
-               return answer;
+               if (!answer.isOk())
+                       throw new SlcException("Could not start flow on agent " + agentId
+                                       + ": " + answer.getMessage());
+               return slcExecution;
        }
 
-       public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+       public SlcExecution startFlowDefault(String moduleName, String flowName,
                        Map<String, Object> args) {
                SlcAgentDescriptor agentDescriptor = waitForOneAgent();
                List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
@@ -71,6 +130,35 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                realizedFlow.setFlowDescriptor(flowDescriptor);
 
                return startFlow(agentDescriptor.getUuid(), realizedFlow);
+
+               // FIXME: polling not working when called from test: no unique
+               // session is created on server side
+               // SlcExecution slcExecutionFinished = null;
+               // try {
+               // addEventListener(
+               // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+               // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
+               // realizedFlow);
+               //
+               // waitForSlcExecutionFinished(slcExecution, null);
+               //
+               // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
+               // for (Serializable sr : ol.getObjects()) {
+               // SlcExecution se = (SlcExecution) sr;
+               // if (se.getUuid().equals(slcExecution.getUuid())) {
+               // slcExecutionFinished = se;
+               // break;
+               // }
+               // }
+               //
+               // } finally {
+               // removeEventListener(
+               // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+               // }
+               //
+               // if (slcExecutionFinished == null)
+               // throw new SlcException("No finished SLC Execution.");
+               // return slcExecutionFinished;
        }
 
        public static ExecutionModuleDescriptor findModule(
@@ -130,7 +218,7 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                                                        log.trace("Object list size: " + size);
                                                return size == 1;
                                        }
-                               }, retryTimeout);
+                               }, null);
                return (SlcAgentDescriptor) objectList.getObjects().get(0);
        }
 
@@ -141,14 +229,6 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
                        throw new SlcException("Server is not ready: " + answer);
        }
 
-       /**
-        * Timeout in ms after which a safe call will throw an exception. Default is
-        * 60s.
-        */
-       public void setRetryTimeout(Long retryTimeout) {
-               this.retryTimeout = retryTimeout;
-       }
-
        /**
         * Timeout in ms after which the client will stop waiting for the server to
         * be ready and throw an exception. Default is 120s.
index d72d53beca1e20afa364d459173042cf9b637c37..d23239668bcbb6af10e7729a840d2d1b97f91db7 100644 (file)
@@ -3,6 +3,8 @@ package org.argeo.slc.web.mvc.event;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
 import org.argeo.slc.msg.event.SlcEventListenerRegister;
@@ -10,6 +12,8 @@ import org.argeo.slc.web.mvc.AbstractServiceController;
 import org.springframework.web.servlet.ModelAndView;
 
 public class AddEventListenerController extends AbstractServiceController {
+       private final static Log log = LogFactory
+                       .getLog(AddEventListenerController.class);
 
        private SlcEventListenerRegister eventListenerRegister;
 
@@ -23,6 +27,10 @@ public class AddEventListenerController extends AbstractServiceController {
                eventListenerRegister
                                .addEventListenerDescriptor(new SlcEventListenerDescriptor(
                                                eventType, eventFilter));
+               if (log.isTraceEnabled())
+                       log.trace("Registered listener on register "
+                                       + eventListenerRegister.getId() + " for type " + eventType
+                                       + ", filter=" + eventFilter);
        }
 
        public void setEventListenerRegister(
index 4d9f3624a523db087b08e3358d0322fc60972559..400228e114e03bbbc78c51db694c3406786b244f 100644 (file)
@@ -3,6 +3,8 @@ package org.argeo.slc.web.mvc.event;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.msg.event.SlcEvent;
 import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
 import org.argeo.slc.msg.event.SlcEventListenerRegister;
@@ -10,6 +12,8 @@ import org.argeo.slc.web.mvc.AbstractServiceController;
 import org.springframework.web.servlet.ModelAndView;
 
 public class RemoveEventListenerController extends AbstractServiceController {
+       private final static Log log = LogFactory
+                       .getLog(RemoveEventListenerController.class);
 
        private SlcEventListenerRegister eventListenerRegister;
 
@@ -23,6 +27,10 @@ public class RemoveEventListenerController extends AbstractServiceController {
                eventListenerRegister
                                .removeEventListenerDescriptor(new SlcEventListenerDescriptor(
                                                eventType, eventFilter));
+               if (log.isTraceEnabled())
+                       log.trace("Removed listener from register "
+                                       + eventListenerRegister.getId() + " for type " + eventType
+                                       + ", filter=" + eventFilter);
        }
 
        public void setEventListenerRegister(
index 5e745a2f0d619a1ac8099516112abbd463d49480..0bf52e20054f7892e62e0ad6c503dd9810b3def1 100644 (file)
@@ -8,6 +8,7 @@ import java.util.TreeMap;
 \r
 public class SlcExecution implements Serializable {\r
        private static final long serialVersionUID = 1L;\r
+       public final static String STATUS_NONE = "DEFAULT";\r
        public final static String STATUS_SCHEDULED = "SCHEDULED";\r
        public final static String STATUS_RUNNING = "RUNNING";\r
        public final static String STATUS_FINISHED = "FINISHED";\r
@@ -20,7 +21,7 @@ public class SlcExecution implements Serializable {
        private String host;\r
        private String user;\r
        private String type;\r
-       private String status;\r
+       private String status = STATUS_NONE;\r
        private Map<String, String> attributes = new TreeMap<String, String>();\r
 \r
        private List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();\r
index eb0fa7b6083429f5e656a42dcaa87942c2078889..0ce259d97373d54e297c6cb5ab841463a733b0f6 100644 (file)
@@ -45,6 +45,8 @@ public class JmsSlcEventListener implements SlcEventListener {
                if (descriptors.size() == 0) {
                        // No listeners, just waiting
                        try {
+                               if(log.isTraceEnabled())
+                                       log.trace("No event listener registered, sleeping...");
                                Thread.sleep(timeout);
                        } catch (InterruptedException e) {
                                // silent
index 237af48b54b7f46b249af68965ecfbea7d3b3808..bd8d56455eb9a96137a8499de54d54b838eed5ab 100644 (file)
@@ -3,6 +3,9 @@ package org.argeo.slc.core.test.tree;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.argeo.slc.msg.ObjectList;
+
+/** @deprecated user {@link ObjectList} instead. */
 public class ResultAttributesList {
        private List<ResultAttributes> list = new ArrayList<ResultAttributes>();