</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<dependencies>
+ <!-- Absolutely minimal SLC Agent -->
<dependency>
<groupId>org.argeo.slc</groupId>
- <artifactId>org.argeo.slc.launcher</artifactId>
+ <artifactId>org.argeo.slc.core</artifactId>
+ <version>1.1.12-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.support.osgi</artifactId>
<version>1.1.12-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.agent</artifactId>
+ <version>1.1.12-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.commons.base</groupId>
+ <artifactId>org.argeo.osgi.boot</artifactId>
+ <version>${version.argeo-commons}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.tp</groupId>
+ <artifactId>org.springframework.osgi.extender</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.commons.base</groupId>
+ <artifactId>org.argeo.dep.log4j</artifactId>
+ <version>${version.argeo-commons}</version>
+ <type>pom</type>
+ </dependency>
+ <!-- JCR Agent -->
<dependency>
<groupId>org.argeo.slc</groupId>
<artifactId>org.argeo.slc.support.jcr</artifactId>
<version>1.1.12-SNAPSHOT</version>
</dependency>
+ <!-- Launcher -->
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.launcher</artifactId>
+ <version>1.1.12-SNAPSHOT</version>
+ </dependency>
<!-- Node subset -->
<!-- Default JCR repositories configurations -->
\r
<!-- SERVICES -->\r
<service ref="attachmentUploader" interface="org.argeo.slc.core.attachment.AttachmentUploader" />\r
- \r
+\r
<service interface="org.argeo.slc.execution.ExecutionModulesListener"\r
ref="executionModulesListener" />\r
+\r
<service ref="agent" interface="org.argeo.slc.execution.SlcAgent" />\r
+ <service ref="agentCli" interface="org.argeo.slc.execution.SlcAgentCli" />\r
\r
<service ref="fileSystemManager" interface="org.apache.commons.vfs.FileSystemManager" />\r
</beans:beans>
\ No newline at end of file
<property name="modulesManager" ref="modulesManager" />\r
</bean>\r
\r
+ <bean id="agentCli" class="org.argeo.slc.core.execution.DefaultAgentCli">\r
+ <property name="agent" ref="agent" />\r
+ <property name="authenticationManager" ref="authenticationManager" />\r
+ </bean>\r
+\r
<bean id="executionModulesListener" class="org.argeo.slc.jcr.execution.JcrExecutionModulesListener"\r
init-method="init" destroy-method="destroy">\r
<property name="agent" ref="agent" />\r
SlcAgent slcAgent = findAgent(processNode);
if (slcAgent == null)
throw new SlcException("Cannot find agent for " + processNode);
- slcAgent.kill(process);
+ slcAgent.kill(process.getUuid());
} catch (Exception e) {
if (!process.getStatus().equals(ExecutionProcess.ERROR))
process.setStatus(ExecutionProcess.ERROR);
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.argeo.slc</groupId>
<artifactId>org.argeo.util</artifactId>
<version>${version.argeo-commons}</version>
</dependency>
+ <dependency>
+ <groupId>org.argeo.commons.security</groupId>
+ <artifactId>org.argeo.security.core</artifactId>
+ <version>${version.argeo-commons}</version>
+ </dependency>
<dependency>
<groupId>org.argeo.tp</groupId>
*/
package org.argeo.slc.core.execution;
+import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
+import java.net.URI;
+import java.net.URLDecoder;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.execution.ExecutionProcess;
/** Implements the base methods of an SLC agent. */
public class DefaultAgent implements SlcAgent {
private final static Log log = LogFactory.getLog(DefaultAgent.class);
+ /** UTF-8 charset for encoding. */
+ private final static String UTF8 = "UTF-8";
private SlcAgentDescriptor agentDescriptor;
private ExecutionModulesManager modulesManager;
/** Clean up (needs to be called by overriding method) */
public void destroy() {
-// modulesManager.unregisterProcessNotifier(this,
-// new HashMap<String, String>());
+ // modulesManager.unregisterProcessNotifier(this,
+ // new HashMap<String, String>());
}
/**
}
}
- public void kill(ExecutionProcess process) {
- String processUuid = process.getUuid();
+ public String process(List<URI> uris) {
+ DefaultProcess process = new DefaultProcess();
+ for (URI uri : uris) {
+ String[] path = uri.getPath().split("/");
+ if (path.length < 3)
+ throw new SlcException("Badly formatted URI: " + uri);
+ String module = path[1];
+ StringBuilder flow = new StringBuilder();
+ for (int i = 2; i < path.length; i++)
+ flow.append('/').append(path[i]);
+
+ Map<String, Object> values = new HashMap<String, Object>();
+ if (uri.getQuery() != null)
+ values = getQueryMap(uri.getQuery());
+
+ modulesManager.start(new BasicNameVersion(module, null));
+ ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+ module, null);
+ process.getRealizedFlows().add(
+ emd.asRealizedFlow(flow.toString(), values));
+ }
+ process(process);
+ return process.getUuid();
+ }
+
+ public void kill(String processUuid) {
if (runningProcesses.containsKey(processUuid)) {
runningProcesses.get(processUuid).interrupt();
+ } else {
+ // assume is finished
+ }
+ }
+
+ public void waitFor(String processUuid, Long millis) {
+ if (runningProcesses.containsKey(processUuid)) {
+ try {
+ if (millis != null)
+ runningProcesses.get(processUuid).join(millis);
+ else
+ runningProcesses.get(processUuid).join();
+ } catch (InterruptedException e) {
+ // silent
+ }
+ } else {
+ // assume is finished
}
}
// {
// }
+ /*
+ * UTILITIES
+ */
+ private static Map<String, Object> getQueryMap(String query) {
+ String[] params = query.split("&");
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ for (String param : params) {
+ String name = param.split("=")[0];
+ String value = param.split("=")[1];
+ try {
+ map.put(URLDecoder.decode(name, UTF8),
+ URLDecoder.decode(value, UTF8));
+ } catch (UnsupportedEncodingException e) {
+ throw new SlcException("Cannot decode '" + param + "'", e);
+ }
+ }
+ return map;
+ }
+
/*
* BEAN
*/
--- /dev/null
+package org.argeo.slc.core.execution;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.argeo.security.OsAuthenticationToken;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.SlcAgent;
+import org.argeo.slc.execution.SlcAgentCli;
+import org.springframework.security.Authentication;
+import org.springframework.security.AuthenticationManager;
+import org.springframework.security.context.SecurityContextHolder;
+
+public class DefaultAgentCli implements SlcAgentCli {
+ private final static String UTF8 = "UTF-8";
+ private SlcAgent agent;
+ private AuthenticationManager authenticationManager;
+
+ private Long timeout = 24 * 60 * 60 * 1000l;
+
+ public String process(String[] args) {
+ OsAuthenticationToken oat = new OsAuthenticationToken();
+ Authentication authentication = authenticationManager.authenticate(oat);
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+
+ List<URI> uris = asURIs(args);
+ String processUuid = agent.process(uris);
+ agent.waitFor(processUuid, timeout);
+ return processUuid;
+ }
+
+ public static List<URI> asURIs(String[] args) {
+ try {
+ List<URI> uris = new ArrayList<URI>();
+ List<String> leftOvers = new ArrayList<String>();
+
+ Boolean hasArgs = false;
+ String currKey = null;
+ StringBuilder currUri = null;
+ Iterator<String> argIt = Arrays.asList(args).iterator();
+ while (argIt.hasNext()) {
+ String arg = argIt.next();
+ if (!arg.startsWith("-")) {
+ if (currKey != null) {// value
+ currUri.append(URLEncoder.encode(arg, UTF8));
+ currKey = null;
+ } else { // module
+ if (currUri != null)
+ uris.add(new URI(currUri.toString()));
+ currUri = new StringBuilder("flow:");
+
+ String currModule = arg;
+ currUri.append('/').append(currModule);
+ if (!arg.contains("/")) {
+ // flow path not in arg go to next arg
+ String currFlow = argIt.next();
+ if (!currFlow.startsWith("/"))
+ currFlow = "/" + currFlow;
+ currUri.append(currFlow);
+ }
+ }
+ } else {
+ if (currUri == null) {// first args
+ leftOvers.add(arg);
+ } else {
+ if (!hasArgs) {
+ currUri.append('?');
+ hasArgs = true;
+ } else {
+ currUri.append('&');
+ }
+
+ // deal with boolean keys
+ if (currKey != null) {// value
+ currUri.append(URLEncoder.encode("true", UTF8));
+ currKey = null;
+ }
+
+ String key;
+ if (arg.startsWith("--"))
+ key = arg.substring(2);
+ else if (arg.startsWith("-"))
+ key = arg.substring(1);
+ else
+ throw new SlcException("Cannot intepret key: "
+ + arg);
+ currKey = key;
+ currUri.append(URLEncoder.encode(key, UTF8))
+ .append('=');
+ }
+ }
+ }
+ if (currUri != null)
+ uris.add(new URI(currUri.toString()));
+ return uris;
+ } catch (Exception e) {
+ throw new SlcException("Cannot convert " + Arrays.toString(args)
+ + " to flow URI", e);
+ }
+ }
+
+ public void setAgent(SlcAgent agent) {
+ this.agent = agent;
+ }
+
+ public void setAuthenticationManager(
+ AuthenticationManager authenticationManager) {
+ this.authenticationManager = authenticationManager;
+ }
+
+ public void setTimeout(Long timeout) {
+ this.timeout = timeout;
+ }
+
+}
}
- ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name, values,
- executionSpec);
+ ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name, null,
+ values, executionSpec);
if (executionFlow.getPath() != null)
efd.setPath(executionFlow.getPath());
else
executionModulesManager.upgrade(realizedFlow
.getModuleNameVersion());
+ executionModulesManager.start(realizedFlow.getModuleNameVersion());
// START FLOW
executionModulesManager.execute(realizedFlow);
// END FLOW
--- /dev/null
+package org.argeo.slc.core.execution;
+
+import java.net.URI;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class DefaultAgentCliTest extends TestCase {
+ public void testArgsToUris() {
+ String[] args = { "org.argeo.slc.demo.minimal", "HelloWorld/WithVar",
+ "--testKey", "555" };
+ List<URI> uris = DefaultAgentCli.asURIs(args);
+ assertEquals(1, uris.size());
+ assertEquals(
+ "flow:/org.argeo.slc.demo.minimal/HelloWorld/WithVar?testKey=555",
+ uris.get(0).toString());
+ }
+}
--- /dev/null
+NIX {
+ com.sun.security.auth.module.UnixLoginModule required debug=true;
+};
\ No newline at end of file
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
-
<configuration>
<instructions>
<Main-Class>org.argeo.slc.cli.SlcMain</Main-Class>
</plugins>
</build>
<dependencies>
- <!-- SLC Agent -->
- <dependency>
- <groupId>org.argeo.slc</groupId>
- <artifactId>org.argeo.slc.core</artifactId>
- <version>1.1.12-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.argeo.slc</groupId>
- <artifactId>org.argeo.slc.support.osgi</artifactId>
- <version>1.1.12-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.argeo.slc</groupId>
- <artifactId>org.argeo.slc.agent</artifactId>
- <version>1.1.12-SNAPSHOT</version>
- </dependency>
-
- <!-- OSGi Boot (and Equinox) -->
<dependency>
<groupId>org.argeo.commons.base</groupId>
<artifactId>org.argeo.osgi.boot</artifactId>
<version>${version.argeo-commons}</version>
</dependency>
-
- <!-- Spring OSGi -->
- <dependency>
- <groupId>org.argeo.tp</groupId>
- <artifactId>org.springframework.osgi.extender</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.argeo.tp</groupId>
- <artifactId>org.apache.commons.cli</artifactId>
- </dependency>
-
- <!-- Logging -->
- <dependency>
- <groupId>org.argeo.commons.base</groupId>
- <artifactId>org.argeo.dep.log4j</artifactId>
- <version>${version.argeo-commons}</version>
- <type>pom</type>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
package org.argeo.slc.cli;
import java.io.File;
-import java.io.FileInputStream;
+import java.lang.reflect.Method;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
+import java.util.ServiceLoader;
import java.util.UUID;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.IOUtils;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+
import org.argeo.osgi.boot.OsgiBoot;
-import org.argeo.slc.SlcException;
-import org.eclipse.core.runtime.adaptor.EclipseStarter;
-import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.launch.FrameworkFactory;
+import org.osgi.util.tracker.ServiceTracker;
-@SuppressWarnings("restriction")
+/** Configures an SLC runtime and runs a process. */
public class SlcMain implements Runnable {
- /** Unique launch module */
- public final static String UNIQUE_LAUNCH_MODULE_PROPERTY = "slc.launch.module";
-
- /** Unique launch flow */
- public final static String UNIQUE_LAUNCH_FLOW_PROPERTY = "slc.launch.flow";
-
- /** Unique launch flow */
- public final static String UNIQUE_LAUNCH_ARGS_PROPERTY_BASE = "slc.launch.args";
-
- private final Options options = new Options();
private final String[] args;
- private final String commandName = "slc";
-
// private static String bundlesToInstall = "/usr/share/osgi;in=*.jar";
private String bundlesToInstall = System.getProperty("user.home")
+ "/dev/src/slc/dep/org.argeo.slc.dep.minimal/target/dependency;in=*.jar,"
public SlcMain(String[] args) {
this.args = args;
- // bundlesToStart.add("org.springframework.osgi.extender");
- // bundlesToStart.add("org.argeo.slc.agent");
-
bundlesToStart.add("org.springframework.osgi.extender");
bundlesToStart.add("org.argeo.node.repo.jackrabbit");
bundlesToStart.add("org.argeo.security.dao.os");
bundlesToStart.add("org.argeo.slc.agent.jcr");
}
- @SuppressWarnings("unchecked")
public void run() {
- String module = null;
- String moduleUrl = null;
- String flow = null;
-
+ final LoginContext lc;
try {
+ // Authenticate
+ lc = new LoginContext("NIX");
+ lc.login();
- CommandLineParser clParser = new GnuParser();
- CommandLine cl = clParser.parse(options, args);
-
- List<String> arguments = cl.getArgList();
- if (arguments.size() == 0) {
- // TODO default behaviour
- } else {
- module = arguments.get(0);
- File moduleFile = new File(module);
- if (moduleFile.exists()) {
- if (moduleFile.isDirectory()) {
- moduleUrl = "reference:file:"
- + moduleFile.getCanonicalPath();
- } else {
- moduleUrl = "file:" + moduleFile.getCanonicalPath();
- }
- }
-
- if (arguments.size() == 1) {
- // TODO module info
- } else {
- flow = arguments.get(1);
- }
- }
-
+ // Prepare directories
String executionDir = System.getProperty("user.dir");
File slcDir = new File(executionDir, "target/.slc");
File tempDir = new File(System.getProperty("java.io.tmpdir"));
if (!confDir.exists())
confDir.mkdirs();
- BundleContext bundleContext = null;
- try {
- String[] osgiRuntimeArgs = { "-configuration",
- confDir.getCanonicalPath(), "-data",
- dataDir.getCanonicalPath(), "-console", "-clean" };
- bundleContext = EclipseStarter.startup(osgiRuntimeArgs, null);
- } catch (Exception e) {
- throw new RuntimeException("Cannot start Equinox.", e);
- }
+ System.setProperty("log4j.configuration", "file:./log4j.properties");
+ System.setProperty("argeo.node.repo.configuration",
+ "osgibundle:repository-memory.xml");
+
+ // Start Equinox
+ ServiceLoader<FrameworkFactory> ff = ServiceLoader
+ .load(FrameworkFactory.class);
+ FrameworkFactory frameworkFactory = ff.iterator().next();
+ Map<String, String> configuration = new HashMap<String, String>();
+ configuration.put("osgi.configuration.area",
+ confDir.getCanonicalPath());
+ configuration.put("osgi.instance.area", dataDir.getCanonicalPath());
+ configuration.put("osgi.clean", "true");
+
+ // Spring configs currently require System properties
+ System.getProperties().putAll(configuration);
+
+ Framework framework = frameworkFactory.newFramework(configuration);
+ framework.start();
+ BundleContext bundleContext = framework.getBundleContext();
+ // String[] osgiRuntimeArgs = { "-configuration",
+ // confDir.getCanonicalPath(), "-data",
+ // dataDir.getCanonicalPath(), "-clean" };
+ // BundleContext bundleContext = EclipseStarter.startup(
+ // osgiRuntimeArgs, null);
// OSGi bootstrap
OsgiBoot osgiBoot = new OsgiBoot(bundleContext);
osgiBoot.installUrls(osgiBoot.getBundlesUrls(bundlesToInstall));
- if (moduleUrl != null) {
- Bundle bundle = osgiBoot.installUrl(moduleUrl);
- module = bundle.getSymbolicName();
- // TODO deal with version
- }
-
- System.setProperty(UNIQUE_LAUNCH_MODULE_PROPERTY, module);
- System.setProperty(UNIQUE_LAUNCH_FLOW_PROPERTY, flow);
- System.setProperty("log4j.configuration", "file:./log4j.properties");
- System.setProperty("argeo.node.repo.configuration",
- "osgibundle:repository-memory.xml");
- // start runtime
+ // Start runtime
osgiBoot.startBundles(bundlesToStart);
- } catch (ParseException e) {
- System.err.println("Problem with command line arguments. "
- + e.getMessage());
- badExit();
- } catch (SlcException e) {
- System.err.println(e.getMessage());
- badExit();
+ // Find SLC Agent
+ ServiceTracker agentTracker = new ServiceTracker(bundleContext,
+ "org.argeo.slc.execution.SlcAgentCli", null);
+ agentTracker.open();
+ final Object agentCli = agentTracker.waitForService(30 * 1000);
+
+ // Run as a privileged action
+ Subject.doAs(Subject.getSubject(AccessController.getContext()),
+ new PrivilegedAction<String>() {
+
+ public String run() {
+ try {
+ Class<?>[] parameterTypes = { String[].class };
+ Method method = agentCli.getClass().getMethod(
+ "process", parameterTypes);
+ Object[] methodArgs = { args };
+ Object ret = method
+ .invoke(agentCli, methodArgs);
+ return ret.toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot run "
+ + Arrays.toString(args) + " on "
+ + agentCli, e);
+ }
+ }
+
+ });
+
+ // Shutdown OSGi runtime
+ framework.stop();
+ framework.waitForStop(60 * 1000);
+
+ System.exit(0);
} catch (Exception e) {
- System.err.println("Unexpected exception when bootstrapping.");
e.printStackTrace();
- badExit();
+ System.exit(1);
}
}
new SlcMain(args).run();
}
- public void printUsage() {
- new HelpFormatter().printHelp(commandName, options, true);
- }
-
- protected static void addProperty(Properties properties, String property) {
- int eqIndex = property.indexOf('=');
- if (eqIndex == 0)
- throw new SlcException("Badly formatted property " + property);
-
- if (eqIndex > 0) {
- String key = property.substring(0, eqIndex);
- String value = property.substring(eqIndex + 1);
- properties.setProperty(key, value);
-
- } else {
- properties.setProperty(property, "true");
- }
- }
-
- protected static void loadPropertyFile(Properties properties,
- String propertyFile) {
- FileInputStream in = null;
- try {
- in = new FileInputStream(propertyFile);
- properties.load(in);
- } catch (Exception e) {
- throw new SlcException("Could not load proeprty file "
- + propertyFile);
- } finally {
- IOUtils.closeQuietly(in);
- }
- }
-
- private void badExit() {
- printUsage();
- System.exit(1);
- }
-
protected static void info(Object msg) {
System.out.println(msg);
}
protected static void debug(Object msg) {
System.out.println(msg);
}
+
}
package org.argeo.slc.execution;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
/**
* Generally, values object are either a <code>PrimitiveAccessor</code> or a
* <code>RefValue</code> but can be other objects.
*/
-public class ExecutionFlowDescriptor implements Serializable {
+public class ExecutionFlowDescriptor implements Serializable, Cloneable {
private static final long serialVersionUID = 7101944857038041216L;
private String name;
private String description;
public ExecutionFlowDescriptor() {
}
- public ExecutionFlowDescriptor(String name, Map<String, Object> values,
- ExecutionSpec executionSpec) {
+ public ExecutionFlowDescriptor(String name, String description,
+ Map<String, Object> values, ExecutionSpec executionSpec) {
this.name = name;
this.values = values;
this.executionSpec = executionSpec;
}
+ /** The referenced {@link ExecutionSpec} is NOT cloned. */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return new ExecutionFlowDescriptor(name, description,
+ new HashMap<String, Object>(values), executionSpec);
+ }
+
public String getName() {
return name;
}
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import org.argeo.slc.SlcException;
import org.argeo.slc.deploy.ModuleDescriptor;
/** Describes the information required to launch a flow */
return executionFlows;
}
+ /**
+ * Returns a new {@link ExecutionModuleDescriptor} that can be used to build
+ * a {@link RealizedFlow}.
+ */
+ public ExecutionFlowDescriptor cloneFlowDescriptor(String name) {
+ ExecutionFlowDescriptor res = null;
+ for (ExecutionFlowDescriptor efd : executionFlows) {
+ if (efd.getName().equals(name)
+ || ("/" + efd.getName()).equals(name)) {
+ try {
+ res = (ExecutionFlowDescriptor) efd.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new SlcException("Cannot clone " + efd, e);
+ }
+ }
+ }
+ if (res == null)
+ throw new SlcException("Flow " + name + " not found.");
+ return res;
+ }
+
+ public RealizedFlow asRealizedFlow(String flow, Map<String, Object> values) {
+ RealizedFlow realizedFlow = new RealizedFlow();
+ realizedFlow.setFlowDescriptor(cloneFlowDescriptor(flow));
+ realizedFlow.setModuleName(getName());
+ realizedFlow.setModuleVersion(getVersion());
+ realizedFlow.getFlowDescriptor().getValues().putAll(values);
+ return realizedFlow;
+ }
+
public void setExecutionSpecs(List<ExecutionSpec> executionSpecs) {
this.executionSpecs = executionSpecs;
}
*/
package org.argeo.slc.execution;
+import java.net.URI;
import java.util.List;
/**
/** Execute / take part to this process */
public void process(ExecutionProcess process);
+ /**
+ * Asynchronously processes the flows defined as URIs, or interpret a single
+ * UUID URN as a scheduled or template process.
+ *
+ * @return the UUID of the process launched.
+ */
+ public String process(List<URI> uris);
+
/** Kills this process */
- public void kill(ExecutionProcess process);
+ public void kill(String processUuid);
+
+ /**
+ * Wait for this process to finish. returns immediately if it does not
+ * exist.
+ *
+ * @param millis
+ * can be null
+ */
+ public void waitFor(String processUuid, Long millis);
/**
* Describe all the flows provided by this execution module. Typically
--- /dev/null
+package org.argeo.slc.execution;
+
+/**
+ * Interpret a command line and run it in the underlying agent, with the proper
+ * authentication.
+ */
+public interface SlcAgentCli {
+ /**
+ * Synchronously executes.
+ *
+ * @return the UUID of the process
+ */
+ public String process(String[] args);
+}
protected ProcessThread createProcessThread(
ThreadGroup processesThreadGroup,
ExecutionModulesManager modulesManager, ExecutionProcess process) {
- return new JcrProcessThread(processesThreadGroup, modulesManager,
- (JcrExecutionProcess) process);
+ if (process instanceof JcrProcessThread)
+ return new JcrProcessThread(processesThreadGroup, modulesManager,
+ (JcrExecutionProcess) process);
+ else
+ return super.createProcessThread(processesThreadGroup,
+ modulesManager, process);
}
/*
super(processesThreadGroup, executionModulesManager, process);
}
+ /** Overridden in order to set progress status on realized flow nodes. */
@Override
protected void process() throws InterruptedException {
Session session = null;
- try {
- session = getJcrExecutionProcess().getRepository().login();
+ if (getProcess() instanceof JcrExecutionProcess)
+ try {
+ session = ((JcrExecutionProcess) getProcess()).getRepository()
+ .login();
- List<RealizedFlow> realizedFlows = getProcess().getRealizedFlows();
- for (RealizedFlow realizedFlow : realizedFlows) {
- Node realizedFlowNode = session
- .getNode(((JcrRealizedFlow) realizedFlow).getPath());
+ List<RealizedFlow> realizedFlows = getProcess()
+ .getRealizedFlows();
+ for (RealizedFlow realizedFlow : realizedFlows) {
+ Node realizedFlowNode = session
+ .getNode(((JcrRealizedFlow) realizedFlow).getPath());
+ setFlowStatus(realizedFlowNode, ExecutionProcess.RUNNING);
- // set status on realized flow
- realizedFlowNode.setProperty(SLC_STATUS,
- ExecutionProcess.RUNNING);
- realizedFlowNode.getSession().save();
- try {
- //
- // EXECUTE THE FLOW
- //
- execute(realizedFlow, true);
+ try {
+ //
+ // EXECUTE THE FLOW
+ //
+ execute(realizedFlow, true);
- // set status on realized flow
- realizedFlowNode.setProperty(SLC_STATUS,
- ExecutionProcess.COMPLETED);
- realizedFlowNode.getSession().save();
- } catch (RepositoryException e) {
- throw e;
- } catch (InterruptedException e) {
- // set status on realized flow
- realizedFlowNode.setProperty(SLC_STATUS,
- ExecutionProcess.KILLED);
- realizedFlowNode.getSession().save();
- throw e;
- } catch (RuntimeException e) {
- // set status on realized flow
- realizedFlowNode.setProperty(SLC_STATUS,
- ExecutionProcess.ERROR);
- realizedFlowNode.getSession().save();
- throw e;
+ setFlowStatus(realizedFlowNode,
+ ExecutionProcess.COMPLETED);
+ } catch (RepositoryException e) {
+ throw e;
+ } catch (InterruptedException e) {
+ setFlowStatus(realizedFlowNode, ExecutionProcess.KILLED);
+ throw e;
+ } catch (RuntimeException e) {
+ setFlowStatus(realizedFlowNode, ExecutionProcess.ERROR);
+ throw e;
+ }
}
+ } catch (RepositoryException e) {
+ throw new ArgeoException("Cannot process "
+ + getJcrExecutionProcess().getNodePath(), e);
+ } finally {
+ JcrUtils.logoutQuietly(session);
}
- } catch (RepositoryException e) {
- throw new ArgeoException("Cannot process "
- + getJcrExecutionProcess().getNodePath(), e);
- } finally {
- JcrUtils.logoutQuietly(session);
- }
+ else
+ super.process();
}
- //
- // /** CONFIGURE THE REALIZED FLOWS */
- // PROTECTED VOID EXECUTE(NODE REALIZEDFLOWNODE) THROWS REPOSITORYEXCEPTION,
- // INTERRUPTEDEXCEPTION {
- // IF (REALIZEDFLOWNODE.HASNODE(SLC_ADDRESS)) {
- // STRING FLOWPATH = REALIZEDFLOWNODE.GETNODE(SLC_ADDRESS)
- // .GETPROPERTY(PROPERTY.JCR_PATH).GETSTRING();
- // // TODO: CONVERT TO LOCAL PATH IF REMOTE
- //
- // NODE FLOWNODE = REALIZEDFLOWNODE.GETSESSION().GETNODE(FLOWPATH);
- // STRING FLOWNAME = FLOWNODE.GETPROPERTY(SLC_NAME).GETSTRING();
- //
- // NODE EXECUTIONMODULENODE = FLOWNODE.GETSESSION().GETNODE(
- // SLCJCRUTILS.MODULEPATH(FLOWPATH));
- // STRING EXECUTIONMODULENAME = EXECUTIONMODULENODE.GETPROPERTY(
- // SLC_NAME).GETSTRING();
- // STRING EXECUTIONMODULEVERSION = EXECUTIONMODULENODE.GETPROPERTY(
- // SLC_VERSION).GETSTRING();
- //
- // REALIZEDFLOW REALIZEDFLOW = NEW REALIZEDFLOW();
- // REALIZEDFLOW.SETMODULENAME(EXECUTIONMODULENAME);
- // REALIZEDFLOW.SETMODULEVERSION(EXECUTIONMODULEVERSION);
- //
- // // RETRIEVE EXECUTION SPEC
- // DEFAULTEXECUTIONSPEC EXECUTIONSPEC = NEW DEFAULTEXECUTIONSPEC();
- // MAP<STRING, EXECUTIONSPECATTRIBUTE> ATTRS =
- // READEXECUTIONSPECATTRIBUTES(REALIZEDFLOWNODE);
- // EXECUTIONSPEC.SETATTRIBUTES(ATTRS);
- //
- // // SET EXECUTION SPEC NAME
- // IF (FLOWNODE.HASPROPERTY(SLCNAMES.SLC_SPEC)) {
- // NODE EXECUTIONSPECNODE = FLOWNODE.GETPROPERTY(SLC_SPEC)
- // .GETNODE();
- // EXECUTIONSPEC.SETBEANNAME(EXECUTIONSPECNODE.GETPROPERTY(
- // SLC_NAME).GETSTRING());
- // }
- //
- // // EXPLICITLY RETRIEVE VALUES
- // MAP<STRING, OBJECT> VALUES = NEW HASHMAP<STRING, OBJECT>();
- // FOR (STRING ATTRNAME : ATTRS.KEYSET()) {
- // EXECUTIONSPECATTRIBUTE ATTR = ATTRS.GET(ATTRNAME);
- // OBJECT VALUE = ATTR.GETVALUE();
- // VALUES.PUT(ATTRNAME, VALUE);
- // }
- //
- // EXECUTIONFLOWDESCRIPTOR EFD = NEW EXECUTIONFLOWDESCRIPTOR(FLOWNAME,
- // VALUES, EXECUTIONSPEC);
- // REALIZEDFLOW.SETFLOWDESCRIPTOR(EFD);
- //
- // //
- // // EXECUTE THE FLOW
- // //
- // EXECUTE(REALIZEDFLOW, TRUE);
- // //
- // }
- // }
- //
- // PROTECTED MAP<STRING, EXECUTIONSPECATTRIBUTE>
- // READEXECUTIONSPECATTRIBUTES(
- // NODE NODE) {
- // TRY {
- // MAP<STRING, EXECUTIONSPECATTRIBUTE> ATTRS = NEW HASHMAP<STRING,
- // EXECUTIONSPECATTRIBUTE>();
- // FOR (NODEITERATOR NIT = NODE.GETNODES(); NIT.HASNEXT();) {
- // NODE SPECATTRNODE = NIT.NEXTNODE();
- // IF (SPECATTRNODE
- // .ISNODETYPE(SLCTYPES.SLC_PRIMITIVE_SPEC_ATTRIBUTE)) {
- // STRING TYPE = SPECATTRNODE.GETPROPERTY(SLC_TYPE)
- // .GETSTRING();
- // OBJECT VALUE = NULL;
- // IF (SPECATTRNODE.HASPROPERTY(SLC_VALUE)) {
- // STRING VALUESTR = SPECATTRNODE.GETPROPERTY(SLC_VALUE)
- // .GETSTRING();
- // VALUE = PRIMITIVEUTILS.CONVERT(TYPE, VALUESTR);
- // }
- // PRIMITIVESPECATTRIBUTE SPECATTR = NEW PRIMITIVESPECATTRIBUTE(
- // TYPE, VALUE);
- // ATTRS.PUT(SPECATTRNODE.GETNAME(), SPECATTR);
- // } ELSE IF (SPECATTRNODE
- // .ISNODETYPE(SLCTYPES.SLC_REF_SPEC_ATTRIBUTE)) {
- // IF (!SPECATTRNODE.HASPROPERTY(SLC_VALUE)) {
- // CONTINUE;
- // }
- // INTEGER VALUE = (INT) SPECATTRNODE.GETPROPERTY(SLC_VALUE)
- // .GETLONG();
- // REFSPECATTRIBUTE SPECATTR = NEW REFSPECATTRIBUTE();
- // NODEITERATOR CHILDREN = SPECATTRNODE.GETNODES();
- // INT INDEX = 0;
- // STRING ID = NULL;
- // WHILE (CHILDREN.HASNEXT()) {
- // NODE CHILD = CHILDREN.NEXTNODE();
- // IF (INDEX == VALUE)
- // ID = CHILD.GETNAME();
- // INDEX++;
- // }
- // SPECATTR.SETVALUE(ID);
- // ATTRS.PUT(SPECATTRNODE.GETNAME(), SPECATTR);
- // }
- // // THROW NEW SLCEXCEPTION("UNSUPPORTED SPEC ATTRIBUTE "
- // // + SPECATTRNODE);
- // }
- // RETURN ATTRS;
- // } CATCH (REPOSITORYEXCEPTION E) {
- // THROW NEW SLCEXCEPTION("CANNOT READ SPEC ATTRIBUTES FROM " + NODE,
- // E);
- // }
- // }
+ protected void setFlowStatus(Node realizedFlowNode, String status)
+ throws RepositoryException {
+ realizedFlowNode.setProperty(SLC_STATUS, status);
+ realizedFlowNode.getSession().save();
+ }
protected JcrExecutionProcess getJcrExecutionProcess() {
return (JcrExecutionProcess) getProcess();
Node flowNode = realizedFlowNode.getSession().getNode(flowPath);
String flowName = flowNode.getProperty(SLC_NAME).getString();
+ String description = null;
+ if (flowNode.hasProperty(Property.JCR_DESCRIPTION))
+ description = flowNode.getProperty(Property.JCR_DESCRIPTION)
+ .getString();
Node executionModuleNode = flowNode.getSession().getNode(
SlcJcrUtils.modulePath(flowPath));
}
ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(flowName,
- values, executionSpec);
+ description, values, executionSpec);
realizedFlow.setFlowDescriptor(efd);
} else {
throw new SlcException("Unsupported realized flow "
return service;
}
+ public OsgiBundle findRelatedBundle(String moduleName, String moduleVersion) {
+ OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
+ if (osgiBundle.getVersion() == null) {
+ Bundle bundle = findRelatedBundle(osgiBundle);
+ osgiBundle = new OsgiBundle(bundle);
+ }
+ return osgiBundle;
+ }
+
/**
* @param osgiBundle
* cannot be null
bundles: for (Iterator<OsgiBundle> iterator = executionContexts
.keySet().iterator(); iterator.hasNext();) {
OsgiBundle ob = iterator.next();
- if (ob.equals(nameVersion)) {
- osgiBundle = ob;
- break bundles;
+ if (nameVersion.getVersion() != null) {
+ if (ob.equals(nameVersion)) {
+ osgiBundle = ob;
+ break bundles;
+ }
+ } else {
+ if (ob.getName().equals(nameVersion.getName())) {
+ osgiBundle = ob;
+ break bundles;
+ }
}
}
if (osgiBundle == null)
String moduleName, String moduleVersion) {
Map<String, ExecutionFlow> flows = new HashMap<String, ExecutionFlow>();
- OsgiBundle key = new OsgiBundle(moduleName, moduleVersion);
+ OsgiBundle key = bundlesManager.findRelatedBundle(moduleName,
+ moduleVersion);
if (!executionFlows.containsKey(key))
return flows;
Set<ExecutionFlow> flowsT = executionFlows.get(key);
protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
String moduleName, String moduleVersion) {
- OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
- return getExecutionFlowDescriptorConverter(osgiBundle);
+ return findExecutionFlowDescriptorConverter(moduleName, moduleVersion);
+ // OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
+ // return getExecutionFlowDescriptorConverter(osgiBundle);
}
protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
Bundle bundle = bundlesManager.findRelatedBundle(new OsgiBundle(
nameVersion));
if (bundle == null)
- throw new SlcException("Counld not find bundle for "
+ throw new SlcException("Could not find bundle for "
+ nameVersion);
bundlesManager.startSynchronous(bundle);