<groupId>org.argeo.slc.server</groupId>
<artifactId>org.argeo.slc.ria</artifactId>
</dependency>
+
+ <!-- Test -->
+ <dependency>
+ <groupId>org.argeo.slc.runtime</groupId>
+ <artifactId>org.argeo.slc.launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
</plugins>
</build>
<dependencies>
+ <dependency>
+ <groupId>org.argeo.slc.runtime</groupId>
+ <artifactId>org.argeo.slc.support.castor</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.argeo.dep.osgi</groupId>
<artifactId>org.argeo.dep.osgi.commons.cli</artifactId>
</dependency>
<dependency>
- <groupId>org.argeo.slc.dep</groupId>
- <artifactId>org.argeo.slc.dep.agent</artifactId>
- <version>${project.version}</version>
+ <groupId>org.eclipse.osgi</groupId>
+ <artifactId>org.eclipse.osgi</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
+++ /dev/null
-package org.argeo.slc.cli;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.PropertyConfigurator;
-import org.argeo.slc.SlcException;
-import org.springframework.core.io.DefaultResourceLoader;
-import org.springframework.util.ResourceUtils;
-import org.springframework.util.SystemPropertyUtils;
-
-public class Log4jUtils {
-
- /**
- * Configure log4j based on properties, with the following priorities (from
- * highest to lowest):<br>
- * 1. System properties<br>
- * 2. configuration file itself
- */
- public static void initLog4j(String configuration) {
- // clears previous configuration
- shutDownLog4j();
-
- ClassLoader cl = Log4jUtils.class.getClassLoader();
- Properties properties = new Properties();
- if (configuration != null) {
- InputStream in = null;
- try {
- if (configuration
- .startsWith(ResourceUtils.CLASSPATH_URL_PREFIX)) {
- String path = configuration
- .substring(ResourceUtils.CLASSPATH_URL_PREFIX
- .length());
- in = cl.getResourceAsStream(path);
- } else {
- in = new DefaultResourceLoader(cl).getResource(
- configuration).getInputStream();
- }
-
- properties.load(in);
- } catch (IOException e) {
- throw new SlcException("Cannot load properties from "
- + configuration);
- } finally {
- IOUtils.closeQuietly(in);
- }
- }
-
- // Overrides with System properties
- overrideLog4jProperties(properties, System.getProperties());
-
- PropertyConfigurator.configure(properties);
- }
-
- private static void overrideLog4jProperties(Properties target,
- Properties additional) {
- for (Object obj : additional.keySet()) {
- String key = obj.toString();
- if (key.startsWith("log4j.")) {
- if (!key.equals("log4j.configuration")) {
- String value = SystemPropertyUtils
- .resolvePlaceholders(additional.getProperty(key));
- target.put(key, value);
- }
- }
- }
- }
-
- public static void shutDownLog4j() {
- LogManager.shutdown();
- }
-
- private Log4jUtils() {
-
- }
-}
+++ /dev/null
-package org.argeo.slc.cli;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.eclipse.core.runtime.adaptor.EclipseStarter;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleException;
-
-public class OsgiLauncher {
- public final static String PROP_SLC_OSGI_EQUINOX_ARGS = "slc.osgi.equinox.args";
- public final static String PROP_SLC_OSGI_START = "slc.osgi.start";
-
- public static void main(String[] args) {
- try {
- String baseUrl = args[0];
- String config = args[1];
-
- List<MavenFile> mavenFiles = new ArrayList<MavenFile>();
- BufferedReader in = new BufferedReader(new FileReader(config));
- String line = null;
- while ((line = in.readLine()) != null) {
- try {
- line = line.trim();
- if (line.equals("")
- || line
- .startsWith("The following files have been resolved:"))
- continue;// skip
-
- mavenFiles.add(convert(line));
- } catch (Exception e) {
- System.err.println("Could not load line " + line);
- }
- }
-
- List<String> urls = asUrls(baseUrl, mavenFiles);
-
- // Start Equinox
- File baseDir = new File(System.getProperty("user.dir"))
- .getCanonicalFile();
- String equinoxConfigurationPath = baseDir.getPath()
- + File.separator + "slc-detached" + File.separator
- + "equinoxConfiguration";
-
- String equinoxArgsLineDefault = "-console -noExit -clean -debug -configuration "
- + equinoxConfigurationPath;
- String equinoxArgsLine = System.getProperty(
- PROP_SLC_OSGI_EQUINOX_ARGS, equinoxArgsLineDefault);
- String[] equinoxArgs = equinoxArgsLine.split(" ");
-
- BundleContext bundleContext = EclipseStarter.startup(equinoxArgs,
- null);
-
- Map<String, Bundle> installedBundles = getInstalledBundles(bundleContext);
- for (String url : urls) {
- try {
-
- if (installedBundles.containsKey(url)) {
- Bundle bundle = installedBundles.get(url);
- // bundle.update();
- info("Bundle " + bundle.getSymbolicName()
- + " already installed from " + url);
- } else {
- Bundle bundle = bundleContext.installBundle(url);
- info("Installed bundle " + bundle.getSymbolicName()
- + " from " + url);
- }
- } catch (BundleException e) {
- warn("Could not install bundle from " + url + ": "
- + e.getMessage());
- }
- }
-
- String bundlesToStart = System.getProperty(PROP_SLC_OSGI_START,
- "org.springframework.osgi.extender");
- StringTokenizer st = new StringTokenizer(bundlesToStart, ",");
- Map<String, Bundle> bundles = getBundles(bundleContext);
- while (st.hasMoreTokens()) {
- String name = st.nextToken().trim();
- Bundle bundle = bundles.get(name);
- bundle.start();
-
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- protected static Map<String, Bundle> getInstalledBundles(
- BundleContext bundleContext) {
- Map<String, Bundle> installedBundles = new HashMap<String, Bundle>();
- for (Bundle bundle : bundleContext.getBundles())
- installedBundles.put(bundle.getLocation(), bundle);
- return installedBundles;
- }
-
- protected static Map<String, Bundle> getBundles(BundleContext bundleContext) {
- Map<String, Bundle> installedBundles = new HashMap<String, Bundle>();
- for (Bundle bundle : bundleContext.getBundles())
- installedBundles.put(bundle.getSymbolicName(), bundle);
- return installedBundles;
- }
-
- protected static List<String> asUrls(String baseUrl,
- List<MavenFile> mavenFiles) {
- List<String> urls = new ArrayList<String>();
- for (MavenFile mf : mavenFiles)
- urls.add(convertToUrl(baseUrl, mf));
- return urls;
- }
-
- protected static String convertToUrl(String baseUrl, MavenFile mf) {
- return baseUrl + mf.getGroupId().replace('.', '/') + '/'
- + mf.getArtifactId() + '/' + mf.getVersion() + '/'
- + mf.getArtifactId() + '-' + mf.getVersion() + '.'
- + mf.getType();
- }
-
- protected static MavenFile convert(String str) {
- StringTokenizer st = new StringTokenizer(str, ":");
- MavenFile component = new MavenFile();
- component.setGroupId(st.nextToken());
- component.setArtifactId(st.nextToken());
- component.setType(st.nextToken());
- component.setVersion(st.nextToken());
- component.setScope(st.nextToken());
- return component;
- }
-
- private static void info(Object obj) {
- System.out.println("[INFO] " + obj);
- }
-
- private static void warn(Object obj) {
- System.err.println("[WARN] " + obj);
- }
-
- static class MavenFile {
- private String groupId;
- private String artifactId;
- private String version;
- private String type;
- private String classifier;
- private String scope;
-
- public String getScope() {
- return scope;
- }
-
- public void setScope(String scope) {
- this.scope = scope;
- }
-
- private String distributionId;
-
- public String getDistributionId() {
- return distributionId;
- }
-
- public void setDistributionId(String distributionId) {
- this.distributionId = distributionId;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- public String getArtifactId() {
- return artifactId;
- }
-
- public void setArtifactId(String artifactId) {
- this.artifactId = artifactId;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getClassifier() {
- return classifier;
- }
-
- public void setClassifier(String classifier) {
- this.classifier = classifier;
- }
-
- }
-
-}
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
-import org.argeo.slc.runtime.SlcExecutionContext;
-import org.argeo.slc.runtime.SlcRuntime;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.server.client.impl.SlcServerHttpClientImpl;
public class SlcMain {
- public enum Mode {
- single, agent, osgi
+ public enum Type {
+ standalone, agent, server
}
- private static Log log = null;
+ private static Boolean debug = true;
private final static String BOOTSTRAP_LOG4J_CONFIG = "org/argeo/slc/cli/bootstrapLog4j.properties";
private final static String DEFAULT_AGENT_CONTEXT = "classpath:org/argeo/slc/cli/spring-agent-default.xml";
- private final static Option modeOpt = OptionBuilder.withLongOpt("mode")
+ private final static Option typeOpt = OptionBuilder.withLongOpt("mode")
.withArgName("mode").hasArg().withDescription(
- "SLC execution mode, one of: " + listModeValues()).create(
- 'm');
+ "Execution type, one of: " + listTypeValues()).create('t');
private final static Option propertyOpt = OptionBuilder.withLongOpt(
"property").withArgName("prop1=val1,prop2=val2").hasArgs()
.withValueSeparator(',').withDescription(
"load properties from file (-p has priority)").create('P');
- private final static Option scriptOpt = OptionBuilder.withLongOpt("script")
- .withArgName("script").hasArg().withDescription(
- "SLC script to execute").create('s');
+ private final static Option moduleOpt = OptionBuilder.withLongOpt("module")
+ .withArgName("module").hasArg().withDescription("Execution module")
+ .create('m');
- private final static Option targetsOpt = OptionBuilder.withLongOpt(
- "targets").withArgName("targets").hasArg().withDescription(
- "Targets to execute").create('t');
+ private final static Option flowsOpt = OptionBuilder.withLongOpt("flows")
+ .withArgName("flows").hasArg().withDescription("Flows to execute")
+ .create('f');
private final static Option runtimeOpt = OptionBuilder.withLongOpt(
"runtime").withArgName("runtime").hasArg().withDescription(
- "Runtime to use, either a full path or relative to slc app conf dir: "
- + "<conf dir>/runtime/<runtime>/.xml").create('r');
+ "Runtime URL").create('r');
private final static Options options;
static {
options = new Options();
- options.addOption(modeOpt);
- options.addOption(scriptOpt);
- options.addOption(targetsOpt);
+ options.addOption(typeOpt);
+ options.addOption(moduleOpt);
+ options.addOption(flowsOpt);
options.addOption(propertyOpt);
options.addOption(propertiesOpt);
options.addOption(runtimeOpt);
}
public static void main(String[] args) {
- Mode mode = null;
+ Type type = null;
Properties properties = new Properties();
- String script = null;
- String targets = null;
- String runtimeStr = null;
+ String module = null;
+ String flows = null;
+ String urlStr = null;
try {
CommandLine cl = clParser.parse(options, args);
// Mode
- String modeStr = cl.getOptionValue(modeOpt.getOpt());
- if (modeStr == null) {
- mode = Mode.single;
+ String typeStr = cl.getOptionValue(typeOpt.getOpt());
+ if (typeStr == null) {
+ type = Type.standalone;
} else {
try {
- mode = Mode.valueOf(modeStr);
+ type = Type.valueOf(typeStr);
} catch (IllegalArgumentException e) {
- throw new SlcException("Unrecognized mode '" + modeStr
+ throw new SlcException("Unrecognized mode '" + typeStr
+ "'", e);
}
}
// Script
- if (mode.equals(Mode.single)) {
- if (!cl.hasOption(scriptOpt.getOpt()))
- throw new SlcException("Mode " + Mode.single
- + " requires option '" + scriptOpt.getLongOpt()
+ if (type.equals(Type.standalone)) {
+ if (!cl.hasOption(moduleOpt.getOpt()))
+ throw new SlcException("Type " + Type.standalone
+ + " requires option '" + moduleOpt.getLongOpt()
+ "'");
- script = cl.getOptionValue(scriptOpt.getOpt());
+ module = cl.getOptionValue(moduleOpt.getOpt());
// Targets
- if (cl.hasOption(targetsOpt.getOpt()))
- targets = cl.getOptionValue(targetsOpt.getOpt());
+ if (cl.hasOption(flowsOpt.getOpt()))
+ flows = cl.getOptionValue(flowsOpt.getOpt());
}
// Properties
// Runtime
if (cl.hasOption(runtimeOpt.getOpt())) {
- runtimeStr = cl.getOptionValue(runtimeOpt.getOpt());
+ urlStr = cl.getOptionValue(runtimeOpt.getOpt());
}
} catch (ParseException e) {
System.err.println("Problem with command line arguments. "
badExit();
}
- // Initializes logging and log arguments
- initLogging(properties);
- if (log.isDebugEnabled()) {
- log.debug("Mode: " + mode);
- if (runtimeStr != null)
- log.debug("Runtime: " + runtimeStr);
- log.debug("User properties: " + properties);
- if (script != null)
- log.debug("Script: " + script);
- if (targets != null)
- log.debug("Targets: " + targets);
+ if (debug) {
+ debug("Mode: " + type);
+ if (urlStr != null)
+ debug("Runtime: " + urlStr);
+ debug("User properties: " + properties);
+ if (module != null)
+ debug("Module: " + module);
+ if (flows != null)
+ debug("Flows: " + flows);
}
- // Execution
- if (mode.equals(Mode.single)) {
- try {
- // DefaultSlcRuntime runtime = new DefaultSlcRuntime();
- // FIXME: inject this more cleanly
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- Class clss = cl.loadClass("org.argeo.slc.ant.AntSlcRuntime");
- SlcRuntime<? extends SlcExecutionContext> runtime = (SlcRuntime<? extends SlcExecutionContext>) clss
- .newInstance();
- runtime.executeScript(runtimeStr, script, targets, properties,
- null, null);
- // System.exit(0);
- } catch (Exception e) {
- log.error("SLC client terminated with an error: ", e);
- System.exit(1);
- }
+ // Standalone
+ if (type.equals(Type.standalone)) {
}
// Agent
- else if (mode.equals(Mode.agent)) {
- final ConfigurableApplicationContext applicationContext;
- if (runtimeStr == null) {
- applicationContext = new ClassPathXmlApplicationContext(
- DEFAULT_AGENT_CONTEXT);
- } else {
- applicationContext = new FileSystemXmlApplicationContext(
- runtimeStr);
- }
- applicationContext.registerShutdownHook();
- applicationContext.start();
- log.info("SLC Agent context started.");
+ else if (type.equals(Type.agent)) {
}
- // OSGi
- else if (mode.equals(Mode.osgi)) {
+ // Server
+ else if (type.equals(Type.server)) {
+ SlcServerHttpClientImpl slcServerHttpClient = new SlcServerHttpClientImpl();
+ slcServerHttpClient.setBaseUrl(urlStr);
}
}
new HelpFormatter().printHelp(commandName, options, true);
}
- private static String listModeValues() {
+ private static String listTypeValues() {
StringBuffer buf = new StringBuffer("");
- for (Mode mode : Mode.values()) {
+ for (Type mode : Type.values()) {
buf.append(mode).append(", ");
}
String str = buf.toString();
}
}
- private static void initLogging(Properties userProperties) {
- System.setProperty("log4j.defaultInitOverride", "true");
-
- // Add log4j user properties to System properties
- for (Object obj : userProperties.keySet()) {
- String key = obj.toString();
- if (key.startsWith("log4j.")) {
- System.setProperty(key, userProperties.getProperty(key));
- }
- }
- Log4jUtils.initLog4j(System.getProperty("log4j.configuration",
- "classpath:" + BOOTSTRAP_LOG4J_CONFIG));
- log = LogFactory.getLog(SlcMain.class);
-
- }
-
private static void badExit() {
printUsage();
System.exit(1);
}
+
+ protected static void info(Object msg) {
+ System.out.println(msg);
+ }
+
+ protected static void debug(Object msg) {
+ System.out.println(msg);
+ }
}
--- /dev/null
+package org.argeo.slc.server.client;
+
+import java.util.Map;
+
+import org.argeo.slc.Condition;
+
+/** Abstraction of the access to HTTP services . */
+public interface HttpServicesClient {
+ /** Call service, failing if it is not available. */
+ public <T> T callService(String path, Map<String, String> parameters);
+
+ /**
+ * Call service, waiting and retrying until the timeout is reached if it is
+ * not immediately available.
+ *
+ * @param path
+ * service path
+ * @param condition
+ * if not null, a condition to be applied on received object,
+ * keep trying if it returns false.
+ * @param timeout
+ * timeout after which an exception is thrown
+ */
+ public <T> T callServiceSafe(String path, Map<String, String> parameters,
+ Condition<T> condition, Long timeout);
+
+}
--- /dev/null
+package org.argeo.slc.server.client;
+
+import java.util.List;
+import java.util.Map;
+
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.event.SlcEvent;
+import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.runtime.SlcAgentDescriptor;
+import org.argeo.slc.server.HttpServices;
+
+/** Abstraction of the access to HTTP services of an SLC Server. */
+public interface SlcServerHttpClient extends HttpServicesClient,HttpServices {
+ /** Wait for the provided SlcExecution to be finished. */
+ public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+ Long timeout);
+
+ /** Block until one of the registered event is finished. */
+ public SlcEvent pollEvent(Long timeout);
+
+ /** Register an event type. */
+ public ExecutionAnswer addEventListener(String eventType, String eventFilter);
+
+ /** Unregister an event type. */
+ public ExecutionAnswer removeEventListener(String eventType,
+ String eventFilter);
+
+ /** Wait for one agent to be available. */
+ public SlcAgentDescriptor waitForOneAgent();
+
+ /** Wait for the http server to be ready. */
+ public void waitForServerToBeReady();
+
+ /** Start an execution flow on the given agent. */
+ public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow);
+
+ /** Assume one agent and one version per module. */
+ public SlcExecution startFlowDefault(String moduleName, String flowName,
+ Map<String, Object> args);
+
+ /** List execution modules descriptors. */
+ public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId);
+
+ /** Retrieve a single execution module descriptor. */
+ public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
+ String moduleName, String version);
+
+}
--- /dev/null
+package org.argeo.slc.server.client.impl;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Map;
+
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.Condition;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.server.client.HttpServicesClient;
+import org.springframework.oxm.Marshaller;
+import org.springframework.oxm.Unmarshaller;
+import org.springframework.util.Assert;
+
+public abstract class AbstractHttpServicesClient implements HttpServicesClient {
+ private final static Log log = LogFactory
+ .getLog(AbstractHttpServicesClient.class);
+ private Unmarshaller unmarshaller;
+ private Marshaller marshaller;
+ private String baseUrl;
+ private String encoding = "UTF-8";
+
+ private Long retryPeriod = 1000l;
+ private Long defaultTimeout = 30 * 1000l;
+
+ @SuppressWarnings(value = { "unchecked" })
+ public <T> T callService(String path, Map<String, String> parameters) {
+ return (T)callService(path, parameters, null);
+ }
+
+ @SuppressWarnings(value = { "unchecked" })
+ public <T> T callService(String path, Map<String, String> parameters,
+ Object body) {
+ try {
+ return (T) callServiceLowLevel(path, parameters, body);
+ } catch (Exception e) {
+ throw new SlcException("Cannot call service " + path + " on "
+ + baseUrl, e);
+ }
+ }
+
+ @SuppressWarnings(value = { "unchecked" })
+ public <T> T callServiceSafe(String path, Map<String, String> parameters,
+ Condition<T> condition, Long timeout) {
+
+ long begin = System.currentTimeMillis();
+ try {
+ Object obj = null;
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ try {
+ obj = callServiceLowLevel(path, parameters, null);
+ } catch (IOException e) {
+ if (log.isTraceEnabled())
+ log.trace("Exception when calling service " + path
+ + " on " + baseUrl, e);
+ }
+
+ if (obj != null) {
+ if (condition == null)
+ break;
+ else {
+ if (condition.check((T) obj))
+ break;
+ }
+ }
+ // wait a bit
+ try {
+ Thread.sleep(retryPeriod);
+ } catch (InterruptedException e) {
+ // silent
+ }
+ }
+
+ if (obj == null)
+ throw new SlcException(
+ "Service "
+ + path
+ + " on "
+ + baseUrl
+ + " did not return an answer after calling it safely for "
+ + timeout(timeout) + " ms.");
+ return (T) obj;
+ } catch (Exception e) {
+ throw new SlcException(
+ "Unexpected exception when safely calling service " + path
+ + " on " + baseUrl, e);
+ }
+ }
+
+ protected Object callServiceLowLevel(String path,
+ Map<String, String> parameters, Object body) throws IOException {
+ Assert.notNull(baseUrl, "base url");
+ HttpURLConnection connection = null;
+ Writer writer = null;
+ Reader reader = null;
+ try {
+ URL url = createUrl(path, parameters);
+ connection = (HttpURLConnection) url.openConnection();
+
+ if (body != null) {
+ connection.setRequestMethod("POST");
+ connection.setDoOutput(true);
+ connection.setDoInput(true);
+ connection.setUseCaches(false);
+ connection.setAllowUserInteraction(false);
+ connection.setRequestProperty("Content-type",
+ "text/xml; charset=" + encoding);
+ }
+
+ // Establish the connection
+ connection.connect();
+
+ if (body != null) {
+ writer = new OutputStreamWriter(connection.getOutputStream(),
+ encoding);
+ StreamResult result = new StreamResult(writer);
+ marshaller.marshal(body, result);
+ writer.flush();
+ IOUtils.closeQuietly(writer);
+ }
+
+ // Read answer
+ reader = new InputStreamReader(connection.getInputStream(),
+ encoding);
+ Source source = new StreamSource(reader);
+ Object obj = unmarshaller.unmarshal(source);
+ return obj;
+ } finally {
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(writer);
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+
+ protected URL createUrl(String service, Map<String, String> parameters) {
+ // URL encoded with UTF-8, as recommended by W3C
+ final String urlEncoding = "UTF-8";
+
+ StringBuffer buf = new StringBuffer(baseUrl + service);
+ try {
+ if (parameters != null && parameters.size() != 0) {
+ buf.append('?');
+ boolean first = true;
+ for (String key : parameters.keySet()) {
+ String value = parameters.get(key);
+ if (value != null) {
+ if (first)
+ first = false;
+ else
+ buf.append('&');
+ String keyEncoded = URLEncoder.encode(key, urlEncoding);
+ String valueEncoded = URLEncoder.encode(value,
+ urlEncoding);
+ buf.append(keyEncoded).append('=').append(valueEncoded);
+ }
+ }
+ }
+
+ return new URL(buf.toString());
+ } catch (Exception e) {
+ throw new SlcException("Cannot create URL: " + buf, e);
+ }
+ }
+
+ public Long timeout(Long timeout) {
+ if (timeout == null)
+ timeout = getDefaultTimeout();
+ return timeout;
+ }
+
+ public void setUnmarshaller(Unmarshaller unmarshaller) {
+ this.unmarshaller = unmarshaller;
+ }
+
+ public void setBaseUrl(String baseUrl) {
+ this.baseUrl = baseUrl;
+ }
+
+ public Long getRetryPeriod() {
+ return retryPeriod;
+ }
+
+ /** Retry period in ms when accessing service safely. Default is 1000 ms. */
+ public void setRetryPeriod(Long retryPeriod) {
+ this.retryPeriod = retryPeriod;
+ }
+
+ public void setMarshaller(Marshaller marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ /** Default is UTF-8. */
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
+ /** Default is 30s */
+ public void setDefaultTimeout(Long defaultTimeout) {
+ this.defaultTimeout = defaultTimeout;
+ }
+
+ public Long getDefaultTimeout() {
+ return defaultTimeout;
+ }
+
+}
--- /dev/null
+package org.argeo.slc.server.client.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.Condition;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.msg.MsgConstants;
+import org.argeo.slc.msg.ObjectList;
+import org.argeo.slc.msg.event.SlcEvent;
+import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.runtime.SlcAgentDescriptor;
+import org.argeo.slc.server.client.SlcServerHttpClient;
+
+public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
+ implements SlcServerHttpClient {
+
+ protected final static String PARAM_AGENT_ID = "agentId";
+
+ private final static Log log = LogFactory
+ .getLog(SlcServerHttpClientImpl.class);
+
+ private Long serverReadyTimeout = 120 * 1000l;
+
+ public void waitForSlcExecutionFinished(SlcExecution slcExecution,
+ Long timeout) {
+ if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
+ return;
+
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ SlcEvent event = pollEvent(timeout);
+ String slcExecutionId = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_ID);
+ String status = event.getHeaders().get(
+ MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
+ if (slcExecutionId.equals(slcExecution.getUuid())
+ && status.equals(SlcExecution.STATUS_FINISHED)) {
+ return;
+ }
+ }
+ throw new SlcException("SLC Execution not completed after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public SlcEvent pollEvent(Long timeout) {
+ long begin = System.currentTimeMillis();
+ while (System.currentTimeMillis() - begin < timeout(timeout)) {
+ Object obj = callService(POLL_EVENT, null);
+ if (obj instanceof ExecutionAnswer) {
+ ExecutionAnswer answer = (ExecutionAnswer) obj;
+ if (answer.isError())
+ throw new SlcException(
+ "Unexpected exception when polling event: "
+ + answer.getMessage());
+ } else {
+ return (SlcEvent) obj;
+ }
+ }
+ throw new SlcException("No event received after timeout "
+ + timeout(timeout) + " elapsed.");
+ }
+
+ public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(ADD_EVENT_LISTENER, parameters);
+ }
+
+ public ExecutionAnswer removeEventListener(String eventType,
+ String eventFilter) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(SlcEvent.EVENT_TYPE, eventType);
+ parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
+ return callService(REMOVE_EVENT_LISTENER, parameters);
+ }
+
+ public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
+ SlcExecution slcExecution = new SlcExecution();
+ slcExecution.setUuid(UUID.randomUUID().toString());
+
+ slcExecution.getRealizedFlows().add(realizedFlow);
+
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
+ ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
+ slcExecution);
+ if (!answer.isOk())
+ throw new SlcException("Could not start flow on agent " + agentId
+ + ": " + answer.getMessage());
+ return slcExecution;
+ }
+
+ public SlcExecution startFlowDefault(String moduleName, String flowName,
+ Map<String, Object> args) {
+ SlcAgentDescriptor agentDescriptor = waitForOneAgent();
+ List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
+ .getUuid());
+ ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
+ moduleName);
+ if (moduleDescMinimal == null)
+ throw new SlcException("Cannot find module " + moduleName);
+ String moduleVersion = moduleDescMinimal.getVersion();
+
+ ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
+ agentDescriptor.getUuid(), moduleName, moduleVersion);
+
+ RealizedFlow realizedFlow = new RealizedFlow();
+ realizedFlow.setModuleName(moduleName);
+ realizedFlow.setModuleVersion(moduleDesc.getVersion());
+
+ ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
+ if (args != null) {
+ for (String key : args.keySet()) {
+ if (flowDescriptor.getValues().containsKey(key)) {
+ flowDescriptor.getValues().put(key, args.get(key));
+ }
+ }
+ }
+ realizedFlow.setFlowDescriptor(flowDescriptor);
+
+ return startFlow(agentDescriptor.getUuid(), realizedFlow);
+
+ // FIXME: polling not working when called from test: no unique
+ // session is created on server side
+ // SlcExecution slcExecutionFinished = null;
+ // try {
+ // addEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
+ // realizedFlow);
+ //
+ // waitForSlcExecutionFinished(slcExecution, null);
+ //
+ // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
+ // for (Serializable sr : ol.getObjects()) {
+ // SlcExecution se = (SlcExecution) sr;
+ // if (se.getUuid().equals(slcExecution.getUuid())) {
+ // slcExecutionFinished = se;
+ // break;
+ // }
+ // }
+ //
+ // } finally {
+ // removeEventListener(
+ // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
+ // }
+ //
+ // if (slcExecutionFinished == null)
+ // throw new SlcException("No finished SLC Execution.");
+ // return slcExecutionFinished;
+ }
+
+ public static ExecutionModuleDescriptor findModule(
+ List<ExecutionModuleDescriptor> lst, String moduleName) {
+ ExecutionModuleDescriptor moduleDesc = null;
+ for (ExecutionModuleDescriptor desc : lst) {
+ if (desc.getName().equals(moduleName)) {
+ if (moduleDesc != null)
+ throw new SlcException(
+ "There is more than one module named " + moduleName
+ + " (versions: " + moduleDesc + " and "
+ + desc.getVersion() + ")");
+ moduleDesc = desc;
+ }
+ }
+ return moduleDesc;
+ }
+
+ public static ExecutionFlowDescriptor findFlow(
+ ExecutionModuleDescriptor moduleDesc, String flowName) {
+ ExecutionFlowDescriptor flowDesc = null;
+ for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
+ if (desc.getName().equals(flowName)) {
+ flowDesc = desc;
+ }
+ }
+ return flowDesc;
+ }
+
+ public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(PARAM_AGENT_ID, agentId);
+
+ List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
+ ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
+ ol.fill(moduleDescriptors);
+ return moduleDescriptors;
+ }
+
+ public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
+ String moduleName, String version) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(PARAM_AGENT_ID, agentId);
+ parameters.put("moduleName", moduleName);
+ parameters.put("version", version);
+ ExecutionModuleDescriptor moduleDescriptor = callService(
+ GET_MODULE_DESCRIPTOR, parameters);
+ return moduleDescriptor;
+ }
+
+ public SlcAgentDescriptor waitForOneAgent() {
+ ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
+ new Condition<ObjectList>() {
+ public Boolean check(ObjectList obj) {
+ int size = obj.getObjects().size();
+ if (log.isTraceEnabled())
+ log.trace("Object list size: " + size);
+ return size == 1;
+ }
+ }, null);
+ return (SlcAgentDescriptor) objectList.getObjects().get(0);
+ }
+
+ public void waitForServerToBeReady() {
+ ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
+ serverReadyTimeout);
+ if (!answer.isOk())
+ throw new SlcException("Server is not ready: " + answer);
+ }
+
+ /**
+ * Timeout in ms after which the client will stop waiting for the server to
+ * be ready and throw an exception. Default is 120s.
+ */
+ public void setServerReadyTimeout(Long serverReadyTimeout) {
+ this.serverReadyTimeout = serverReadyTimeout;
+ }
+
+}
--- /dev/null
+package org.argeo.slc.server.unit;
+
+import org.argeo.slc.msg.ExecutionAnswer;
+import org.argeo.slc.server.client.SlcServerHttpClient;
+import org.argeo.slc.unit.AbstractSpringTestCase;
+
+public abstract class AbstractHttpClientTestCase extends AbstractSpringTestCase {
+ private SlcServerHttpClient httpClient = null;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ httpClient = createHttpClient();
+ httpClient.waitForServerToBeReady();
+ }
+
+ protected SlcServerHttpClient createHttpClient() {
+ SlcServerHttpClient httpClient = getBean(SlcServerHttpClient.class);
+ return httpClient;
+ }
+
+ protected SlcServerHttpClient getHttpClient() {
+ return httpClient;
+ }
+
+ protected void assertAnswerOk(ExecutionAnswer answer) {
+ if (!answer.isOk()) {
+ fail("Server execution answer is not ok: " + answer.getMessage());
+ }
+ }
+}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"
- default-lazy-init="false">
-
-
- <bean
- class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
- lazy-init="false">
- <property name="properties">
- <props>
- <prop key="org.apache.activemq.brokerURL">
- tcp://localhost:61616
- </prop>
- </props>
- </property>
- <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
- <property name="ignoreUnresolvablePlaceholders" value="true" />
- </bean>
-
- <import resource="classpath:org/argeo/slc/activemq/spring-agent.xml" />
-
- <bean id="slcDefault.cli.slcApplication" class="org.argeo.slc.ant.AntSlcApplication"
- init-method="initFromSlcRootFile">
- <property name="slcRootFile" value="${slc.rootFile}" />
- </bean>
-
-</beans>
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"
+ default-lazy-init="false">
+
+
+ <bean
+ class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
+ lazy-init="false">
+ <property name="properties">
+ <props>
+ <prop key="org.apache.activemq.brokerURL">
+ tcp://localhost:61616
+ </prop>
+ </props>
+ </property>
+ <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
+ <property name="ignoreUnresolvablePlaceholders" value="true" />
+ </bean>
+
+ <import resource="classpath:org/argeo/slc/activemq/spring-agent.xml" />
+
+ <bean id="slcDefault.cli.slcApplication" class="org.argeo.slc.ant.AntSlcApplication"
+ init-method="initFromSlcRootFile">
+ <property name="slcRootFile" value="${slc.rootFile}" />
+ </bean>
+
+</beans>
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:aop="http://www.springframework.org/schema/aop"
+ xmlns:tx="http://www.springframework.org/schema/tx"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"
+ default-lazy-init="true">
+
+ <import resource="classpath:org/argeo/slc/castor/spring.xml" />
+
+ <!-- Http client -->
+ <bean id="slcDefault.serverHttpClient" class="org.argeo.slc.server.client.impl.SlcServerHttpClientImpl"
+ lazy-init="true">
+ <property name="unmarshaller" ref="slcDefault.castor.marshaller" />
+ <property name="marshaller" ref="slcDefault.castor.marshaller" />
+ <property name="baseUrl" value="http://localhost:7070/org.argeo.slc.webapp/" />
+ </bean>
+</beans>
\ No newline at end of file
+++ /dev/null
-package org.argeo.slc.server.client;
-
-import java.util.Map;
-
-import org.argeo.slc.Condition;
-
-/** Abstraction of the access to HTTP services . */
-public interface HttpServicesClient {
- /** Call service, failing if it is not available. */
- public <T> T callService(String path, Map<String, String> parameters);
-
- /**
- * Call service, waiting and retrying until the timeout is reached if it is
- * not immediately available.
- *
- * @param path
- * service path
- * @param condition
- * if not null, a condition to be applied on received object,
- * keep trying if it returns false.
- * @param timeout
- * timeout after which an exception is thrown
- */
- public <T> T callServiceSafe(String path, Map<String, String> parameters,
- Condition<T> condition, Long timeout);
-
-}
+++ /dev/null
-package org.argeo.slc.server.client;
-
-import java.util.List;
-import java.util.Map;
-
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.msg.ExecutionAnswer;
-import org.argeo.slc.msg.event.SlcEvent;
-import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.runtime.SlcAgentDescriptor;
-
-/** Abstraction of the access to HTTP services of an SLC Server. */
-public interface SlcServerHttpClient extends HttpServicesClient {
- public final static String LIST_AGENTS = "listAgents.service";
- public final static String IS_SERVER_READY = "isServerReady.service";
- public final static String NEW_SLC_EXECUTION = "newSlcExecution.service";
- public final static String LIST_SLC_EXECUTIONS = "listSlcExecutions.service";
- public final static String GET_MODULE_DESCRIPTOR = "getExecutionDescriptor.service";
- public final static String LIST_MODULE_DESCRIPTORS = "listModulesDescriptors.service";
- public final static String LIST_RESULTS = "listResults.service";
- public final static String ADD_EVENT_LISTENER = "addEventListener.service";
- public final static String REMOVE_EVENT_LISTENER = "removeEventListener.service";
- public final static String POLL_EVENT = "pollEvent.service";
-
- /** Wait for the provided SlcExecution to be finished. */
- public void waitForSlcExecutionFinished(SlcExecution slcExecution,
- Long timeout);
-
- /** Block until one of the registered event is finished. */
- public SlcEvent pollEvent(Long timeout);
-
- /** Register an event type. */
- public ExecutionAnswer addEventListener(String eventType, String eventFilter);
-
- /** Unregister an event type. */
- public ExecutionAnswer removeEventListener(String eventType,
- String eventFilter);
-
- /** Wait for one agent to be available. */
- public SlcAgentDescriptor waitForOneAgent();
-
- /** Wait for the http server to be ready. */
- public void waitForServerToBeReady();
-
- /** Start an execution flow on the given agent. */
- public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow);
-
- /** Assume one agent and one version per module. */
- public SlcExecution startFlowDefault(String moduleName, String flowName,
- Map<String, Object> args);
-
- /** List execution modules descriptors. */
- public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId);
-
- /** Retrieve a single execution module descriptor. */
- public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
- String moduleName, String version);
-
-}
+++ /dev/null
-package org.argeo.slc.server.client.impl;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.Map;
-
-import javax.xml.transform.Source;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.Condition;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.server.client.HttpServicesClient;
-import org.springframework.oxm.Marshaller;
-import org.springframework.oxm.Unmarshaller;
-import org.springframework.util.Assert;
-
-public abstract class AbstractHttpServicesClient implements HttpServicesClient {
- private final static Log log = LogFactory
- .getLog(AbstractHttpServicesClient.class);
- private Unmarshaller unmarshaller;
- private Marshaller marshaller;
- private String baseUrl;
- private String encoding = "UTF-8";
-
- private Long retryPeriod = 1000l;
- private Long defaultTimeout = 30 * 1000l;
-
- @SuppressWarnings(value = { "unchecked" })
- public <T> T callService(String path, Map<String, String> parameters) {
- return (T)callService(path, parameters, null);
- }
-
- @SuppressWarnings(value = { "unchecked" })
- public <T> T callService(String path, Map<String, String> parameters,
- Object body) {
- try {
- return (T) callServiceLowLevel(path, parameters, body);
- } catch (Exception e) {
- throw new SlcException("Cannot call service " + path + " on "
- + baseUrl, e);
- }
- }
-
- @SuppressWarnings(value = { "unchecked" })
- public <T> T callServiceSafe(String path, Map<String, String> parameters,
- Condition<T> condition, Long timeout) {
-
- long begin = System.currentTimeMillis();
- try {
- Object obj = null;
- while (System.currentTimeMillis() - begin < timeout(timeout)) {
- try {
- obj = callServiceLowLevel(path, parameters, null);
- } catch (IOException e) {
- if (log.isTraceEnabled())
- log.trace("Exception when calling service " + path
- + " on " + baseUrl, e);
- }
-
- if (obj != null) {
- if (condition == null)
- break;
- else {
- if (condition.check((T) obj))
- break;
- }
- }
- // wait a bit
- try {
- Thread.sleep(retryPeriod);
- } catch (InterruptedException e) {
- // silent
- }
- }
-
- if (obj == null)
- throw new SlcException(
- "Service "
- + path
- + " on "
- + baseUrl
- + " did not return an answer after calling it safely for "
- + timeout(timeout) + " ms.");
- return (T) obj;
- } catch (Exception e) {
- throw new SlcException(
- "Unexpected exception when safely calling service " + path
- + " on " + baseUrl, e);
- }
- }
-
- protected Object callServiceLowLevel(String path,
- Map<String, String> parameters, Object body) throws IOException {
- Assert.notNull(baseUrl, "base url");
- HttpURLConnection connection = null;
- Writer writer = null;
- Reader reader = null;
- try {
- URL url = createUrl(path, parameters);
- connection = (HttpURLConnection) url.openConnection();
-
- if (body != null) {
- connection.setRequestMethod("POST");
- connection.setDoOutput(true);
- connection.setDoInput(true);
- connection.setUseCaches(false);
- connection.setAllowUserInteraction(false);
- connection.setRequestProperty("Content-type",
- "text/xml; charset=" + encoding);
- }
-
- // Establish the connection
- connection.connect();
-
- if (body != null) {
- writer = new OutputStreamWriter(connection.getOutputStream(),
- encoding);
- StreamResult result = new StreamResult(writer);
- marshaller.marshal(body, result);
- writer.flush();
- IOUtils.closeQuietly(writer);
- }
-
- // Read answer
- reader = new InputStreamReader(connection.getInputStream(),
- encoding);
- Source source = new StreamSource(reader);
- Object obj = unmarshaller.unmarshal(source);
- return obj;
- } finally {
- IOUtils.closeQuietly(reader);
- IOUtils.closeQuietly(writer);
- if (connection != null) {
- connection.disconnect();
- }
- }
- }
-
- protected URL createUrl(String service, Map<String, String> parameters) {
- // URL encoded with UTF-8, as recommended by W3C
- final String urlEncoding = "UTF-8";
-
- StringBuffer buf = new StringBuffer(baseUrl + service);
- try {
- if (parameters != null && parameters.size() != 0) {
- buf.append('?');
- boolean first = true;
- for (String key : parameters.keySet()) {
- String value = parameters.get(key);
- if (value != null) {
- if (first)
- first = false;
- else
- buf.append('&');
- String keyEncoded = URLEncoder.encode(key, urlEncoding);
- String valueEncoded = URLEncoder.encode(value,
- urlEncoding);
- buf.append(keyEncoded).append('=').append(valueEncoded);
- }
- }
- }
-
- return new URL(buf.toString());
- } catch (Exception e) {
- throw new SlcException("Cannot create URL: " + buf, e);
- }
- }
-
- public Long timeout(Long timeout) {
- if (timeout == null)
- timeout = getDefaultTimeout();
- return timeout;
- }
-
- public void setUnmarshaller(Unmarshaller unmarshaller) {
- this.unmarshaller = unmarshaller;
- }
-
- public void setBaseUrl(String baseUrl) {
- this.baseUrl = baseUrl;
- }
-
- public Long getRetryPeriod() {
- return retryPeriod;
- }
-
- /** Retry period in ms when accessing service safely. Default is 1000 ms. */
- public void setRetryPeriod(Long retryPeriod) {
- this.retryPeriod = retryPeriod;
- }
-
- public void setMarshaller(Marshaller marshaller) {
- this.marshaller = marshaller;
- }
-
- /** Default is UTF-8. */
- public void setEncoding(String encoding) {
- this.encoding = encoding;
- }
-
- /** Default is 30s */
- public void setDefaultTimeout(Long defaultTimeout) {
- this.defaultTimeout = defaultTimeout;
- }
-
- public Long getDefaultTimeout() {
- return defaultTimeout;
- }
-
-}
+++ /dev/null
-package org.argeo.slc.server.client.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.Condition;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.msg.ExecutionAnswer;
-import org.argeo.slc.msg.MsgConstants;
-import org.argeo.slc.msg.ObjectList;
-import org.argeo.slc.msg.event.SlcEvent;
-import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.runtime.SlcAgentDescriptor;
-import org.argeo.slc.server.client.SlcServerHttpClient;
-
-public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
- implements SlcServerHttpClient {
-
- protected final static String PARAM_AGENT_ID = "agentId";
-
- private final static Log log = LogFactory
- .getLog(SlcServerHttpClientImpl.class);
-
- private Long serverReadyTimeout = 120 * 1000l;
-
- public void waitForSlcExecutionFinished(SlcExecution slcExecution,
- Long timeout) {
- if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
- return;
-
- long begin = System.currentTimeMillis();
- while (System.currentTimeMillis() - begin < timeout(timeout)) {
- SlcEvent event = pollEvent(timeout);
- String slcExecutionId = event.getHeaders().get(
- MsgConstants.PROPERTY_SLC_EXECUTION_ID);
- String status = event.getHeaders().get(
- MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
- if (slcExecutionId.equals(slcExecution.getUuid())
- && status.equals(SlcExecution.STATUS_FINISHED)) {
- return;
- }
- }
- throw new SlcException("SLC Execution not completed after timeout "
- + timeout(timeout) + " elapsed.");
- }
-
- public SlcEvent pollEvent(Long timeout) {
- long begin = System.currentTimeMillis();
- while (System.currentTimeMillis() - begin < timeout(timeout)) {
- Object obj = callService(POLL_EVENT, null);
- if (obj instanceof ExecutionAnswer) {
- ExecutionAnswer answer = (ExecutionAnswer) obj;
- if (answer.isError())
- throw new SlcException(
- "Unexpected exception when polling event: "
- + answer.getMessage());
- } else {
- return (SlcEvent) obj;
- }
- }
- throw new SlcException("No event received after timeout "
- + timeout(timeout) + " elapsed.");
- }
-
- public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
- Map<String, String> parameters = new HashMap<String, String>();
- parameters.put(SlcEvent.EVENT_TYPE, eventType);
- parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
- return callService(ADD_EVENT_LISTENER, parameters);
- }
-
- public ExecutionAnswer removeEventListener(String eventType,
- String eventFilter) {
- Map<String, String> parameters = new HashMap<String, String>();
- parameters.put(SlcEvent.EVENT_TYPE, eventType);
- parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
- return callService(REMOVE_EVENT_LISTENER, parameters);
- }
-
- public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
- SlcExecution slcExecution = new SlcExecution();
- slcExecution.setUuid(UUID.randomUUID().toString());
-
- slcExecution.getRealizedFlows().add(realizedFlow);
-
- Map<String, String> parameters = new HashMap<String, String>();
- parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
- ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
- slcExecution);
- if (!answer.isOk())
- throw new SlcException("Could not start flow on agent " + agentId
- + ": " + answer.getMessage());
- return slcExecution;
- }
-
- public SlcExecution startFlowDefault(String moduleName, String flowName,
- Map<String, Object> args) {
- SlcAgentDescriptor agentDescriptor = waitForOneAgent();
- List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
- .getUuid());
- ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
- moduleName);
- if (moduleDescMinimal == null)
- throw new SlcException("Cannot find module " + moduleName);
- String moduleVersion = moduleDescMinimal.getVersion();
-
- ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
- agentDescriptor.getUuid(), moduleName, moduleVersion);
-
- RealizedFlow realizedFlow = new RealizedFlow();
- realizedFlow.setModuleName(moduleName);
- realizedFlow.setModuleVersion(moduleDesc.getVersion());
-
- ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
- if (args != null) {
- for (String key : args.keySet()) {
- if (flowDescriptor.getValues().containsKey(key)) {
- flowDescriptor.getValues().put(key, args.get(key));
- }
- }
- }
- realizedFlow.setFlowDescriptor(flowDescriptor);
-
- return startFlow(agentDescriptor.getUuid(), realizedFlow);
-
- // FIXME: polling not working when called from test: no unique
- // session is created on server side
- // SlcExecution slcExecutionFinished = null;
- // try {
- // addEventListener(
- // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
- // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
- // realizedFlow);
- //
- // waitForSlcExecutionFinished(slcExecution, null);
- //
- // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
- // for (Serializable sr : ol.getObjects()) {
- // SlcExecution se = (SlcExecution) sr;
- // if (se.getUuid().equals(slcExecution.getUuid())) {
- // slcExecutionFinished = se;
- // break;
- // }
- // }
- //
- // } finally {
- // removeEventListener(
- // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
- // }
- //
- // if (slcExecutionFinished == null)
- // throw new SlcException("No finished SLC Execution.");
- // return slcExecutionFinished;
- }
-
- public static ExecutionModuleDescriptor findModule(
- List<ExecutionModuleDescriptor> lst, String moduleName) {
- ExecutionModuleDescriptor moduleDesc = null;
- for (ExecutionModuleDescriptor desc : lst) {
- if (desc.getName().equals(moduleName)) {
- if (moduleDesc != null)
- throw new SlcException(
- "There is more than one module named " + moduleName
- + " (versions: " + moduleDesc + " and "
- + desc.getVersion() + ")");
- moduleDesc = desc;
- }
- }
- return moduleDesc;
- }
-
- public static ExecutionFlowDescriptor findFlow(
- ExecutionModuleDescriptor moduleDesc, String flowName) {
- ExecutionFlowDescriptor flowDesc = null;
- for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
- if (desc.getName().equals(flowName)) {
- flowDesc = desc;
- }
- }
- return flowDesc;
- }
-
- public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
- Map<String, String> parameters = new HashMap<String, String>();
- parameters.put(PARAM_AGENT_ID, agentId);
-
- List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
- ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
- ol.fill(moduleDescriptors);
- return moduleDescriptors;
- }
-
- public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
- String moduleName, String version) {
- Map<String, String> parameters = new HashMap<String, String>();
- parameters.put(PARAM_AGENT_ID, agentId);
- parameters.put("moduleName", moduleName);
- parameters.put("version", version);
- ExecutionModuleDescriptor moduleDescriptor = callService(
- GET_MODULE_DESCRIPTOR, parameters);
- return moduleDescriptor;
- }
-
- public SlcAgentDescriptor waitForOneAgent() {
- ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
- new Condition<ObjectList>() {
- public Boolean check(ObjectList obj) {
- int size = obj.getObjects().size();
- if (log.isTraceEnabled())
- log.trace("Object list size: " + size);
- return size == 1;
- }
- }, null);
- return (SlcAgentDescriptor) objectList.getObjects().get(0);
- }
-
- public void waitForServerToBeReady() {
- ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
- serverReadyTimeout);
- if (!answer.isOk())
- throw new SlcException("Server is not ready: " + answer);
- }
-
- /**
- * Timeout in ms after which the client will stop waiting for the server to
- * be ready and throw an exception. Default is 120s.
- */
- public void setServerReadyTimeout(Long serverReadyTimeout) {
- this.serverReadyTimeout = serverReadyTimeout;
- }
-
-}
+++ /dev/null
-package org.argeo.slc.server.unit;
-
-import org.argeo.slc.msg.ExecutionAnswer;
-import org.argeo.slc.server.client.SlcServerHttpClient;
-import org.argeo.slc.unit.AbstractSpringTestCase;
-
-public abstract class AbstractHttpClientTestCase extends AbstractSpringTestCase {
- private SlcServerHttpClient httpClient = null;
-
- protected void setUp() throws Exception {
- super.setUp();
- httpClient = createHttpClient();
- httpClient.waitForServerToBeReady();
- }
-
- protected SlcServerHttpClient createHttpClient() {
- SlcServerHttpClient httpClient = getBean(SlcServerHttpClient.class);
- return httpClient;
- }
-
- protected SlcServerHttpClient getHttpClient() {
- return httpClient;
- }
-
- protected void assertAnswerOk(ExecutionAnswer answer) {
- if (!answer.isOk()) {
- fail("Server execution answer is not ok: " + answer.getMessage());
- }
- }
-}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"
- default-lazy-init="true">
-
- <import resource="classpath:org/argeo/slc/castor/spring.xml" />
-
- <!-- Http client -->
- <bean id="slcDefault.serverHttpClient" class="org.argeo.slc.server.client.impl.SlcServerHttpClientImpl"
- lazy-init="true">
- <property name="unmarshaller" ref="slcDefault.castor.marshaller" />
- <property name="marshaller" ref="slcDefault.castor.marshaller" />
- <property name="baseUrl" value="http://localhost:7070/org.argeo.slc.webapp/" />
- </bean>
-</beans>
\ No newline at end of file
--- /dev/null
+package org.argeo.slc.server;
+
+public interface HttpServices {
+ public final static String LIST_AGENTS = "listAgents.service";
+ public final static String IS_SERVER_READY = "isServerReady.service";
+ public final static String NEW_SLC_EXECUTION = "newSlcExecution.service";
+ public final static String LIST_SLC_EXECUTIONS = "listSlcExecutions.service";
+ public final static String GET_MODULE_DESCRIPTOR = "getExecutionDescriptor.service";
+ public final static String LIST_MODULE_DESCRIPTORS = "listModulesDescriptors.service";
+ public final static String LIST_RESULTS = "listResults.service";
+ public final static String ADD_EVENT_LISTENER = "addEventListener.service";
+ public final static String REMOVE_EVENT_LISTENER = "removeEventListener.service";
+ public final static String POLL_EVENT = "pollEvent.service";
+
+}
}
public void messageLogged(BuildEvent event) {
- log.info(event.getMessage());
+ if (event.getPriority() == Project.MSG_DEBUG) {
+ if (log.isTraceEnabled())
+ log.trace(event.getMessage());
+ } else if (event.getPriority() == Project.MSG_VERBOSE) {
+ if (log.isDebugEnabled())
+ log.debug(event.getMessage());
+ } else if (event.getPriority() == Project.MSG_INFO) {
+ log.info(event.getMessage());
+
+ } else if (event.getPriority() == Project.MSG_WARN) {
+ log.warn(event.getMessage());
+
+ } else if (event.getPriority() == Project.MSG_ERR) {
+ log.error(event.getMessage());
+ } else {
+ log.error(event.getMessage());
+ }
}
public void targetFinished(BuildEvent event) {