<bean id="basic.testResult" parent="slcDefault.test.basicTreeTestResult"\r
scope="execution">\r
<property name="listeners" ref="resultListeners" />\r
+ <property name="attributes">\r
+ <map>\r
+ <entry key="testedComponentId" value="@{testedComponentId}" />\r
+ </map>\r
+ </property>\r
<aop:scoped-proxy />\r
</bean>\r
\r
log4j.logger.org.argeo.slc.execution.ExecutionContext=DEBUG
log4j.logger.org.argeo.slc.execution.SimpleExecutionSpec=DEBUG
-log4j.logger.org.argeo.slc.web.mvc=DEBUG
+log4j.logger.org.argeo.slc.web.mvc=TRACE
+log4j.logger.org.argeo.slc.jms=TRACE
log4j.logger.org.hibernate=WARN
import org.argeo.slc.Condition;
import org.argeo.slc.core.test.tree.TreeTestResultList;
+import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.server.client.SlcServerHttpClient;
import org.argeo.slc.server.unit.AbstractHttpClientTestCase;
public class SimpleScenarioTest extends AbstractHttpClientTestCase {
public void testSimpleScenario() throws Exception {
String moduleName = "org.argeo.slc.demo.basic";
- assertAnswerOk(getHttpClient().startFlowDefault(moduleName, "main",
- null));
+ SlcExecution slcExecution = getHttpClient().startFlowDefault(
+ moduleName, "main", null);
getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null,
new Condition<TreeTestResultList>() {
return obj.getList().size() == 3;
}
}, null);
+
+
+ getHttpClient().callServiceSafe(SlcServerHttpClient.LIST_RESULTS, null,
+ new Condition<TreeTestResultList>() {
+
+ public Boolean check(TreeTestResultList obj) {
+ return obj.getList().size() == 3;
+ }
+ }, null);
}
}
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgentDescriptor;
/** Abstraction of the access to HTTP services of an SLC Server. */
public final static String LIST_AGENTS = "listAgents.service";
public final static String IS_SERVER_READY = "isServerReady.service";
public final static String NEW_SLC_EXECUTION = "newSlcExecution.service";
+ public final static String LIST_SLC_EXECUTIONS = "listSlcExecutions.service";
public final static String GET_MODULE_DESCRIPTOR = "getExecutionDescriptor.service";
public final static String LIST_MODULE_DESCRIPTORS = "listModulesDescriptors.service";
public final static String LIST_RESULTS = "listResults.service";
+ public final static String ADD_EVENT_LISTENER = "addEventListener.service";
+ public final static String REMOVE_EVENT_LISTENER = "removeEventListener.service";
+ public final static String POLL_EVENT = "pollEvent.service";
+
+ /** Wait for the provided SlcExecution to be finished. */
+ public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+ Long timeout);
+
+ /** Block until one of the registered event is finished. */
+ public SlcEvent pollEvent(Long timeout);
+
+ /** Register an event type. */
+ public ExecutionAnswer addEventListener(String eventType, String eventFilter);
+
+ /** Unregister an event type. */
+ public ExecutionAnswer removeEventListener(String eventType,
+ String eventFilter);
/** Wait for one agent to be available. */
public SlcAgentDescriptor waitForOneAgent();
public void waitForServerToBeReady();
/** Start an execution flow on the given agent. */
- public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow);
+ public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow);
/** Assume one agent and one version per module. */
- public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+ public SlcExecution startFlowDefault(String moduleName, String flowName,
Map<String, Object> args);
/** List execution modules descriptors. */
public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId);
- /** Retrieve a single execution module descriptot. */
+ /** Retrieve a single execution module descriptor. */
public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
String moduleName, String version);
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
-import java.util.Iterator;
import java.util.Map;
import javax.xml.transform.Source;
private String encoding = "UTF-8";
private Long retryPeriod = 1000l;
- private Long defaultTimeout = 10 * 1000l;
+ private Long defaultTimeout = 30 * 1000l;
public <T> T callService(String path, Map<String, String> parameters) {
return callService(path, parameters, null);
@SuppressWarnings(value = { "unchecked" })
public <T> T callServiceSafe(String path, Map<String, String> parameters,
Condition<T> condition, Long timeout) {
- if (timeout == null)
- timeout = defaultTimeout;
long begin = System.currentTimeMillis();
try {
Object obj = null;
- long duration = System.currentTimeMillis() - begin;
- while (duration < timeout) {
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
try {
obj = callServiceLowLevel(path, parameters, null);
} catch (IOException e) {
+ " on "
+ baseUrl
+ " did not return an answer after calling it safely for "
- + duration + " ms.");
+ + timeout(timeout) + " ms.");
return (T) obj;
} catch (Exception e) {
throw new SlcException(
try {
if (parameters != null && parameters.size() != 0) {
buf.append('?');
- Iterator<String> it = parameters.keySet().iterator();
- String key = null;
- while (it.hasNext()) {
- if (key != null)
- buf.append('&');
- key = it.next();
- String keyEncoded = URLEncoder.encode(key, urlEncoding);
- String valueEncoded = URLEncoder.encode(
- parameters.get(key), urlEncoding);
- buf.append(keyEncoded).append('=').append(valueEncoded);
+ boolean first = true;
+ for (String key : parameters.keySet()) {
+ String value = parameters.get(key);
+ if (value != null) {
+ if (first)
+ first = false;
+ else
+ buf.append('&');
+ String keyEncoded = URLEncoder.encode(key, urlEncoding);
+ String valueEncoded = URLEncoder.encode(value,
+ urlEncoding);
+ buf.append(keyEncoded).append('=').append(valueEncoded);
+ }
}
}
}
}
+ public Long timeout(Long timeout) {
+ if (timeout == null)
+ timeout = getDefaultTimeout();
+ return timeout;
+ }
+
public void setUnmarshaller(Unmarshaller unmarshaller) {
this.unmarshaller = unmarshaller;
}
this.encoding = encoding;
}
- /** Default is 30s*/
+ /** Default is 30s */
public void setDefaultTimeout(Long defaultTimeout) {
this.defaultTimeout = defaultTimeout;
}
+ public Long getDefaultTimeout() {
+ return defaultTimeout;
+ }
+
}
package org.argeo.slc.server.client.impl;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.argeo.slc.msg.ExecutionAnswer;
import org.argeo.slc.msg.MsgConstants;
import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgentDescriptor;
import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.services.EventPublisherAspect;
public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
implements SlcServerHttpClient {
private final static Log log = LogFactory
.getLog(SlcServerHttpClientImpl.class);
- private Long retryTimeout = 60 * 1000l;
private Long serverReadyTimeout = 120 * 1000l;
- public ExecutionAnswer startFlow(String agentId, RealizedFlow realizedFlow) {
+ public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+ Long timeout) {
+ if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
+ return;
+
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ SlcEvent event = pollEvent(timeout);
+ String slcExecutionId = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_ID);
+ String status = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
+ if (slcExecutionId.equals(slcExecution.getUuid())
+ && status.equals(SlcExecution.STATUS_FINISHED)) {
+ return;
+ }
+ }
+ throw new SlcException("SLC Execution not completed after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public SlcEvent pollEvent(Long timeout) {
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ Object obj = callService(POLL_EVENT, null);
+ if (obj instanceof ExecutionAnswer) {
+ ExecutionAnswer answer = (ExecutionAnswer) obj;
+ if (answer.isError())
+ throw new SlcException(
+ "Unexpected exception when pollign event: "
+ + answer.getMessage());
+ } else {
+ return (SlcEvent) obj;
+ }
+ }
+ throw new SlcException("No event received after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(ADD_EVENT_LISTENER, parameters);
+ }
+
+ public ExecutionAnswer removeEventListener(String eventType,
+ String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(REMOVE_EVENT_LISTENER, parameters);
+ }
+
+ public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
SlcExecution slcExecution = new SlcExecution();
slcExecution.setUuid(UUID.randomUUID().toString());
parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
slcExecution);
- return answer;
+ if (!answer.isOk())
+ throw new SlcException("Could not start flow on agent " + agentId
+ + ": " + answer.getMessage());
+ return slcExecution;
}
- public ExecutionAnswer startFlowDefault(String moduleName, String flowName,
+ public SlcExecution startFlowDefault(String moduleName, String flowName,
Map<String, Object> args) {
SlcAgentDescriptor agentDescriptor = waitForOneAgent();
List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
realizedFlow.setFlowDescriptor(flowDescriptor);
return startFlow(agentDescriptor.getUuid(), realizedFlow);
+
+ // FIXME: polling not working when called from test: no unique
+ // session is created on server side
+ // SlcExecution slcExecutionFinished = null;
+ // try {
+ // addEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
+ // realizedFlow);
+ //
+ // waitForSlcExecutionFinished(slcExecution, null);
+ //
+ // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
+ // for (Serializable sr : ol.getObjects()) {
+ // SlcExecution se = (SlcExecution) sr;
+ // if (se.getUuid().equals(slcExecution.getUuid())) {
+ // slcExecutionFinished = se;
+ // break;
+ // }
+ // }
+ //
+ // } finally {
+ // removeEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // }
+ //
+ // if (slcExecutionFinished == null)
+ // throw new SlcException("No finished SLC Execution.");
+ // return slcExecutionFinished;
}
public static ExecutionModuleDescriptor findModule(
log.trace("Object list size: " + size);
return size == 1;
}
- }, retryTimeout);
+ }, null);
return (SlcAgentDescriptor) objectList.getObjects().get(0);
}
throw new SlcException("Server is not ready: " + answer);
}
- /**
- * Timeout in ms after which a safe call will throw an exception. Default is
- * 60s.
- */
- public void setRetryTimeout(Long retryTimeout) {
- this.retryTimeout = retryTimeout;
- }
-
/**
* Timeout in ms after which the client will stop waiting for the server to
* be ready and throw an exception. Default is 120s.
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
import org.argeo.slc.msg.event.SlcEventListenerRegister;
import org.springframework.web.servlet.ModelAndView;
public class AddEventListenerController extends AbstractServiceController {
+ private final static Log log = LogFactory
+ .getLog(AddEventListenerController.class);
private SlcEventListenerRegister eventListenerRegister;
eventListenerRegister
.addEventListenerDescriptor(new SlcEventListenerDescriptor(
eventType, eventFilter));
+ if (log.isTraceEnabled())
+ log.trace("Registered listener on register "
+ + eventListenerRegister.getId() + " for type " + eventType
+ + ", filter=" + eventFilter);
}
public void setEventListenerRegister(
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.argeo.slc.msg.event.SlcEvent;
import org.argeo.slc.msg.event.SlcEventListenerDescriptor;
import org.argeo.slc.msg.event.SlcEventListenerRegister;
import org.springframework.web.servlet.ModelAndView;
public class RemoveEventListenerController extends AbstractServiceController {
+ private final static Log log = LogFactory
+ .getLog(RemoveEventListenerController.class);
private SlcEventListenerRegister eventListenerRegister;
eventListenerRegister
.removeEventListenerDescriptor(new SlcEventListenerDescriptor(
eventType, eventFilter));
+ if (log.isTraceEnabled())
+ log.trace("Removed listener from register "
+ + eventListenerRegister.getId() + " for type " + eventType
+ + ", filter=" + eventFilter);
}
public void setEventListenerRegister(
\r
public class SlcExecution implements Serializable {\r
private static final long serialVersionUID = 1L;\r
+ public final static String STATUS_NONE = "DEFAULT";\r
public final static String STATUS_SCHEDULED = "SCHEDULED";\r
public final static String STATUS_RUNNING = "RUNNING";\r
public final static String STATUS_FINISHED = "FINISHED";\r
private String host;\r
private String user;\r
private String type;\r
- private String status;\r
+ private String status = STATUS_NONE;\r
private Map<String, String> attributes = new TreeMap<String, String>();\r
\r
private List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();\r
if (descriptors.size() == 0) {
// No listeners, just waiting
try {
+ if(log.isTraceEnabled())
+ log.trace("No event listener registered, sleeping...");
Thread.sleep(timeout);
} catch (InterruptedException e) {
// silent
import java.util.ArrayList;
import java.util.List;
+import org.argeo.slc.msg.ObjectList;
+
+/** @deprecated user {@link ObjectList} instead. */
public class ResultAttributesList {
private List<ResultAttributes> list = new ArrayList<ResultAttributes>();