From 171d606f8c2ba89c9bdc518f39aa88662c4d942a Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Mon, 8 Jun 2009 15:51:02 +0000 Subject: [PATCH] @update:79; INtroduce event polling (not working yet) git-svn-id: https://svn.argeo.org/slc/trunk@2514 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- .../org.argeo.slc.demo.basic/conf/basic.xml | 5 + .../org.argeo.slc.demo.log4j/log4j.properties | 3 +- .../slc/it/webapp/SimpleScenarioTest.java | 14 ++- .../server/client/SlcServerHttpClient.java | 26 ++++- .../impl/AbstractHttpServicesClient.java | 45 +++++--- .../client/impl/SlcServerHttpClientImpl.java | 106 +++++++++++++++--- .../mvc/event/AddEventListenerController.java | 8 ++ .../event/RemoveEventListenerController.java | 8 ++ .../org/argeo/slc/process/SlcExecution.java | 3 +- .../argeo/slc/jms/JmsSlcEventListener.java | 2 + .../core/test/tree/ResultAttributesList.java | 3 + 11 files changed, 185 insertions(+), 38 deletions(-) diff --git a/demo/site/org.argeo.slc.demo.basic/conf/basic.xml b/demo/site/org.argeo.slc.demo.basic/conf/basic.xml index 0e7b93daf..bc48b51d6 100644 --- a/demo/site/org.argeo.slc.demo.basic/conf/basic.xml +++ b/demo/site/org.argeo.slc.demo.basic/conf/basic.xml @@ -99,6 +99,11 @@ + + + + + diff --git a/demo/site/org.argeo.slc.demo.log4j/log4j.properties b/demo/site/org.argeo.slc.demo.log4j/log4j.properties index bae808ee8..a0ccce5a0 100644 --- a/demo/site/org.argeo.slc.demo.log4j/log4j.properties +++ b/demo/site/org.argeo.slc.demo.log4j/log4j.properties @@ -6,7 +6,8 @@ log4j.logger.org.argeo.slc.execution.ExecutionParameterPostProcessor=TRACE log4j.logger.org.argeo.slc.execution.ExecutionContext=DEBUG log4j.logger.org.argeo.slc.execution.SimpleExecutionSpec=DEBUG -log4j.logger.org.argeo.slc.web.mvc=DEBUG +log4j.logger.org.argeo.slc.web.mvc=TRACE +log4j.logger.org.argeo.slc.jms=TRACE log4j.logger.org.hibernate=WARN diff --git a/integration-tests/org.argeo.slc.it.webapp/src/test/java/org/argeo/slc/it/webapp/SimpleScenarioTest.java b/integration-tests/org.argeo.slc.it.webapp/src/test/java/org/argeo/slc/it/webapp/SimpleScenarioTest.java index 9ab5d7584..16e423922 100644 --- a/integration-tests/org.argeo.slc.it.webapp/src/test/java/org/argeo/slc/it/webapp/SimpleScenarioTest.java +++ b/integration-tests/org.argeo.slc.it.webapp/src/test/java/org/argeo/slc/it/webapp/SimpleScenarioTest.java @@ -2,14 +2,15 @@ package org.argeo.slc.it.webapp; import org.argeo.slc.Condition; import org.argeo.slc.core.test.tree.TreeTestResultList; +import org.argeo.slc.process.SlcExecution; import org.argeo.slc.server.client.SlcServerHttpClient; import org.argeo.slc.server.unit.AbstractHttpClientTestCase; public class SimpleScenarioTest extends AbstractHttpClientTestCase { public void testSimpleScenario() throws Exception { String moduleName = "org.argeo.slc.demo.basic"; - assertAnswerOk(getHttpClient().startFlowDefault(moduleName, "main", - null)); + SlcExecution slcExecution = getHttpClient().startFlowDefault( + moduleName, "main", null); getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null, new Condition() { @@ -18,5 +19,14 @@ public class SimpleScenarioTest extends AbstractHttpClientTestCase { return obj.getList().size() == 3; } }, null); + + + getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null, + new Condition() { + + public Boolean check(TreeTestResultList obj) { + return obj.getList().size() == 3; + } + }, null); } } diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/SlcServerHttpClient.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/SlcServerHttpClient.java index 6087d2a1b..59c49b0f6 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/SlcServerHttpClient.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/SlcServerHttpClient.java @@ -5,7 +5,9 @@ import java.util.Map; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.msg.ExecutionAnswer; +import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.process.RealizedFlow; +import org.argeo.slc.process.SlcExecution; import org.argeo.slc.runtime.SlcAgentDescriptor; /** Abstraction of the access to HTTP services of an SLC Server. */ @@ -13,9 +15,27 @@ public interface SlcServerHttpClient extends HttpServicesClient { public final static String LIST_AGENTS = "listAgents.service"; public final static String IS_SERVER_READY = "isServerReady.service"; public final static String NEW_SLC_EXECUTION = "newSlcExecution.service"; + public final static String LIST_SLC_EXECUTIONS = "listSlcExecutions.service"; public final static String GET_MODULE_DESCRIPTOR = "getExecutionDescriptor.service"; public final static String LIST_MODULE_DESCRIPTORS = "listModulesDescriptors.service"; public final static String LIST_RESULTS = "listResults.service"; + public final static String ADD_EVENT_LISTENER = "addEventListener.service"; + public final static String REMOVE_EVENT_LISTENER = "removeEventListener.service"; + public final static String POLL_EVENT = "pollEvent.service"; + + /** Wait for the provided SlcExecution to be finished. */ + public void waitForSlcExecutionFinished(SlcExecution slcExecution, + Long timeout); + + /** Block until one of the registered event is finished. */ + public SlcEvent pollEvent(Long timeout); + + /** Register an event type. */ + public ExecutionAnswer addEventListener(String eventType, String eventFilter); + + /** Unregister an event type. */ + public ExecutionAnswer removeEventListener(String eventType, + String eventFilter); /** Wait for one agent to be available. */ public SlcAgentDescriptor waitForOneAgent(); @@ -24,16 +44,16 @@ public interface SlcServerHttpClient extends HttpServicesClient { public void waitForServerToBeReady(); /** Start an execution flow on the given agent. */ - public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow); + public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow); /** Assume one agent and one version per module. */ - public ExecutionAnswer startFlowDefault(String moduleName, String flowName, + public SlcExecution startFlowDefault(String moduleName, String flowName, Map args); /** List execution modules descriptors. */ public List listModuleDescriptors(String agentId); - /** Retrieve a single execution module descriptot. */ + /** Retrieve a single execution module descriptor. */ public ExecutionModuleDescriptor getModuleDescriptor(String agentId, String moduleName, String version); diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/AbstractHttpServicesClient.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/AbstractHttpServicesClient.java index 44444f6f1..a7936916e 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/AbstractHttpServicesClient.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/AbstractHttpServicesClient.java @@ -8,7 +8,6 @@ import java.io.Writer; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; -import java.util.Iterator; import java.util.Map; import javax.xml.transform.Source; @@ -34,7 +33,7 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { private String encoding = "UTF-8"; private Long retryPeriod = 1000l; - private Long defaultTimeout = 10 * 1000l; + private Long defaultTimeout = 30 * 1000l; public T callService(String path, Map parameters) { return callService(path, parameters, null); @@ -54,14 +53,11 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { @SuppressWarnings(value = { "unchecked" }) public T callServiceSafe(String path, Map parameters, Condition condition, Long timeout) { - if (timeout == null) - timeout = defaultTimeout; long begin = System.currentTimeMillis(); try { Object obj = null; - long duration = System.currentTimeMillis() - begin; - while (duration < timeout) { + while (System.currentTimeMillis() - begin < timeout(timeout)) { try { obj = callServiceLowLevel(path, parameters, null); } catch (IOException e) { @@ -93,7 +89,7 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { + " on " + baseUrl + " did not return an answer after calling it safely for " - + duration + " ms."); + + timeout(timeout) + " ms."); return (T) obj; } catch (Exception e) { throw new SlcException( @@ -157,16 +153,19 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { try { if (parameters != null && parameters.size() != 0) { buf.append('?'); - Iterator it = parameters.keySet().iterator(); - String key = null; - while (it.hasNext()) { - if (key != null) - buf.append('&'); - key = it.next(); - String keyEncoded = URLEncoder.encode(key, urlEncoding); - String valueEncoded = URLEncoder.encode( - parameters.get(key), urlEncoding); - buf.append(keyEncoded).append('=').append(valueEncoded); + boolean first = true; + for (String key : parameters.keySet()) { + String value = parameters.get(key); + if (value != null) { + if (first) + first = false; + else + buf.append('&'); + String keyEncoded = URLEncoder.encode(key, urlEncoding); + String valueEncoded = URLEncoder.encode(value, + urlEncoding); + buf.append(keyEncoded).append('=').append(valueEncoded); + } } } @@ -176,6 +175,12 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { } } + public Long timeout(Long timeout) { + if (timeout == null) + timeout = getDefaultTimeout(); + return timeout; + } + public void setUnmarshaller(Unmarshaller unmarshaller) { this.unmarshaller = unmarshaller; } @@ -202,9 +207,13 @@ public abstract class AbstractHttpServicesClient implements HttpServicesClient { this.encoding = encoding; } - /** Default is 30s*/ + /** Default is 30s */ public void setDefaultTimeout(Long defaultTimeout) { this.defaultTimeout = defaultTimeout; } + public Long getDefaultTimeout() { + return defaultTimeout; + } + } diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java index 20c938301..301afb5b1 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java @@ -1,5 +1,6 @@ package org.argeo.slc.server.client.impl; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -15,10 +16,12 @@ import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.msg.ExecutionAnswer; import org.argeo.slc.msg.MsgConstants; import org.argeo.slc.msg.ObjectList; +import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.process.RealizedFlow; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.runtime.SlcAgentDescriptor; import org.argeo.slc.server.client.SlcServerHttpClient; +import org.argeo.slc.services.EventPublisherAspect; public class SlcServerHttpClientImpl extends AbstractHttpServicesClient implements SlcServerHttpClient { @@ -28,10 +31,63 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient private final static Log log = LogFactory .getLog(SlcServerHttpClientImpl.class); - private Long retryTimeout = 60 * 1000l; private Long serverReadyTimeout = 120 * 1000l; - public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) { + public void waitForSlcExecutionFinished(SlcExecution slcExecution, + Long timeout) { + if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED)) + return; + + long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < timeout(timeout)) { + SlcEvent event = pollEvent(timeout); + String slcExecutionId = event.getHeaders().get( + MsgConstants.PROPERTY_SLC_EXECUTION_ID); + String status = event.getHeaders().get( + MsgConstants.PROPERTY_SLC_EXECUTION_STATUS); + if (slcExecutionId.equals(slcExecution.getUuid()) + && status.equals(SlcExecution.STATUS_FINISHED)) { + return; + } + } + throw new SlcException("SLC Execution not completed after timeout " + + timeout(timeout) + " elapsed."); + } + + public SlcEvent pollEvent(Long timeout) { + long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < timeout(timeout)) { + Object obj = callService(POLL_EVENT, null); + if (obj instanceof ExecutionAnswer) { + ExecutionAnswer answer = (ExecutionAnswer) obj; + if (answer.isError()) + throw new SlcException( + "Unexpected exception when pollign event: " + + answer.getMessage()); + } else { + return (SlcEvent) obj; + } + } + throw new SlcException("No event received after timeout " + + timeout(timeout) + " elapsed."); + } + + public ExecutionAnswer addEventListener(String eventType, String eventFilter) { + Map parameters = new HashMap(); + parameters.put(SlcEvent.EVENT_TYPE, eventType); + parameters.put(SlcEvent.EVENT_FILTER, eventFilter); + return callService(ADD_EVENT_LISTENER, parameters); + } + + public ExecutionAnswer removeEventListener(String eventType, + String eventFilter) { + Map parameters = new HashMap(); + parameters.put(SlcEvent.EVENT_TYPE, eventType); + parameters.put(SlcEvent.EVENT_FILTER, eventFilter); + return callService(REMOVE_EVENT_LISTENER, parameters); + } + + public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) { SlcExecution slcExecution = new SlcExecution(); slcExecution.setUuid(UUID.randomUUID().toString()); @@ -41,10 +97,13 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId); ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters, slcExecution); - return answer; + if (!answer.isOk()) + throw new SlcException("Could not start flow on agent " + agentId + + ": " + answer.getMessage()); + return slcExecution; } - public ExecutionAnswer startFlowDefault(String moduleName, String flowName, + public SlcExecution startFlowDefault(String moduleName, String flowName, Map args) { SlcAgentDescriptor agentDescriptor = waitForOneAgent(); List lst = listModuleDescriptors(agentDescriptor @@ -71,6 +130,35 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient realizedFlow.setFlowDescriptor(flowDescriptor); return startFlow(agentDescriptor.getUuid(), realizedFlow); + + // FIXME: polling not working when called from test: no unique + // session is created on server side + // SlcExecution slcExecutionFinished = null; + // try { + // addEventListener( + // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null); + // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(), + // realizedFlow); + // + // waitForSlcExecutionFinished(slcExecution, null); + // + // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null); + // for (Serializable sr : ol.getObjects()) { + // SlcExecution se = (SlcExecution) sr; + // if (se.getUuid().equals(slcExecution.getUuid())) { + // slcExecutionFinished = se; + // break; + // } + // } + // + // } finally { + // removeEventListener( + // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null); + // } + // + // if (slcExecutionFinished == null) + // throw new SlcException("No finished SLC Execution."); + // return slcExecutionFinished; } public static ExecutionModuleDescriptor findModule( @@ -130,7 +218,7 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient log.trace("Object list size: " + size); return size == 1; } - }, retryTimeout); + }, null); return (SlcAgentDescriptor) objectList.getObjects().get(0); } @@ -141,14 +229,6 @@ public class SlcServerHttpClientImpl extends AbstractHttpServicesClient throw new SlcException("Server is not ready: " + answer); } - /** - * Timeout in ms after which a safe call will throw an exception. Default is - * 60s. - */ - public void setRetryTimeout(Long retryTimeout) { - this.retryTimeout = retryTimeout; - } - /** * Timeout in ms after which the client will stop waiting for the server to * be ready and throw an exception. Default is 120s. diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/AddEventListenerController.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/AddEventListenerController.java index d72d53bec..d23239668 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/AddEventListenerController.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/AddEventListenerController.java @@ -3,6 +3,8 @@ package org.argeo.slc.web.mvc.event; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.msg.event.SlcEventListenerDescriptor; import org.argeo.slc.msg.event.SlcEventListenerRegister; @@ -10,6 +12,8 @@ import org.argeo.slc.web.mvc.AbstractServiceController; import org.springframework.web.servlet.ModelAndView; public class AddEventListenerController extends AbstractServiceController { + private final static Log log = LogFactory + .getLog(AddEventListenerController.class); private SlcEventListenerRegister eventListenerRegister; @@ -23,6 +27,10 @@ public class AddEventListenerController extends AbstractServiceController { eventListenerRegister .addEventListenerDescriptor(new SlcEventListenerDescriptor( eventType, eventFilter)); + if (log.isTraceEnabled()) + log.trace("Registered listener on register " + + eventListenerRegister.getId() + " for type " + eventType + + ", filter=" + eventFilter); } public void setEventListenerRegister( diff --git a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/RemoveEventListenerController.java b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/RemoveEventListenerController.java index 4d9f3624a..400228e11 100644 --- a/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/RemoveEventListenerController.java +++ b/runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/web/mvc/event/RemoveEventListenerController.java @@ -3,6 +3,8 @@ package org.argeo.slc.web.mvc.event; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.argeo.slc.msg.event.SlcEvent; import org.argeo.slc.msg.event.SlcEventListenerDescriptor; import org.argeo.slc.msg.event.SlcEventListenerRegister; @@ -10,6 +12,8 @@ import org.argeo.slc.web.mvc.AbstractServiceController; import org.springframework.web.servlet.ModelAndView; public class RemoveEventListenerController extends AbstractServiceController { + private final static Log log = LogFactory + .getLog(RemoveEventListenerController.class); private SlcEventListenerRegister eventListenerRegister; @@ -23,6 +27,10 @@ public class RemoveEventListenerController extends AbstractServiceController { eventListenerRegister .removeEventListenerDescriptor(new SlcEventListenerDescriptor( eventType, eventFilter)); + if (log.isTraceEnabled()) + log.trace("Removed listener from register " + + eventListenerRegister.getId() + " for type " + eventType + + ", filter=" + eventFilter); } public void setEventListenerRegister( diff --git a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java index 5e745a2f0..0bf52e200 100644 --- a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java +++ b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java @@ -8,6 +8,7 @@ import java.util.TreeMap; public class SlcExecution implements Serializable { private static final long serialVersionUID = 1L; + public final static String STATUS_NONE = "DEFAULT"; public final static String STATUS_SCHEDULED = "SCHEDULED"; public final static String STATUS_RUNNING = "RUNNING"; public final static String STATUS_FINISHED = "FINISHED"; @@ -20,7 +21,7 @@ public class SlcExecution implements Serializable { private String host; private String user; private String type; - private String status; + private String status = STATUS_NONE; private Map attributes = new TreeMap(); private List steps = new ArrayList(); diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java index eb0fa7b60..0ce259d97 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsSlcEventListener.java @@ -45,6 +45,8 @@ public class JmsSlcEventListener implements SlcEventListener { if (descriptors.size() == 0) { // No listeners, just waiting try { + if(log.isTraceEnabled()) + log.trace("No event listener registered, sleeping..."); Thread.sleep(timeout); } catch (InterruptedException e) { // silent diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/test/tree/ResultAttributesList.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/test/tree/ResultAttributesList.java index 237af48b5..bd8d56455 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/test/tree/ResultAttributesList.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/test/tree/ResultAttributesList.java @@ -3,6 +3,9 @@ package org.argeo.slc.core.test.tree; import java.util.ArrayList; import java.util.List; +import org.argeo.slc.msg.ObjectList; + +/** @deprecated user {@link ObjectList} instead. */ public class ResultAttributesList { private List list = new ArrayList(); -- 2.39.2