<version>2.1.17-SNAPSHOT</version>
</dependency>
+ <!-- OSGi Boot for platform generation only, as it could be used by regular
+ Java applications to launch an OSGi runtime. -->
+ <dependency>
+ <groupId>org.argeo.commons</groupId>
+ <artifactId>org.argeo.osgi.boot</artifactId>
+ <version>2.1.86-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Legacy Argeo Commons platform (Eclipse 3) -->
<dependency>
<groupId>org.argeo.slc.legacy.commons</groupId>
<artifactId>org.argeo.slc.api</artifactId>
<version>2.1.17-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.runtime</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.jcr</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
<!-- CLI Agent -->
<!-- <dependency> -->
<SLC-ExecutionModule>default</SLC-ExecutionModule>
<!-- Minimal imports + some optional convenience imports -->
<Import-Package>
- org.springframework.cglib.proxy,
- org.springframework.cglib.core,
- org.springframework.cglib.reflect,
- org.aopalliance.aop,
- org.argeo.slc.core.execution,
- org.argeo.slc.core.execution.tasks,
+ org.springframework.cglib.proxy;resolution:="optional",
+ org.springframework.cglib.core;resolution:="optional",
+ org.springframework.cglib.reflect;resolution:="optional",
+ org.aopalliance.aop;resolution:="optional",
+ org.argeo.slc.runtime,
+ org.argeo.slc.core.execution;resolution:="optional",
+ org.argeo.slc.core.execution.tasks;resolution:="optional",
org.argeo.slc.execution,
- org.argeo.slc.osgi,
+ org.argeo.slc.osgi;resolution:="optional",
org.argeo.slc.test,
- org.springframework.aop,
- org.springframework.aop.framework,
- org.springframework.aop.scope,
- org.springframework.beans.factory.config,
- org.springframework.core.io,
+ org.springframework.aop;resolution:="optional",
+ org.springframework.aop.framework;resolution:="optional",
+ org.springframework.aop.scope;resolution:="optional",
+ org.springframework.beans.factory.config;resolution:="optional",
+ org.springframework.core.io;resolution:="optional",
${additionalImports.slc-lib},
*
</Import-Package>
interface="org.argeo.slc.execution.ExecutionModulesManager" />\r
\r
<!-- SERVICES -->\r
- <service ref="attachmentUploader" interface="org.argeo.slc.core.attachment.AttachmentUploader" />\r
+<!-- <service ref="attachmentUploader" interface="org.argeo.slc.core.attachment.AttachmentUploader" /> -->\r
\r
<service interface="org.argeo.slc.execution.ExecutionModulesListener"\r
ref="executionModulesListener" />\r
<!-- <property name="repository" ref="repository" /> -->\r
<!-- </bean> -->\r
\r
- <bean id="attachmentUploader" class="org.argeo.slc.jcr.execution.JcrAttachmentUploader">\r
-<!-- <property name="session" ref="session" /> -->\r
- </bean>\r
+<!-- <bean id="attachmentUploader" class="org.argeo.slc.jcr.execution.JcrAttachmentUploader"> -->\r
+<!-- </bean> -->\r
\r
\r
<bean\r
--- /dev/null
+package org.argeo.slc.execution;
+
+import org.argeo.slc.SlcException;
+
+/** The stack trace of such exceptions does not need to be displayed */
+public class FlowConfigurationException extends SlcException {
+ private static final long serialVersionUID = 8456260596346797321L;
+
+ public FlowConfigurationException(String message) {
+ super(message);
+ }
+}
Import-Package: javax.jcr.nodetype,\
+javax.jcr.security,\
*
\ No newline at end of file
</parent>
<artifactId>org.argeo.slc.jcr</artifactId>
<name>SLC JCR implementations</name>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>true</skipTests>
- </configuration>
- </plugin>
- </plugins>
- </build>
<dependencies>
+ <!-- SLC -->
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.api</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.runtime</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
+
<!-- Commons -->
<dependency>
<groupId>org.argeo.commons</groupId>
<version>${version.argeo-commons}</version>
</dependency>
- <!-- SLC -->
- <dependency>
- <groupId>org.argeo.slc</groupId>
- <artifactId>org.argeo.slc.api</artifactId>
- <version>2.1.17-SNAPSHOT</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.jcr.execution;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.security.Privilege;
+
+import org.argeo.jcr.JcrUtils;
+import org.argeo.slc.SlcConstants;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.SlcNames;
+import org.argeo.slc.SlcTypes;
+import org.argeo.slc.runtime.DefaultAgent;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.jcr.SlcJcrConstants;
+import org.argeo.slc.runtime.ProcessThread;
+
+/** SLC VM agent synchronizing with a JCR repository. */
+public class JcrAgent extends DefaultAgent implements SlcNames {
+ // final static String ROLE_REMOTE = "ROLE_REMOTE";
+ final static String NODE_REPO_URI = "argeo.node.repo.uri";
+
+ private Repository repository;
+
+ private String agentNodeName = "default";
+
+ /*
+ * LIFECYCLE
+ */
+ protected String initAgentUuid() {
+ Session session = null;
+ try {
+ session = repository.login();
+
+ String agentFactoryPath = getAgentFactoryPath();
+ Node vmAgentFactoryNode = JcrUtils.mkdirsSafe(session, agentFactoryPath, SlcTypes.SLC_AGENT_FACTORY);
+ JcrUtils.addPrivilege(session, SlcJcrConstants.SLC_BASE_PATH, SlcConstants.ROLE_SLC, Privilege.JCR_ALL);
+ if (!vmAgentFactoryNode.hasNode(agentNodeName)) {
+ String uuid = UUID.randomUUID().toString();
+ Node agentNode = vmAgentFactoryNode.addNode(agentNodeName, SlcTypes.SLC_AGENT);
+ agentNode.setProperty(SLC_UUID, uuid);
+ }
+ session.save();
+ return vmAgentFactoryNode.getNode(agentNodeName).getProperty(SLC_UUID).getString();
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ throw new SlcException("Cannot find JCR agent UUID", e);
+ } finally {
+ JcrUtils.logoutQuietly(session);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ }
+
+ /*
+ * SLC AGENT
+ */
+ @Override
+ protected ProcessThread createProcessThread(ThreadGroup processesThreadGroup,
+ ExecutionModulesManager modulesManager, ExecutionProcess process) {
+ if (process instanceof JcrExecutionProcess)
+ return new JcrProcessThread(processesThreadGroup, modulesManager, (JcrExecutionProcess) process);
+ else
+ return super.createProcessThread(processesThreadGroup, modulesManager, process);
+ }
+
+ /*
+ * UTILITIES
+ */
+ public String getNodePath() {
+ return getAgentFactoryPath() + '/' + getAgentNodeName();
+ }
+
+ public String getAgentFactoryPath() {
+ try {
+ Boolean isRemote = System.getProperty(NODE_REPO_URI) != null;
+ String agentFactoryPath;
+ if (isRemote) {
+ InetAddress localhost = InetAddress.getLocalHost();
+ agentFactoryPath = SlcJcrConstants.AGENTS_BASE_PATH + "/" + localhost.getCanonicalHostName();
+
+ if (agentFactoryPath.equals(SlcJcrConstants.VM_AGENT_FACTORY_PATH))
+ throw new SlcException("Unsupported hostname " + localhost.getCanonicalHostName());
+ } else {// local
+ agentFactoryPath = SlcJcrConstants.VM_AGENT_FACTORY_PATH;
+ }
+ return agentFactoryPath;
+ } catch (UnknownHostException e) {
+ throw new SlcException("Cannot find agent factory base path", e);
+ }
+ }
+
+ /*
+ * BEAN
+ */
+ public String getAgentNodeName() {
+ return agentNodeName;
+ }
+
+ public void setRepository(Repository repository) {
+ this.repository = repository;
+ }
+
+ public void setAgentNodeName(String agentNodeName) {
+ this.agentNodeName = agentNodeName;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.jcr.execution;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.Property;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.nodetype.NodeType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.jcr.JcrUtils;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.SlcNames;
+import org.argeo.slc.SlcTypes;
+import org.argeo.slc.deploy.ModuleDescriptor;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.execution.ExecutionModulesListener;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionSpec;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.RefSpecAttribute;
+import org.argeo.slc.execution.RefValueChoice;
+import org.argeo.slc.jcr.SlcJcrUtils;
+import org.argeo.slc.primitive.PrimitiveSpecAttribute;
+import org.argeo.slc.primitive.PrimitiveValue;
+
+/**
+ * Synchronizes the local execution runtime with a JCR repository. For the time
+ * being the state is completely reset from one start to another.
+ */
+public class JcrExecutionModulesListener implements ExecutionModulesListener,
+ SlcNames {
+ private final static String SLC_EXECUTION_MODULES_PROPERTY = "slc.executionModules";
+
+ private final static Log log = LogFactory
+ .getLog(JcrExecutionModulesListener.class);
+ private JcrAgent agent;
+
+ private ExecutionModulesManager modulesManager;
+
+ private Repository repository;
+ /**
+ * We don't use a thread bound session because many different threads will
+ * call this critical component and we don't want to login each time. We
+ * therefore rather protect access to this session via synchronized.
+ */
+ private Session session;
+
+ /*
+ * LIFECYCLE
+ */
+ public void init() {
+ try {
+ session = repository.login();
+ clearAgent();
+ if (modulesManager != null) {
+ Node agentNode = session.getNode(agent.getNodePath());
+
+ List<ModuleDescriptor> moduleDescriptors = modulesManager
+ .listModules();
+
+ // scan SLC-ExecutionModule metadata
+ for (ModuleDescriptor md : moduleDescriptors) {
+ if (md.getMetadata().containsKey(
+ ExecutionModuleDescriptor.SLC_EXECUTION_MODULE)) {
+ String moduleNodeName = SlcJcrUtils
+ .getModuleNodeName(md);
+ Node moduleNode = agentNode.hasNode(moduleNodeName) ? agentNode
+ .getNode(moduleNodeName) : agentNode
+ .addNode(moduleNodeName);
+ moduleNode.addMixin(SlcTypes.SLC_EXECUTION_MODULE);
+ moduleNode.setProperty(SLC_NAME, md.getName());
+ moduleNode.setProperty(SLC_VERSION, md.getVersion());
+ moduleNode.setProperty(Property.JCR_TITLE,
+ md.getTitle());
+ moduleNode.setProperty(Property.JCR_DESCRIPTION,
+ md.getDescription());
+ moduleNode.setProperty(SLC_STARTED, md.getStarted());
+ }
+ }
+
+ // scan execution modules property
+ String executionModules = System
+ .getProperty(SLC_EXECUTION_MODULES_PROPERTY);
+ if (executionModules != null) {
+ for (String executionModule : executionModules.split(",")) {
+ allModules: for (ModuleDescriptor md : moduleDescriptors) {
+ String moduleNodeName = SlcJcrUtils
+ .getModuleNodeName(md);
+ if (md.getName().equals(executionModule)) {
+ Node moduleNode = agentNode
+ .hasNode(moduleNodeName) ? agentNode
+ .getNode(moduleNodeName) : agentNode
+ .addNode(moduleNodeName);
+ moduleNode
+ .addMixin(SlcTypes.SLC_EXECUTION_MODULE);
+ moduleNode.setProperty(SLC_NAME, md.getName());
+ moduleNode.setProperty(SLC_VERSION,
+ md.getVersion());
+ moduleNode.setProperty(Property.JCR_TITLE,
+ md.getTitle());
+ moduleNode.setProperty(
+ Property.JCR_DESCRIPTION,
+ md.getDescription());
+ moduleNode.setProperty(SLC_STARTED,
+ md.getStarted());
+ break allModules;
+ }
+ }
+ }
+
+ // save if needed
+ if (session.hasPendingChanges())
+ session.save();
+ }
+ }
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ JcrUtils.logoutQuietly(session);
+ throw new SlcException("Cannot initialize modules", e);
+ }
+ }
+
+ public void destroy() {
+ clearAgent();
+ JcrUtils.logoutQuietly(session);
+ }
+
+ protected synchronized void clearAgent() {
+ try {
+ Node agentNode = session.getNode(agent.getNodePath());
+ for (NodeIterator nit = agentNode.getNodes(); nit.hasNext();)
+ nit.nextNode().remove();
+ session.save();
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ throw new SlcException("Cannot clear agent " + agent, e);
+ }
+ }
+
+ /*
+ * EXECUTION MODULES LISTENER
+ */
+
+ public synchronized void executionModuleAdded(
+ ModuleDescriptor moduleDescriptor) {
+ syncExecutionModule(moduleDescriptor);
+ }
+
+ protected void syncExecutionModule(ModuleDescriptor moduleDescriptor) {
+ try {
+ Node agentNode = session.getNode(agent.getNodePath());
+ String moduleNodeName = SlcJcrUtils
+ .getModuleNodeName(moduleDescriptor);
+ Node moduleNode = agentNode.hasNode(moduleNodeName) ? agentNode
+ .getNode(moduleNodeName) : agentNode
+ .addNode(moduleNodeName);
+ moduleNode.addMixin(SlcTypes.SLC_EXECUTION_MODULE);
+ moduleNode.setProperty(SLC_NAME, moduleDescriptor.getName());
+ moduleNode.setProperty(SLC_VERSION, moduleDescriptor.getVersion());
+ moduleNode.setProperty(Property.JCR_TITLE,
+ moduleDescriptor.getTitle());
+ moduleNode.setProperty(Property.JCR_DESCRIPTION,
+ moduleDescriptor.getDescription());
+ moduleNode.setProperty(SLC_STARTED, moduleDescriptor.getStarted());
+ session.save();
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ throw new SlcException("Cannot sync module " + moduleDescriptor, e);
+ }
+ }
+
+ public synchronized void executionModuleRemoved(
+ ModuleDescriptor moduleDescriptor) {
+ try {
+ String moduleName = SlcJcrUtils.getModuleNodeName(moduleDescriptor);
+ Node agentNode = session.getNode(agent.getNodePath());
+ if (agentNode.hasNode(moduleName)) {
+ Node moduleNode = agentNode.getNode(moduleName);
+ for (NodeIterator nit = moduleNode.getNodes(); nit.hasNext();) {
+ nit.nextNode().remove();
+ }
+ moduleNode.setProperty(SLC_STARTED, false);
+ }
+ session.save();
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ throw new SlcException("Cannot remove module " + moduleDescriptor,
+ e);
+ }
+ }
+
+ public synchronized void executionFlowAdded(ModuleDescriptor module,
+ ExecutionFlowDescriptor efd) {
+ try {
+ Node agentNode = session.getNode(agent.getNodePath());
+ Node moduleNode = agentNode.getNode(SlcJcrUtils
+ .getModuleNodeName(module));
+ String relativePath = getExecutionFlowRelativePath(efd);
+ @SuppressWarnings("unused")
+ Node flowNode = null;
+ if (!moduleNode.hasNode(relativePath)) {
+ flowNode = createExecutionFlowNode(moduleNode, relativePath,
+ efd);
+ session.save();
+ } else {
+ flowNode = moduleNode.getNode(relativePath);
+ }
+
+ if (log.isTraceEnabled())
+ log.trace("Flow " + efd + " added to JCR");
+ } catch (RepositoryException e) {
+ JcrUtils.discardQuietly(session);
+ throw new SlcException("Cannot add flow " + efd + " from module "
+ + module, e);
+ }
+
+ }
+
+ protected Node createExecutionFlowNode(Node moduleNode,
+ String relativePath, ExecutionFlowDescriptor efd)
+ throws RepositoryException {
+ Node flowNode = null;
+ List<String> pathTokens = Arrays.asList(relativePath.split("/"));
+
+ Iterator<String> names = pathTokens.iterator();
+ // create intermediary paths
+ Node currNode = moduleNode;
+ while (names.hasNext()) {
+ String name = names.next();
+ if (currNode.hasNode(name))
+ currNode = currNode.getNode(name);
+ else {
+ if (names.hasNext())
+ currNode = currNode.addNode(name);
+ else
+ flowNode = currNode.addNode(name,
+ SlcTypes.SLC_EXECUTION_FLOW);
+ }
+ }
+
+ // name, description
+ flowNode.setProperty(SLC_NAME, efd.getName());
+ String endName = pathTokens.get(pathTokens.size() - 1);
+ flowNode.setProperty(Property.JCR_TITLE, endName);
+ if (efd.getDescription() != null
+ && !efd.getDescription().trim().equals("")) {
+ flowNode.setProperty(Property.JCR_DESCRIPTION, efd.getDescription());
+ } else {
+ flowNode.setProperty(Property.JCR_DESCRIPTION, endName);
+ }
+
+ // execution spec
+ ExecutionSpec executionSpec = efd.getExecutionSpec();
+ String esName = executionSpec.getName();
+ if (esName == null || esName.equals(ExecutionSpec.INTERNAL_NAME)
+ || esName.contains("#")/* automatically generated bean name */) {
+ // internal spec node
+ mapExecutionSpec(flowNode, executionSpec);
+ } else {
+ // reference spec node
+ Node executionSpecsNode = moduleNode.hasNode(SLC_EXECUTION_SPECS) ? moduleNode
+ .getNode(SLC_EXECUTION_SPECS) : moduleNode
+ .addNode(SLC_EXECUTION_SPECS);
+ Node executionSpecNode = executionSpecsNode.addNode(esName,
+ SlcTypes.SLC_EXECUTION_SPEC);
+ executionSpecNode.setProperty(SLC_NAME, esName);
+ executionSpecNode.setProperty(Property.JCR_TITLE, esName);
+ if (executionSpec.getDescription() != null
+ && !executionSpec.getDescription().trim().equals(""))
+ executionSpecNode.setProperty(Property.JCR_DESCRIPTION,
+ executionSpec.getDescription());
+ mapExecutionSpec(executionSpecNode, executionSpec);
+ flowNode.setProperty(SLC_SPEC, executionSpecNode);
+ }
+
+ // flow values
+ for (String attr : efd.getValues().keySet()) {
+ ExecutionSpecAttribute esa = executionSpec.getAttributes()
+ .get(attr);
+ if (esa instanceof PrimitiveSpecAttribute) {
+ PrimitiveSpecAttribute psa = (PrimitiveSpecAttribute) esa;
+ // if spec reference there will be no node at this stage
+ Node valueNode = JcrUtils.getOrAdd(flowNode, attr);
+ valueNode.setProperty(SLC_TYPE, psa.getType());
+ SlcJcrUtils.setPrimitiveAsProperty(valueNode, SLC_VALUE,
+ (PrimitiveValue) efd.getValues().get(attr));
+ }
+ }
+
+ return flowNode;
+ }
+
+ /**
+ * Base can be either an execution spec node, or an execution flow node (in
+ * case the execution spec is internal)
+ */
+ protected void mapExecutionSpec(Node baseNode, ExecutionSpec executionSpec)
+ throws RepositoryException {
+ for (String attrName : executionSpec.getAttributes().keySet()) {
+ ExecutionSpecAttribute esa = executionSpec.getAttributes().get(
+ attrName);
+ Node attrNode = baseNode.addNode(attrName);
+ // booleans
+ attrNode.addMixin(SlcTypes.SLC_EXECUTION_SPEC_ATTRIBUTE);
+ attrNode.setProperty(SLC_IS_IMMUTABLE, esa.getIsImmutable());
+ attrNode.setProperty(SLC_IS_CONSTANT, esa.getIsConstant());
+ attrNode.setProperty(SLC_IS_HIDDEN, esa.getIsHidden());
+
+ if (esa instanceof PrimitiveSpecAttribute) {
+ attrNode.addMixin(SlcTypes.SLC_PRIMITIVE_SPEC_ATTRIBUTE);
+ PrimitiveSpecAttribute psa = (PrimitiveSpecAttribute) esa;
+ SlcJcrUtils.setPrimitiveAsProperty(attrNode, SLC_VALUE, psa);
+ attrNode.setProperty(SLC_TYPE, psa.getType());
+ } else if (esa instanceof RefSpecAttribute) {
+ attrNode.addMixin(SlcTypes.SLC_REF_SPEC_ATTRIBUTE);
+ RefSpecAttribute rsa = (RefSpecAttribute) esa;
+ attrNode.setProperty(SLC_TYPE, rsa.getTargetClassName());
+ Object value = rsa.getValue();
+ if (rsa.getChoices() != null) {
+ Integer index = null;
+ int count = 0;
+ for (RefValueChoice choice : rsa.getChoices()) {
+ String name = choice.getName();
+ if (value != null && name.equals(value.toString()))
+ index = count;
+ Node choiceNode = attrNode.addNode(choice.getName());
+ choiceNode.addMixin(NodeType.MIX_TITLE);
+ choiceNode.setProperty(Property.JCR_TITLE,
+ choice.getName());
+ if (choice.getDescription() != null
+ && !choice.getDescription().trim().equals(""))
+ choiceNode.setProperty(Property.JCR_DESCRIPTION,
+ choice.getDescription());
+ count++;
+ }
+
+ if (index != null)
+ attrNode.setProperty(SLC_VALUE, index);
+ }
+ }
+ }
+ }
+
+ public synchronized void executionFlowRemoved(ModuleDescriptor module,
+ ExecutionFlowDescriptor executionFlow) {
+ try {
+ Node agentNode = session.getNode(agent.getNodePath());
+ Node moduleNode = agentNode.getNode(SlcJcrUtils
+ .getModuleNodeName(module));
+ String relativePath = getExecutionFlowRelativePath(executionFlow);
+ if (moduleNode.hasNode(relativePath))
+ moduleNode.getNode(relativePath).remove();
+ agentNode.getSession().save();
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot remove flow " + executionFlow
+ + " from module " + module, e);
+ }
+ }
+
+ /*
+ * UTILITIES
+ */
+ /** @return the relative path, never starts with '/' */
+ @SuppressWarnings("deprecation")
+ protected String getExecutionFlowRelativePath(
+ ExecutionFlowDescriptor executionFlow) {
+ String relativePath = executionFlow.getPath() == null ? executionFlow
+ .getName() : executionFlow.getPath() + '/'
+ + executionFlow.getName();
+ // we assume that it is more than one char long
+ if (relativePath.charAt(0) == '/')
+ relativePath = relativePath.substring(1);
+ // FIXME quick hack to avoid duplicate '/'
+ relativePath = relativePath.replaceAll("//", "/");
+ return relativePath;
+ }
+
+ /*
+ * BEAN
+ */
+ public void setAgent(JcrAgent agent) {
+ this.agent = agent;
+ }
+
+ public void setRepository(Repository repository) {
+ this.repository = repository;
+ }
+
+ public void setModulesManager(ExecutionModulesManager modulesManager) {
+ this.modulesManager = modulesManager;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.jcr.execution;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.Property;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.jcr.JcrUtils;
+import org.argeo.slc.NameVersion;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.SlcNames;
+import org.argeo.slc.SlcTypes;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
+import org.argeo.slc.execution.RealizedFlow;
+import org.argeo.slc.jcr.SlcJcrUtils;
+import org.argeo.slc.runtime.ProcessThread;
+
+/** Execution process implementation based on a JCR node. */
+public class JcrExecutionProcess implements ExecutionProcess, SlcNames {
+ private final static Log log = LogFactory.getLog(JcrExecutionProcess.class);
+ private final Node node;
+
+ private Long nextLogLine = 1l;
+
+ public JcrExecutionProcess(Node node) {
+ this.node = node;
+ }
+
+ public synchronized String getUuid() {
+ try {
+ return node.getProperty(SLC_UUID).getString();
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot get uuid for " + node, e);
+ }
+ }
+
+ public synchronized String getStatus() {
+ try {
+ return node.getProperty(SLC_STATUS).getString();
+ } catch (RepositoryException e) {
+ log.error("Cannot get status: " + e);
+ // we should re-throw exception because this information can
+ // probably used for monitoring in case there are already unexpected
+ // exceptions
+ return UNKOWN;
+ }
+ }
+
+ public synchronized void setStatus(String status) {
+ try {
+ node.setProperty(SLC_STATUS, status);
+ // last modified properties needs to be manually updated
+ // see https://issues.apache.org/jira/browse/JCR-2233
+ JcrUtils.updateLastModified(node);
+ node.getSession().save();
+ } catch (RepositoryException e) {
+ JcrUtils.discardUnderlyingSessionQuietly(node);
+ // we should re-throw exception because this information can
+ // probably used for monitoring in case there are already unexpected
+ // exceptions
+ log.error("Cannot set status " + status + ": " + e);
+ }
+ }
+
+ /**
+ * Synchronized in order to make sure that there is no concurrent
+ * modification of {@link #nextLogLine}.
+ */
+ public synchronized void addSteps(List<ExecutionStep> steps) {
+ try {
+ steps: for (ExecutionStep step : steps) {
+ String type;
+ if (step.getType().equals(ExecutionStep.TRACE))
+ type = SlcTypes.SLC_LOG_TRACE;
+ else if (step.getType().equals(ExecutionStep.DEBUG))
+ type = SlcTypes.SLC_LOG_DEBUG;
+ else if (step.getType().equals(ExecutionStep.INFO))
+ type = SlcTypes.SLC_LOG_INFO;
+ else if (step.getType().equals(ExecutionStep.WARNING))
+ type = SlcTypes.SLC_LOG_WARNING;
+ else if (step.getType().equals(ExecutionStep.ERROR))
+ type = SlcTypes.SLC_LOG_ERROR;
+ else
+ // skip
+ continue steps;
+
+ String relPath = SLC_LOG + '/'
+ + step.getThread().replace('/', '_') + '/'
+ + step.getLocation().replace('.', '/');
+ String path = node.getPath() + '/' + relPath;
+ // clean special character
+ // TODO factorize in JcrUtils
+ path = path.replace('@', '_');
+
+ Node location = JcrUtils.mkdirs(node.getSession(), path);
+ Node logEntry = location.addNode(Long.toString(nextLogLine),
+ type);
+ logEntry.setProperty(SLC_MESSAGE, step.getLog());
+ Calendar calendar = new GregorianCalendar();
+ calendar.setTime(step.getTimestamp());
+ logEntry.setProperty(SLC_TIMESTAMP, calendar);
+
+ // System.out.println("Logged " + logEntry.getPath());
+
+ nextLogLine++;
+ }
+
+ // last modified properties needs to be manually updated
+ // see https://issues.apache.org/jira/browse/JCR-2233
+ JcrUtils.updateLastModified(node);
+
+ node.getSession().save();
+ } catch (Exception e) {
+ JcrUtils.discardUnderlyingSessionQuietly(node);
+ e.printStackTrace();
+ }
+ }
+
+ // public Node getNode() {
+ // return node;
+ // }
+
+ public List<RealizedFlow> getRealizedFlows() {
+ try {
+ List<RealizedFlow> realizedFlows = new ArrayList<RealizedFlow>();
+ Node rootRealizedFlowNode = node.getNode(SLC_FLOW);
+ // we just manage one level for the time being
+ NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
+ while (nit.hasNext()) {
+ Node realizedFlowNode = nit.nextNode();
+
+ if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
+ String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
+ .getProperty(Property.JCR_PATH).getString();
+ NameVersion moduleNameVersion = SlcJcrUtils
+ .moduleNameVersion(flowPath);
+ ((ProcessThread) Thread.currentThread())
+ .getExecutionModulesManager().start(
+ moduleNameVersion);
+ }
+
+ RealizedFlow realizedFlow = new JcrRealizedFlow(
+ realizedFlowNode);
+ if (realizedFlow != null)
+ realizedFlows.add(realizedFlow);
+ }
+ return realizedFlows;
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot get realized flows", e);
+ }
+ }
+
+ public String getNodePath() {
+ try {
+ return node.getPath();
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot get process node path for " + node,
+ e);
+ }
+ }
+
+ public Repository getRepository() {
+ try {
+ return node.getSession().getRepository();
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot get process JCR repository for "
+ + node, e);
+ }
+ }
+}
--- /dev/null
+/*
+
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.jcr.execution;
+
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import org.argeo.jcr.JcrUtils;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.SlcNames;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.RealizedFlow;
+import org.argeo.slc.runtime.ProcessThread;
+
+/** Where the actual execution takes place */
+public class JcrProcessThread extends ProcessThread implements SlcNames {
+
+ public JcrProcessThread(ThreadGroup processesThreadGroup,
+ ExecutionModulesManager executionModulesManager,
+ JcrExecutionProcess process) {
+ super(processesThreadGroup, executionModulesManager, process);
+ }
+
+ /** Overridden in order to set progress status on realized flow nodes. */
+ @Override
+ protected void process() throws InterruptedException {
+ Session session = null;
+ if (getProcess() instanceof JcrExecutionProcess)
+ try {
+ session = ((JcrExecutionProcess) getProcess()).getRepository()
+ .login();
+
+ List<RealizedFlow> realizedFlows = getProcess()
+ .getRealizedFlows();
+ for (RealizedFlow realizedFlow : realizedFlows) {
+ Node realizedFlowNode = session
+ .getNode(((JcrRealizedFlow) realizedFlow).getPath());
+ setFlowStatus(realizedFlowNode, ExecutionProcess.RUNNING);
+
+ try {
+ //
+ // EXECUTE THE FLOW
+ //
+ execute(realizedFlow, true);
+
+ setFlowStatus(realizedFlowNode,
+ ExecutionProcess.COMPLETED);
+ } catch (RepositoryException e) {
+ throw e;
+ } catch (InterruptedException e) {
+ setFlowStatus(realizedFlowNode, ExecutionProcess.KILLED);
+ throw e;
+ } catch (RuntimeException e) {
+ setFlowStatus(realizedFlowNode, ExecutionProcess.ERROR);
+ throw e;
+ }
+ }
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot process "
+ + getJcrExecutionProcess().getNodePath(), e);
+ } finally {
+ JcrUtils.logoutQuietly(session);
+ }
+ else
+ super.process();
+ }
+
+ protected void setFlowStatus(Node realizedFlowNode, String status)
+ throws RepositoryException {
+ realizedFlowNode.setProperty(SLC_STATUS, status);
+ realizedFlowNode.getSession().save();
+ }
+
+ protected JcrExecutionProcess getJcrExecutionProcess() {
+ return (JcrExecutionProcess) getProcess();
+ }
+}
--- /dev/null
+package org.argeo.slc.jcr.execution;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.Property;
+import javax.jcr.RepositoryException;
+
+import org.argeo.slc.SlcException;
+import org.argeo.slc.SlcNames;
+import org.argeo.slc.SlcTypes;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.RealizedFlow;
+import org.argeo.slc.execution.RefSpecAttribute;
+import org.argeo.slc.jcr.SlcJcrUtils;
+import org.argeo.slc.primitive.PrimitiveSpecAttribute;
+import org.argeo.slc.primitive.PrimitiveUtils;
+import org.argeo.slc.runtime.DefaultExecutionSpec;
+
+public class JcrRealizedFlow extends RealizedFlow implements SlcNames {
+ private static final long serialVersionUID = -3709453850260712001L;
+ private String path;
+
+ public JcrRealizedFlow(Node node) {
+ try {
+ this.path = node.getPath();
+ loadFromNode(node);
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot initialize from " + node, e);
+ }
+ }
+
+ protected void loadFromNode(Node realizedFlowNode) throws RepositoryException {
+ if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
+ String flowPath = realizedFlowNode.getNode(SLC_ADDRESS).getProperty(Property.JCR_PATH).getString();
+ // TODO: convert to local path if remote
+ // FIXME start related module
+ Node flowNode = realizedFlowNode.getSession().getNode(flowPath);
+ String flowName = flowNode.getProperty(SLC_NAME).getString();
+ String description = null;
+ if (flowNode.hasProperty(Property.JCR_DESCRIPTION))
+ description = flowNode.getProperty(Property.JCR_DESCRIPTION).getString();
+
+ Node executionModuleNode = flowNode.getSession().getNode(SlcJcrUtils.modulePath(flowPath));
+ String executionModuleName = executionModuleNode.getProperty(SLC_NAME).getString();
+ String executionModuleVersion = executionModuleNode.getProperty(SLC_VERSION).getString();
+
+ RealizedFlow realizedFlow = this;
+ realizedFlow.setModuleName(executionModuleName);
+ realizedFlow.setModuleVersion(executionModuleVersion);
+
+ // retrieve execution spec
+ DefaultExecutionSpec executionSpec = new DefaultExecutionSpec();
+ Map<String, ExecutionSpecAttribute> attrs = readExecutionSpecAttributes(realizedFlowNode);
+ executionSpec.setAttributes(attrs);
+
+ // set execution spec name
+ if (flowNode.hasProperty(SlcNames.SLC_SPEC)) {
+ Node executionSpecNode = flowNode.getProperty(SLC_SPEC).getNode();
+ executionSpec.setName(executionSpecNode.getProperty(SLC_NAME).getString());
+ }
+
+ // explicitly retrieve values
+ Map<String, Object> values = new HashMap<String, Object>();
+ for (String attrName : attrs.keySet()) {
+ ExecutionSpecAttribute attr = attrs.get(attrName);
+ Object value = attr.getValue();
+ values.put(attrName, value);
+ }
+
+ ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(flowName, description, values, executionSpec);
+ realizedFlow.setFlowDescriptor(efd);
+ } else {
+ throw new SlcException("Unsupported realized flow " + realizedFlowNode);
+ }
+ }
+
+ protected Map<String, ExecutionSpecAttribute> readExecutionSpecAttributes(Node node) {
+ try {
+ Map<String, ExecutionSpecAttribute> attrs = new HashMap<String, ExecutionSpecAttribute>();
+ for (NodeIterator nit = node.getNodes(); nit.hasNext();) {
+ Node specAttrNode = nit.nextNode();
+ if (specAttrNode.isNodeType(SlcTypes.SLC_PRIMITIVE_SPEC_ATTRIBUTE)) {
+ String type = specAttrNode.getProperty(SLC_TYPE).getString();
+ Object value = null;
+ if (specAttrNode.hasProperty(SLC_VALUE)) {
+ String valueStr = specAttrNode.getProperty(SLC_VALUE).getString();
+ value = PrimitiveUtils.convert(type, valueStr);
+ }
+ PrimitiveSpecAttribute specAttr = new PrimitiveSpecAttribute(type, value);
+ attrs.put(specAttrNode.getName(), specAttr);
+ } else if (specAttrNode.isNodeType(SlcTypes.SLC_REF_SPEC_ATTRIBUTE)) {
+ if (!specAttrNode.hasProperty(SLC_VALUE)) {
+ continue;
+ }
+ Integer value = (int) specAttrNode.getProperty(SLC_VALUE).getLong();
+ RefSpecAttribute specAttr = new RefSpecAttribute();
+ NodeIterator children = specAttrNode.getNodes();
+ int index = 0;
+ String id = null;
+ while (children.hasNext()) {
+ Node child = children.nextNode();
+ if (index == value)
+ id = child.getName();
+ index++;
+ }
+ specAttr.setValue(id);
+ attrs.put(specAttrNode.getName(), specAttr);
+ }
+ // throw new SlcException("Unsupported spec attribute "
+ // + specAttrNode);
+ }
+ return attrs;
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot read spec attributes from " + node, e);
+ }
+ }
+
+ public String getPath() {
+ return path;
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11"/>
+ <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
--- /dev/null
+/bin/
+/target/
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.argeo.slc.runtime</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ManifestBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.SchemaBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.pde.PluginNature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
--- /dev/null
+/MANIFEST.MF
--- /dev/null
+Import-Package: org.argeo.slc.deploy,\
+*
\ No newline at end of file
--- /dev/null
+source.. = src/
+output.. = bin/
+bin.includes = META-INF/,\
+ .
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>argeo-slc</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <artifactId>org.argeo.slc.runtime</artifactId>
+ <name>SLC Runtime</name>
+ <dependencies>
+ <!-- SLC -->
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.api</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionContext;
+import org.argeo.slc.execution.ExecutionFlow;
+import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.RealizedFlow;
+
+/** Provides the base feature of an execution module manager. */
+public abstract class AbstractExecutionModulesManager implements
+ ExecutionModulesManager {
+ private final static Log log = LogFactory
+ .getLog(AbstractExecutionModulesManager.class);
+
+ // private List<FilteredNotifier> filteredNotifiers = Collections
+ // .synchronizedList(new ArrayList<FilteredNotifier>());
+
+ protected abstract ExecutionFlow findExecutionFlow(String moduleName,
+ String moduleVersion, String flowName);
+
+ protected abstract ExecutionContext findExecutionContext(String moduleName,
+ String moduleVersion);
+
+ protected abstract ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
+ String moduleName, String moduleVersion);
+
+ public void execute(RealizedFlow realizedFlow) {
+ if (log.isTraceEnabled())
+ log.trace("Executing " + realizedFlow);
+
+ String moduleName = realizedFlow.getModuleName();
+ String moduleVersion = realizedFlow.getModuleVersion();
+
+ Map<? extends String, ? extends Object> variablesToAdd = getExecutionFlowDescriptorConverter(
+ moduleName, moduleVersion).convertValues(
+ realizedFlow.getFlowDescriptor());
+ ExecutionContext executionContext = findExecutionContext(moduleName,
+ moduleVersion);
+ for (String key : variablesToAdd.keySet())
+ executionContext.setVariable(key, variablesToAdd.get(key));
+
+ ExecutionFlow flow = findExecutionFlow(moduleName, moduleVersion,
+ realizedFlow.getFlowDescriptor().getName());
+
+ //
+ // Actually runs the flow, IN THIS THREAD
+ //
+ executionContext.beforeFlow(flow);
+ try {
+ flow.run();
+ } finally {
+ executionContext.afterFlow(flow);
+ }
+ //
+ //
+ //
+ }
+
+ // public void dispatchUpdateStatus(ExecutionProcess process,
+ // String oldStatus, String newStatus) {
+ // // filtered notifiers
+ // for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
+ // .hasNext();) {
+ // FilteredNotifier filteredNotifier = it.next();
+ // if (filteredNotifier.receiveFrom(process))
+ // filteredNotifier.getNotifier().updateStatus(process, oldStatus,
+ // newStatus);
+ // }
+ //
+ // }
+
+ // public void dispatchAddSteps(ExecutionProcess process,
+ // List<ExecutionStep> steps) {
+ // process.addSteps(steps);
+ // for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
+ // .hasNext();) {
+ // FilteredNotifier filteredNotifier = it.next();
+ // if (filteredNotifier.receiveFrom(process))
+ // filteredNotifier.getNotifier().addSteps(process, steps);
+ // }
+ // }
+
+ // public void registerProcessNotifier(ExecutionProcessNotifier notifier,
+ // Map<String, String> properties) {
+ // filteredNotifiers.add(new FilteredNotifier(notifier, properties));
+ // }
+ //
+ // public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+ // Map<String, String> properties) {
+ // filteredNotifiers.remove(notifier);
+ // }
+
+ // protected class FilteredNotifier {
+ // private final ExecutionProcessNotifier notifier;
+ // private final String processId;
+ //
+ // public FilteredNotifier(ExecutionProcessNotifier notifier,
+ // Map<String, String> properties) {
+ // super();
+ // this.notifier = notifier;
+ // if (properties == null)
+ // properties = new HashMap<String, String>();
+ // if (properties.containsKey(SLC_PROCESS_ID))
+ // processId = properties.get(SLC_PROCESS_ID);
+ // else
+ // processId = null;
+ // }
+ //
+ // /**
+ // * Whether event from this process should be received by this listener.
+ // */
+ // public Boolean receiveFrom(ExecutionProcess process) {
+ // if (processId != null)
+ // if (process.getUuid().equals(processId))
+ // return true;
+ // else
+ // return false;
+ // return true;
+ // }
+ //
+ // @Override
+ // public int hashCode() {
+ // return notifier.hashCode();
+ // }
+ //
+ // @Override
+ // public boolean equals(Object obj) {
+ // if (obj instanceof FilteredNotifier) {
+ // FilteredNotifier fn = (FilteredNotifier) obj;
+ // return notifier.equals(fn.notifier);
+ // } else if (obj instanceof ExecutionProcessNotifier) {
+ // ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj;
+ // return notifier.equals(epn);
+ // } else
+ // return false;
+ // }
+ //
+ // public ExecutionProcessNotifier getNotifier() {
+ // return notifier;
+ // }
+ //
+ // }
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.argeo.slc.DefaultNameVersion;
+import org.argeo.slc.NameVersion;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.SlcAgent;
+
+/** Implements the base methods of an SLC agent. */
+public class DefaultAgent implements SlcAgent {
+ // private final static Log log = LogFactory.getLog(DefaultAgent.class);
+ /** UTF-8 charset for encoding. */
+ private final static String UTF8 = "UTF-8";
+
+ private String agentUuid = null;
+ private ExecutionModulesManager modulesManager;
+
+ private ThreadGroup processesThreadGroup;
+ private Map<String, ProcessThread> runningProcesses = Collections
+ .synchronizedMap(new HashMap<String, ProcessThread>());
+
+ private String defaultModulePrefix = null;
+
+ /*
+ * LIFECYCLE
+ */
+ /** Initialization */
+ public void init() {
+ agentUuid = initAgentUuid();
+ processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
+ + agentUuid);
+ }
+
+ /** Clean up (needs to be called by overriding method) */
+ public void destroy() {
+ }
+
+ /**
+ * Called during initialization in order to determines the agent UUID. To be
+ * overridden. By default creates a new one per instance.
+ */
+ protected String initAgentUuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ /*
+ * SLC AGENT
+ */
+ public void process(ExecutionProcess process) {
+ ProcessThread processThread = createProcessThread(processesThreadGroup,
+ modulesManager, process);
+ processThread.start();
+ runningProcesses.put(process.getUuid(), processThread);
+
+ // clean up old processes
+ Iterator<ProcessThread> it = runningProcesses.values().iterator();
+ while (it.hasNext()) {
+ ProcessThread pThread = it.next();
+ if (!pThread.isAlive())
+ it.remove();
+ }
+ }
+
+ public String process(List<URI> uris) {
+ DefaultProcess process = new DefaultProcess();
+ for (URI uri : uris) {
+ String[] path = uri.getPath().split("/");
+ if (path.length < 3)
+ throw new SlcException("Badly formatted URI: " + uri);
+ NameVersion nameVersion = new DefaultNameVersion(path[1]);
+ StringBuilder flow = new StringBuilder();
+ for (int i = 2; i < path.length; i++)
+ flow.append('/').append(path[i]);
+
+ Map<String, Object> values = getQueryMap(uri.getQuery());
+ // Get execution module descriptor
+ ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+ nameVersion.getName(), nameVersion.getVersion());
+ process.getRealizedFlows().add(
+ emd.asRealizedFlow(flow.toString(), values));
+ }
+ process(process);
+ return process.getUuid();
+ }
+
+ public void kill(String processUuid) {
+ if (runningProcesses.containsKey(processUuid)) {
+ runningProcesses.get(processUuid).interrupt();
+ } else {
+ // assume is finished
+ }
+ }
+
+ public void waitFor(String processUuid, Long millis) {
+ if (runningProcesses.containsKey(processUuid)) {
+ try {
+ if (millis != null)
+ runningProcesses.get(processUuid).join(millis);
+ else
+ runningProcesses.get(processUuid).join();
+ } catch (InterruptedException e) {
+ // silent
+ }
+ } else {
+ // assume is finished
+ }
+ }
+
+ /** Creates the thread which will coordinate the execution for this agent. */
+ protected ProcessThread createProcessThread(
+ ThreadGroup processesThreadGroup,
+ ExecutionModulesManager modulesManager, ExecutionProcess process) {
+ ProcessThread processThread = new ProcessThread(processesThreadGroup,
+ modulesManager, process);
+ return processThread;
+ }
+
+ public ExecutionModuleDescriptor getExecutionModuleDescriptor(
+ String moduleName, String moduleVersion) {
+ // Get execution module descriptor
+ ExecutionModuleDescriptor emd;
+ try {
+ modulesManager
+ .start(new DefaultNameVersion(moduleName, moduleVersion));
+ emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+ moduleVersion);
+ } catch (SlcException e) {
+ if (defaultModulePrefix != null) {
+ moduleName = defaultModulePrefix + "." + moduleName;
+ modulesManager.start(new DefaultNameVersion(moduleName,
+ moduleVersion));
+ emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+ moduleVersion);
+ } else
+ throw e;
+ }
+ return emd;
+ }
+
+ public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
+ return modulesManager.listExecutionModules();
+ }
+
+ public boolean ping() {
+ return true;
+ }
+
+ /*
+ * UTILITIES
+ */
+ /**
+ * @param query
+ * can be null
+ */
+ static Map<String, Object> getQueryMap(String query) {
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ if (query == null)
+ return map;
+ String[] params = query.split("&");
+ for (String param : params) {
+ String[] arr = param.split("=");
+ String name = arr[0];
+ Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
+ try {
+ map.put(URLDecoder.decode(name, UTF8),
+ URLDecoder.decode(value.toString(), UTF8));
+ } catch (UnsupportedEncodingException e) {
+ throw new SlcException("Cannot decode '" + param + "'", e);
+ }
+ }
+ return map;
+ }
+
+ /*
+ * BEAN
+ */
+ public void setModulesManager(ExecutionModulesManager modulesManager) {
+ this.modulesManager = modulesManager;
+ }
+
+ public void setDefaultModulePrefix(String defaultModulePrefix) {
+ this.defaultModulePrefix = defaultModulePrefix;
+ }
+
+ public String getAgentUuid() {
+ return agentUuid;
+ }
+
+ @Override
+ public String toString() {
+ return "Agent #" + getAgentUuid();
+ }
+}
--- /dev/null
+package org.argeo.slc.runtime;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.DefaultNameVersion;
+import org.argeo.slc.NameVersion;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.execution.ExecutionSpec;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.SlcAgent;
+import org.argeo.slc.execution.SlcAgentCli;
+
+/**
+ * Authenticates thread and executes synchronously a command line execution.
+ * Reference implementation of args to URIs algorithm.
+ */
+public class DefaultAgentCli implements SlcAgentCli {
+ private final static Log log = LogFactory.getLog(DefaultAgentCli.class);
+
+ private final static String UTF8 = "UTF-8";
+ private SlcAgent agent;
+// private AuthenticationManager authenticationManager;
+
+ private Long timeout = 24 * 60 * 60 * 1000l;
+
+ public String process(String[] args) {
+// if (SecurityContextHolder.getContext().getAuthentication() == null) {
+// OsAuthenticationToken oat = new OsAuthenticationToken();
+// Authentication authentication = authenticationManager
+// .authenticate(oat);
+// SecurityContextHolder.getContext()
+// .setAuthentication(authentication);
+// }
+
+ if (args.length > 0 && args[0].equals("help")) {
+ StringBuilder buf = new StringBuilder();
+ help(args, buf);
+ log.info("\n" + buf);
+ return buf.toString();
+ } else {
+ List<URI> uris = asURIs(args);
+ String processUuid = agent.process(uris);
+ agent.waitFor(processUuid, timeout);
+ return processUuid;
+ }
+ }
+
+ protected void help(String[] rawArgs, StringBuilder buf) {
+ String[] args = Arrays.copyOfRange(rawArgs, 1, rawArgs.length);
+ if (args.length == 0) {// modules
+ for (ExecutionModuleDescriptor emd : agent
+ .listExecutionModuleDescriptors()) {
+ appendModule(emd, buf);
+ }
+ } else if (args.length == 1 && !args[0].contains("/")) {// single module
+ NameVersion nameVersion = new DefaultNameVersion(args[0]);
+ ExecutionModuleDescriptor emd = agent.getExecutionModuleDescriptor(
+ nameVersion.getName(), nameVersion.getVersion());
+ appendModule(emd, buf);
+
+ // flows
+ for (ExecutionFlowDescriptor efd : emd.getExecutionFlows()) {
+ buf.append(" ").append(efd.getName());
+ if (efd.getDescription() != null
+ && !efd.getDescription().trim().equals(""))
+ buf.append(" : ").append(" ").append(efd.getDescription());
+ buf.append('\n');
+ }
+ return;
+ } else {
+ List<URI> uris = asURIs(args);
+ for (URI uri : uris) {
+ appendUriHelp(uri, buf);
+ }
+ }
+ }
+
+ protected void appendUriHelp(URI uri, StringBuilder buf) {
+ String[] path = uri.getPath().split("/");
+ NameVersion nameVersion = new DefaultNameVersion(path[1]);
+ ExecutionModuleDescriptor emd = agent.getExecutionModuleDescriptor(
+ nameVersion.getName(), nameVersion.getVersion());
+
+ StringBuilder flow = new StringBuilder();
+ for (int i = 2; i < path.length; i++)
+ flow.append('/').append(path[i]);
+ String flowPath = flow.toString();
+ ExecutionFlowDescriptor efd = findExecutionFlowDescriptor(emd, flowPath);
+ if (efd == null)
+ throw new SlcException("Flow " + uri + " not found");
+
+ appendModule(emd, buf);
+
+ buf.append(" ").append(efd.getName());
+ if (efd.getDescription() != null
+ && !efd.getDescription().trim().equals(""))
+ buf.append(" : ").append(" ").append(efd.getDescription());
+ buf.append('\n');
+ Map<String, Object> values = DefaultAgent.getQueryMap(uri.getQuery());
+ ExecutionSpec spec = efd.getExecutionSpec();
+ for (String attrKey : spec.getAttributes().keySet()) {
+ ExecutionSpecAttribute esa = spec.getAttributes().get(attrKey);
+ buf.append(" --").append(attrKey);
+ if (values.containsKey(attrKey))
+ buf.append(" ").append(values.get(attrKey));
+ if (esa.getValue() != null)
+ buf.append(" (").append(esa.getValue()).append(')');
+ buf.append('\n');
+ }
+ }
+
+ private void appendModule(ExecutionModuleDescriptor emd, StringBuilder buf) {
+ buf.append("# ").append(emd.getName());
+ if (emd.getDescription() != null
+ && !emd.getDescription().trim().equals(""))
+ buf.append(" : ").append(emd.getDescription());
+ if (emd.getVersion() != null)
+ buf.append(" (v").append(emd.getVersion()).append(")");
+ buf.append('\n');
+ }
+
+ public static List<URI> asURIs(String[] args) {
+ try {
+ List<URI> uris = new ArrayList<URI>();
+ List<String> leftOvers = new ArrayList<String>();
+
+ Boolean hasArgs = false;
+ String currKey = null;
+ StringBuilder currUri = null;
+ Iterator<String> argIt = Arrays.asList(args).iterator();
+ while (argIt.hasNext()) {
+ String arg = argIt.next();
+ if (!arg.startsWith("-")) {
+ if (currKey != null) {// value
+ currUri.append(URLEncoder.encode(arg, UTF8));
+ currKey = null;
+ } else { // module
+ if (currUri != null) {
+ uris.add(new URI(currUri.toString()));
+ }
+ currUri = new StringBuilder("flow:");
+
+ String currModule = arg;
+ currUri.append('/').append(currModule);
+ if (!arg.contains("/")) {
+ // flow path not in arg go to next arg
+ if (!argIt.hasNext())
+ throw new SlcException("No flow found");
+ String currFlow = argIt.next();
+ if (!currFlow.startsWith("/"))
+ currFlow = "/" + currFlow;
+ currUri.append(currFlow);
+ }
+ }
+ } else {
+ if (currUri == null) {// first args
+ leftOvers.add(arg);
+ } else {
+ String key;
+ if (arg.startsWith("--"))
+ key = arg.substring(2);
+ else if (arg.startsWith("-"))
+ key = arg.substring(1);
+ else {
+ throw new SlcException("Cannot intepret key: "
+ + arg);
+ }
+
+ if (!hasArgs) {
+ currUri.append('?');
+ hasArgs = true;
+ } else {
+ currUri.append('&');
+ }
+
+ // deal with boolean keys
+ if (currKey != null) {// value
+ currUri.append(URLEncoder.encode("true", UTF8));
+ currKey = null;
+ }
+
+ currKey = key;
+ currUri.append(URLEncoder.encode(key, UTF8))
+ .append('=');
+ }
+ }
+ }
+ if (currUri != null)
+ uris.add(new URI(currUri.toString()));
+ return uris;
+ } catch (Exception e) {
+ throw new SlcException("Cannot convert " + Arrays.toString(args)
+ + " to flow URI", e);
+ }
+ }
+
+ private ExecutionFlowDescriptor findExecutionFlowDescriptor(
+ ExecutionModuleDescriptor emd, String flowPath) {
+ ExecutionFlowDescriptor flowDescriptor = null;
+ for (ExecutionFlowDescriptor efd : emd.getExecutionFlows()) {
+ String name = efd.getName();
+ // normalize name as flow path
+ if (!name.startsWith("/"))
+ name = "/" + name;
+ if (name.endsWith("/"))
+ name = name.substring(0, name.length() - 1);
+ if (name.equals(flowPath)) {
+ flowDescriptor = efd;
+ break;
+ }
+ }
+ return flowDescriptor;
+ }
+
+ public void setAgent(SlcAgent agent) {
+ this.agent = agent;
+ }
+
+// public void setAuthenticationManager(
+// AuthenticationManager authenticationManager) {
+// this.authenticationManager = authenticationManager;
+// }
+
+ public void setTimeout(Long timeout) {
+ this.timeout = timeout;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionContext;
+import org.argeo.slc.execution.ExecutionFlow;
+import org.argeo.slc.execution.ExecutionSpec;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+
+/** Default implementation of an execution flow. */
+public class DefaultExecutionFlow implements ExecutionFlow {
+ private final static Log log = LogFactory.getLog(DefaultExecutionFlow.class);
+
+ private final ExecutionSpec executionSpec;
+ private String name = null;
+ private Map<String, Object> parameters = new HashMap<String, Object>();
+ private List<Runnable> executables = new ArrayList<Runnable>();
+
+ private String path;
+
+ private Boolean failOnError = true;
+
+ // Only needed if stacked execution flows are used
+ private ExecutionContext executionContext = null;
+
+ public DefaultExecutionFlow() {
+ this.executionSpec = new DefaultExecutionSpec();
+ }
+
+ public DefaultExecutionFlow(ExecutionSpec executionSpec) {
+ this.executionSpec = executionSpec;
+ }
+
+ public DefaultExecutionFlow(ExecutionSpec executionSpec, Map<String, Object> parameters) {
+ // be sure to have an execution spec
+ this.executionSpec = (executionSpec == null) ? new DefaultExecutionSpec() : executionSpec;
+
+ // only parameters contained in the executionSpec can be set
+ for (String parameter : parameters.keySet()) {
+ if (!executionSpec.getAttributes().containsKey(parameter)) {
+ throw new SlcException("Parameter " + parameter + " is not defined in the ExecutionSpec");
+ }
+ }
+
+ // set the parameters
+ this.parameters.putAll(parameters);
+
+ // check that all the required parameters are defined
+// MapBindingResult errors = new MapBindingResult(parameters, "execution#"
+// + getName());
+ Map<String, String> errors = new HashMap<>();
+ for (String key : executionSpec.getAttributes().keySet()) {
+ ExecutionSpecAttribute attr = executionSpec.getAttributes().get(key);
+
+ if (attr.getIsImmutable() && !isSetAsParameter(key)) {
+ errors.put(key, "Immutable but not set");
+ break;
+ }
+
+ if (attr.getIsConstant() && !isSetAsParameter(key)) {
+ errors.put(key, "Constant but not set as parameter");
+ break;
+ }
+
+ if (attr.getIsHidden() && !isSetAsParameter(key)) {
+ errors.put(key, "Hidden but not set as parameter");
+ break;
+ }
+ }
+
+ if (!errors.isEmpty())
+ throw new SlcException("Could not prepare execution flow: " + errors.toString());
+
+ }
+
+ public void run() {
+ try {
+ for (Runnable executable : executables) {
+ if (Thread.interrupted()) {
+ log.error("Flow '" + getName() + "' killed before '" + executable + "'");
+ Thread.currentThread().interrupt();
+ return;
+ // throw new ThreadDeath();
+ }
+ this.doExecuteRunnable(executable);
+ }
+ } catch (RuntimeException e) {
+ if (Thread.interrupted()) {
+ log.error("Flow '" + getName() + "' killed while receiving an unrelated exception", e);
+ Thread.currentThread().interrupt();
+ return;
+ // throw new ThreadDeath();
+ }
+ if (failOnError)
+ throw e;
+ else {
+ log.error("Execution flow failed," + " but process did not fail" + " because failOnError property"
+ + " is set to false: " + e);
+ if (log.isTraceEnabled())
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * List sub-runnables that would be executed if run() method would be called.
+ */
+ public Iterator<Runnable> runnables() {
+ return executables.iterator();
+ }
+
+ /**
+ * If there is one and only one runnable wrapped return it, throw an exeception
+ * otherwise.
+ */
+ public Runnable getRunnable() {
+ if (executables.size() == 1)
+ return executables.get(0);
+ else
+ throw new SlcException("There are " + executables.size() + " runnables in flow " + getName());
+ }
+
+ public void doExecuteRunnable(Runnable runnable) {
+ try {
+ if (executionContext != null)
+ if (runnable instanceof ExecutionFlow)
+ executionContext.beforeFlow((ExecutionFlow) runnable);
+ runnable.run();
+ } finally {
+ if (executionContext != null)
+ if (runnable instanceof ExecutionFlow)
+ executionContext.afterFlow((ExecutionFlow) runnable);
+ }
+ }
+
+ public void init() throws Exception {
+ if (path == null) {
+ if (name.charAt(0) == '/') {
+ path = name.substring(0, name.lastIndexOf('/'));
+ }
+ }
+
+ if (path != null) {
+ for (Runnable executable : executables) {
+ if (executable instanceof DefaultExecutionFlow) {
+ // so we don't need to have DefaultExecutionFlow
+ // implementing StructureAware
+ // FIXME: probably has side effects
+ DefaultExecutionFlow flow = (DefaultExecutionFlow) executable;
+ String newPath = path + '/' + flow.getName();
+ flow.setPath(newPath);
+ log.warn(newPath + " was forcibly set on " + flow);
+ }
+ }
+ }
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setExecutables(List<Runnable> executables) {
+ this.executables = executables;
+ }
+
+ public void setParameters(Map<String, Object> attributes) {
+ this.parameters = attributes;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public ExecutionSpec getExecutionSpec() {
+ return executionSpec;
+ }
+
+ public Object getParameter(String parameterName) {
+ // Verify that there is a spec attribute
+ ExecutionSpecAttribute specAttr = null;
+ if (executionSpec.getAttributes().containsKey(parameterName)) {
+ specAttr = executionSpec.getAttributes().get(parameterName);
+ } else {
+ throw new SlcException("Key " + parameterName + " is not defined in the specifications of " + toString());
+ }
+
+ if (parameters.containsKey(parameterName)) {
+ Object paramValue = parameters.get(parameterName);
+ return paramValue;
+ } else {
+ if (specAttr.getValue() != null) {
+ return specAttr.getValue();
+ }
+ }
+ throw new SlcException("Key " + parameterName + " is not set as parameter in " + toString());
+ }
+
+ public Boolean isSetAsParameter(String key) {
+ return parameters.containsKey(key) || (executionSpec.getAttributes().containsKey(key)
+ && executionSpec.getAttributes().get(key).getValue() != null);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuffer("Execution flow ").append(name).toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return ((ExecutionFlow) obj).getName().equals(name);
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Boolean getFailOnError() {
+ return failOnError;
+ }
+
+ public void setFailOnError(Boolean failOnError) {
+ this.failOnError = failOnError;
+ }
+
+ public void setExecutionContext(ExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.argeo.slc.execution.ExecutionSpec;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.RefSpecAttribute;
+import org.argeo.slc.execution.RefValueChoice;
+
+/** Spring based implementation of execution specifications. */
+public class DefaultExecutionSpec implements ExecutionSpec, Serializable {
+ private static final long serialVersionUID = 7042162759380893595L;
+ private String description;
+ private Map<String, ExecutionSpecAttribute> attributes = new HashMap<String, ExecutionSpecAttribute>();
+
+ private String name = INTERNAL_NAME;
+
+ public Map<String, ExecutionSpecAttribute> getAttributes() {
+ return attributes;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public void setAttributes(Map<String, ExecutionSpecAttribute> attributes) {
+ this.attributes = attributes;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * The Spring bean name (only relevant for specs declared has high-level beans)
+ */
+ public String getName() {
+ return name;
+ }
+
+ public boolean equals(Object obj) {
+ return ((ExecutionSpec) obj).getName().equals(name);
+ }
+
+ /**
+ * The Spring bean description (only relevant for specs declared has high-level
+ * beans)
+ */
+ public String getDescription() {
+ return description;
+ }
+
+ /**
+ * Generates a list of ref value choices based on the bean available in the
+ * application ocntext.
+ */
+ protected List<RefValueChoice> buildRefValueChoices(RefSpecAttribute rsa) {
+ List<RefValueChoice> choices = new ArrayList<RefValueChoice>();
+ // FIXME implement something
+ return choices;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionFlow;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.ExecutionStack;
+
+/** Canonical implementation of an execution stack. */
+public class DefaultExecutionStack implements ExecutionStack {
+
+ private final static Log log = LogFactory
+ .getLog(DefaultExecutionStack.class);
+
+ private final Stack<ExecutionFlowRuntime> stack = new Stack<ExecutionFlowRuntime>();
+
+ public synchronized void enterFlow(ExecutionFlow executionFlow) {
+ ExecutionFlowRuntime runtime = new ExecutionFlowRuntime(executionFlow);
+ stack.push(runtime);
+
+ Map<String, ExecutionSpecAttribute> specAttrs = executionFlow
+ .getExecutionSpec().getAttributes();
+ for (String key : specAttrs.keySet()) {
+ if (executionFlow.isSetAsParameter(key)) {
+ runtime.getLocalVariables().put(key,
+ executionFlow.getParameter(key));
+ }
+ }
+ }
+
+ public synchronized String getCurrentStackLevelUuid() {
+ return stack.peek().getUuid();
+ }
+
+ public synchronized Integer getStackSize() {
+ return stack.size();
+ }
+
+ /**
+ * Looks for a set variable in the stack, starting at the upper flows
+ *
+ * @return the variable or <code>null</code> if not found
+ */
+ public synchronized Object findLocalVariable(String key) {
+ Object obj = null;
+ for (int i = 0; i < stack.size(); i++) {
+ if (stack.get(i).getLocalVariables().containsKey(key)) {
+ obj = stack.get(i).getLocalVariables().get(key);
+ break;
+ }
+ }
+ return obj;
+ }
+
+ public synchronized void leaveFlow(ExecutionFlow executionFlow) {
+ ExecutionFlowRuntime leftEf = stack.pop();
+
+ if (!leftEf.getExecutionFlow().getName()
+ .equals(executionFlow.getName()))
+ throw new SlcException("Asked to leave " + executionFlow
+ + " but last is " + leftEf);
+
+ leftEf.getScopedObjects().clear();
+ leftEf.getLocalVariables().clear();
+ }
+
+ public synchronized void addScopedObject(String name, Object obj) {
+ ExecutionFlowRuntime runtime = stack.peek();
+ // TODO: check that the object is not set yet ?
+ if (log.isDebugEnabled()) {
+ Object existing = findScopedObject(name);
+ if (existing != null)
+ log.warn("Scoped object " + name + " of type " + obj.getClass()
+ + " already registered in " + runtime);
+ }
+ runtime.getScopedObjects().put(name, obj);
+ }
+
+ /** @return </code>null<code> if not found */
+ public synchronized Object findScopedObject(String name) {
+ Object obj = null;
+ for (int i = stack.size() - 1; i >= 0; i--) {
+ if (stack.get(i).getScopedObjects().containsKey(name)) {
+ obj = stack.get(i).getScopedObjects().get(name);
+ break;
+ }
+ }
+ return obj;
+ }
+
+ protected static class ExecutionFlowRuntime {
+ private final ExecutionFlow executionFlow;
+ private final Map<String, Object> scopedObjects = new HashMap<String, Object>();
+ private final Map<String, Object> localVariables = new HashMap<String, Object>();
+ private final String uuid = UUID.randomUUID().toString();
+
+ public ExecutionFlowRuntime(ExecutionFlow executionFlow) {
+ this.executionFlow = executionFlow;
+ }
+
+ public ExecutionFlow getExecutionFlow() {
+ return executionFlow;
+ }
+
+ public Map<String, Object> getScopedObjects() {
+ return scopedObjects;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public Map<String, Object> getLocalVariables() {
+ return localVariables;
+ }
+
+ @Override
+ public String toString() {
+ return "Stack Level #" + uuid;
+ }
+
+ }
+}
--- /dev/null
+package org.argeo.slc.runtime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
+import org.argeo.slc.execution.RealizedFlow;
+
+/** Canonical implementation of an {@link ExecutionProcess} as a bean. */
+public class DefaultProcess implements ExecutionProcess {
+ private String uuid = UUID.randomUUID().toString();
+ private String status = ExecutionProcess.NEW;
+
+ private List<ExecutionStep> steps = new ArrayList<ExecutionStep>();
+ private List<RealizedFlow> realizedFlows = new ArrayList<RealizedFlow>();
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public void addSteps(List<ExecutionStep> steps) {
+ steps.addAll(steps);
+ }
+
+ public List<RealizedFlow> getRealizedFlows() {
+ return realizedFlows;
+ }
+
+ public List<ExecutionStep> getSteps() {
+ return steps;
+ }
+
+ public void setSteps(List<ExecutionStep> steps) {
+ this.steps = steps;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public void setRealizedFlows(List<RealizedFlow> realizedFlows) {
+ this.realizedFlows = realizedFlows;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionStep;
+import org.argeo.slc.execution.FlowConfigurationException;
+import org.argeo.slc.execution.RealizedFlow;
+
+/** Thread of a single execution */
+public class ExecutionThread extends Thread {
+ public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade";
+ private final static Log log = LogFactory.getLog(ExecutionThread.class);
+
+ private ExecutionModulesManager executionModulesManager;
+ private final RealizedFlow realizedFlow;
+ private final AccessControlContext accessControlContext;
+
+ private List<Runnable> destructionCallbacks = new ArrayList<Runnable>();
+
+ public ExecutionThread(ProcessThreadGroup processThreadGroup, ExecutionModulesManager executionModulesManager,
+ RealizedFlow realizedFlow) {
+ super(processThreadGroup, "Flow " + realizedFlow.getFlowDescriptor().getName());
+ this.realizedFlow = realizedFlow;
+ this.executionModulesManager = executionModulesManager;
+ accessControlContext = AccessController.getContext();
+ }
+
+ public void run() {
+ // authenticate thread
+ // Authentication authentication = getProcessThreadGroup()
+ // .getAuthentication();
+ // if (authentication == null)
+ // throw new SlcException("Can only execute authenticated threads");
+ // SecurityContextHolder.getContext().setAuthentication(authentication);
+
+ // Retrieve execution flow descriptor
+ ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow.getFlowDescriptor();
+ String flowName = executionFlowDescriptor.getName();
+
+ getProcessThreadGroup().dispatchAddStep(
+ new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_START, "Flow " + flowName));
+
+ try {
+ Subject subject = Subject.getSubject(accessControlContext);
+ try {
+ Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ String autoUpgrade = System.getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE);
+ if (autoUpgrade != null && autoUpgrade.equals("true"))
+ executionModulesManager.upgrade(realizedFlow.getModuleNameVersion());
+ executionModulesManager.start(realizedFlow.getModuleNameVersion());
+ //
+ // START FLOW
+ //
+ executionModulesManager.execute(realizedFlow);
+ // END FLOW
+ return null;
+ }
+
+ });
+ } catch (PrivilegedActionException privilegedActionException) {
+ throw (Exception) privilegedActionException.getCause();
+ }
+ } catch (FlowConfigurationException e) {
+ String msg = "Configuration problem with flow " + flowName + ":\n" + e.getMessage();
+ log.error(msg);
+ getProcessThreadGroup().dispatchAddStep(
+ new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
+ } catch (Exception e) {
+ // TODO: re-throw exception ?
+ String msg = "Execution of flow " + flowName + " failed.";
+ log.error(msg, e);
+ getProcessThreadGroup().dispatchAddStep(
+ new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
+ } finally {
+ getProcessThreadGroup().dispatchAddStep(
+ new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_END, "Flow " + flowName));
+ processDestructionCallbacks();
+ }
+ }
+
+ private synchronized void processDestructionCallbacks() {
+ for (int i = destructionCallbacks.size() - 1; i >= 0; i--) {
+ try {
+ destructionCallbacks.get(i).run();
+ } catch (Exception e) {
+ log.warn("Could not process destruction callback " + i + " in thread " + getName(), e);
+ }
+ }
+ }
+
+ /**
+ * Gather object destruction callback to be called in reverse order at the
+ * end of the thread
+ */
+ public synchronized void registerDestructionCallback(String name, Runnable callback) {
+ destructionCallbacks.add(callback);
+ }
+
+ protected ProcessThreadGroup getProcessThreadGroup() {
+ return (ProcessThreadGroup) getThreadGroup();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright (C) 2007-2012 Argeo GmbH\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.argeo.slc.runtime;\r
+\r
+import java.util.Stack;\r
+\r
+import org.apache.commons.logging.Log;\r
+import org.apache.commons.logging.LogFactory;\r
+import org.argeo.slc.SlcException;\r
+import org.argeo.slc.execution.ExecutionFlow;\r
+import org.argeo.slc.execution.ExecutionSpecAttribute;\r
+import org.argeo.slc.execution.RefSpecAttribute;\r
+import org.argeo.slc.primitive.PrimitiveSpecAttribute;\r
+import org.argeo.slc.primitive.PrimitiveUtils;\r
+\r
+/** Manage parameters that need to be set during the instantiation of a flow */\r
+public class InstantiationManager {\r
+\r
+ private final static Log log = LogFactory\r
+ .getLog(InstantiationManager.class);\r
+\r
+ private ThreadLocal<Stack<ExecutionFlow>> flowStack = new ThreadLocal<Stack<ExecutionFlow>>();\r
+\r
+ public Object createRef(String name) {\r
+\r
+ if ((flowStack.get() == null) || flowStack.get().empty()) {\r
+ throw new SlcException("No flow is currently initializing."\r
+ + " Declare ParameterRef as inner beans or prototypes.");\r
+ }\r
+\r
+ return getInitializingFlowParameter(name);\r
+ }\r
+\r
+ public void flowInitializationStarted(ExecutionFlow flow, String flowName) {\r
+ // set the flow name if it is DefaultExecutionFlow\r
+ if (flow instanceof DefaultExecutionFlow) {\r
+ ((DefaultExecutionFlow) flow).setName(flowName);\r
+ }\r
+\r
+ if (log.isTraceEnabled())\r
+ log.trace("Start initialization of " + flow.hashCode() + " ("\r
+ + flow + " - " + flow.getClass() + ")");\r
+\r
+ // log.info("# flowInitializationStarted " + flowName);\r
+ // create a stack for this thread if there is none\r
+ if (flowStack.get() == null) {\r
+ flowStack.set(new Stack<ExecutionFlow>());\r
+ }\r
+ flowStack.get().push(flow);\r
+ }\r
+\r
+ public void flowInitializationFinished(ExecutionFlow flow, String flowName) {\r
+ if (log.isTraceEnabled())\r
+ log.trace("Finish initialization of " + flow.hashCode() + " ("\r
+ + flow + " - " + flow.getClass() + ")");\r
+\r
+ if (flowStack.get() != null) {\r
+ ExecutionFlow registeredFlow = flowStack.get().pop();\r
+ if (registeredFlow != null) {\r
+ if (!flow.getName().equals(registeredFlow.getName()))\r
+ throw new SlcException("Current flow is " + flow);\r
+ // log.info("# flowInitializationFinished " + flowName);\r
+ // initializingFlow.set(null);\r
+ }\r
+ } else {\r
+ // happens for flows imported as services\r
+ log.warn("flowInitializationFinished - Flow Stack is null");\r
+ }\r
+ }\r
+\r
+ protected ExecutionFlow findInitializingFlowWithParameter(String key) {\r
+ if ((flowStack.get() == null) || flowStack.get().empty())\r
+ throw new SlcException("No initializing flow available.");\r
+\r
+ // first look in the outer flow (that may override parameters)\r
+ for (int i = 0; i < flowStack.get().size(); i++) {\r
+ if (flowStack.get().elementAt(i).isSetAsParameter(key)) {\r
+ return flowStack.get().elementAt(i);\r
+ }\r
+ }\r
+ throw new SlcException("Key " + key + " is not set as parameter in "\r
+ + flowStack.get().firstElement().toString() + " (stack size="\r
+ + flowStack.get().size() + ")");\r
+\r
+ }\r
+\r
+ public Object getInitializingFlowParameter(String key) {\r
+ return findInitializingFlowWithParameter(key).getParameter(key);\r
+ }\r
+\r
+ public Class<?> getInitializingFlowParameterClass(String key) {\r
+ ExecutionSpecAttribute attr = findInitializingFlowWithParameter(key)\r
+ .getExecutionSpec().getAttributes().get(key);\r
+ if (attr instanceof RefSpecAttribute)\r
+ return ((RefSpecAttribute) attr).getTargetClass();\r
+ else if (attr instanceof PrimitiveSpecAttribute) {\r
+ String type = ((PrimitiveSpecAttribute) attr).getType();\r
+ Class<?> clss = PrimitiveUtils.typeAsClass(type);\r
+ if (clss == null)\r
+ throw new SlcException("Cannot convert type " + type\r
+ + " to class.");\r
+ return clss;\r
+ } else\r
+ return null;\r
+ }\r
+\r
+ public Boolean isInFlowInitialization() {\r
+ return (flowStack.get() != null) && !flowStack.get().empty();\r
+ }\r
+}\r
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
+import org.argeo.slc.execution.RealizedFlow;
+
+/**
+ * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
+ * sequential {@link ExecutionThread}s.
+ */
+public class ProcessThread extends Thread {
+ private final static Log log = LogFactory.getLog(ProcessThread.class);
+
+ private final ExecutionModulesManager executionModulesManager;
+ private final ExecutionProcess process;
+ private final ProcessThreadGroup processThreadGroup;
+
+ private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
+
+ // private Boolean hadAnError = false;
+ private Boolean killed = false;
+
+ private final AccessControlContext accessControlContext;
+
+ public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
+ ExecutionProcess process) {
+ super(processesThreadGroup, "SLC Process #" + process.getUuid());
+ this.executionModulesManager = executionModulesManager;
+ this.process = process;
+ processThreadGroup = new ProcessThreadGroup(process);
+ accessControlContext = AccessController.getContext();
+ }
+
+ public final void run() {
+ // authenticate thread
+ // Authentication authentication = getProcessThreadGroup()
+ // .getAuthentication();
+ // if (authentication == null)
+ // throw new SlcException("Can only execute authenticated threads");
+ // SecurityContextHolder.getContext().setAuthentication(authentication);
+
+ log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
+
+ // Start logging
+ new LoggingThread().start();
+
+ process.setStatus(ExecutionProcess.RUNNING);
+ try {
+ Subject subject = Subject.getSubject(accessControlContext);
+ try {
+ Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ process();
+ return null;
+ }
+
+ });
+ } catch (PrivilegedActionException privilegedActionException) {
+ Throwable cause = privilegedActionException.getCause();
+ if (cause instanceof InterruptedException)
+ throw (InterruptedException) cause;
+ else
+ throw new SlcException("Cannot process", cause);
+ }
+ // process();
+ } catch (InterruptedException e) {
+ die();
+ return;
+ } catch (Exception e) {
+ String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
+ log.error(msg, e);
+ getProcessThreadGroup()
+ .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
+ }
+
+ // waits for all execution threads to complete (in case they were
+ // started asynchronously)
+ for (ExecutionThread executionThread : executionThreads) {
+ if (executionThread.isAlive()) {
+ try {
+ executionThread.join();
+ } catch (InterruptedException e) {
+ die();
+ return;
+ }
+ }
+ }
+
+ computeFinalStatus();
+ }
+
+ /** Make sure this is called BEFORE all the threads are interrupted. */
+ private void computeFinalStatus() {
+ // String oldStatus = process.getStatus();
+ // TODO: error management at flow level?
+ if (killed)
+ process.setStatus(ExecutionProcess.KILLED);
+ else if (processThreadGroup.hadAnError())
+ process.setStatus(ExecutionProcess.ERROR);
+ else
+ process.setStatus(ExecutionProcess.COMPLETED);
+ // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+ // process.getStatus());
+ log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
+ }
+
+ /** Called when being killed */
+ private synchronized void die() {
+ killed = true;
+ computeFinalStatus();
+ for (ExecutionThread executionThread : executionThreads) {
+ try {
+ executionThread.interrupt();
+ } catch (Exception e) {
+ log.error("Cannot interrupt " + executionThread);
+ }
+ }
+ processThreadGroup.interrupt();
+ }
+
+ /**
+ * Implementation specific execution. To be overridden in order to deal with
+ * custom process types. Default expects an {@link SlcExecution}.
+ */
+ protected void process() throws InterruptedException {
+ List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+ flowsToProcess.addAll(process.getRealizedFlows());
+ while (flowsToProcess.size() > 0) {
+ RealizedFlow realizedFlow = flowsToProcess.remove(0);
+ execute(realizedFlow, true);
+ }
+ }
+
+ /** @return the (distinct) thread used for this execution */
+ protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
+ if (killed)
+ return;
+
+ ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
+ executionThreads.add(thread);
+ thread.start();
+
+ if (synchronous)
+ thread.join();
+
+ return;
+ }
+
+ // public void notifyError() {
+ // hadAnError = true;
+ // }
+ //
+ // public synchronized void flowCompleted() {
+ // // notifyAll();
+ // }
+
+ public ExecutionProcess getProcess() {
+ return process;
+ }
+
+ public ProcessThreadGroup getProcessThreadGroup() {
+ return processThreadGroup;
+ }
+
+ public ExecutionModulesManager getExecutionModulesManager() {
+ return executionModulesManager;
+ }
+
+ private class LoggingThread extends Thread {
+
+ public LoggingThread() {
+ super("SLC Process Logger #" + process.getUuid());
+ }
+
+ public void run() {
+ boolean run = true;
+ while (run) {
+ List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
+ processThreadGroup.getSteps().drainTo(newSteps);
+ if (newSteps.size() > 0) {
+ // System.out.println(steps.size() + " steps");
+ process.addSteps(newSteps);
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
+ run = false;
+ }
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.argeo.slc.runtime;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
+
+/** The thread group attached to a given {@link SlcExecution}. */
+public class ProcessThreadGroup extends ThreadGroup {
+// private final Authentication authentication;
+ private final static Integer STEPS_BUFFER_CAPACITY = 5000;
+
+ private BlockingQueue<ExecutionStep> steps = new ArrayBlockingQueue<ExecutionStep>(
+ STEPS_BUFFER_CAPACITY);
+
+ private Boolean hadAnError = false;
+
+ public ProcessThreadGroup(ExecutionProcess executionProcess) {
+ super("SLC Process #" + executionProcess.getUuid() + " thread group");
+// this.authentication = SecurityContextHolder.getContext()
+// .getAuthentication();
+ }
+
+// public Authentication getAuthentication() {
+// return authentication;
+// }
+
+ public void dispatchAddStep(ExecutionStep step) {
+ // ExecutionProcess slcProcess = processThread.getProcess();
+ // List<ExecutionStep> steps = new ArrayList<ExecutionStep>();
+ // steps.add(step);
+ // TODO clarify why we don't dispatch steps, must be a reason
+ // dispatchAddSteps(steps);
+ // slcProcess.addSteps(steps);
+ if (step.getType().equals(ExecutionStep.ERROR))
+ hadAnError = true;
+ this.steps.add(step);
+ }
+
+ // public void dispatchAddSteps(List<ExecutionStep> steps) {
+ // ExecutionProcess slcProcess = processThread.getProcess();
+ // executionModulesManager.dispatchAddSteps(slcProcess, steps);
+ // }
+
+ public BlockingQueue<ExecutionStep> getSteps() {
+ return steps;
+ }
+
+ public Boolean hadAnError() {
+ return hadAnError;
+ }
+}
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.core.execution;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.execution.ExecutionContext;
-import org.argeo.slc.execution.ExecutionFlow;
-import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.RealizedFlow;
-
-/** Provides the base feature of an execution module manager. */
-public abstract class AbstractExecutionModulesManager implements
- ExecutionModulesManager {
- private final static Log log = LogFactory
- .getLog(AbstractExecutionModulesManager.class);
-
- // private List<FilteredNotifier> filteredNotifiers = Collections
- // .synchronizedList(new ArrayList<FilteredNotifier>());
-
- protected abstract ExecutionFlow findExecutionFlow(String moduleName,
- String moduleVersion, String flowName);
-
- protected abstract ExecutionContext findExecutionContext(String moduleName,
- String moduleVersion);
-
- protected abstract ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
- String moduleName, String moduleVersion);
-
- public void execute(RealizedFlow realizedFlow) {
- if (log.isTraceEnabled())
- log.trace("Executing " + realizedFlow);
-
- String moduleName = realizedFlow.getModuleName();
- String moduleVersion = realizedFlow.getModuleVersion();
-
- Map<? extends String, ? extends Object> variablesToAdd = getExecutionFlowDescriptorConverter(
- moduleName, moduleVersion).convertValues(
- realizedFlow.getFlowDescriptor());
- ExecutionContext executionContext = findExecutionContext(moduleName,
- moduleVersion);
- for (String key : variablesToAdd.keySet())
- executionContext.setVariable(key, variablesToAdd.get(key));
-
- ExecutionFlow flow = findExecutionFlow(moduleName, moduleVersion,
- realizedFlow.getFlowDescriptor().getName());
-
- //
- // Actually runs the flow, IN THIS THREAD
- //
- executionContext.beforeFlow(flow);
- try {
- flow.run();
- } finally {
- executionContext.afterFlow(flow);
- }
- //
- //
- //
- }
-
- // public void dispatchUpdateStatus(ExecutionProcess process,
- // String oldStatus, String newStatus) {
- // // filtered notifiers
- // for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
- // .hasNext();) {
- // FilteredNotifier filteredNotifier = it.next();
- // if (filteredNotifier.receiveFrom(process))
- // filteredNotifier.getNotifier().updateStatus(process, oldStatus,
- // newStatus);
- // }
- //
- // }
-
- // public void dispatchAddSteps(ExecutionProcess process,
- // List<ExecutionStep> steps) {
- // process.addSteps(steps);
- // for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
- // .hasNext();) {
- // FilteredNotifier filteredNotifier = it.next();
- // if (filteredNotifier.receiveFrom(process))
- // filteredNotifier.getNotifier().addSteps(process, steps);
- // }
- // }
-
- // public void registerProcessNotifier(ExecutionProcessNotifier notifier,
- // Map<String, String> properties) {
- // filteredNotifiers.add(new FilteredNotifier(notifier, properties));
- // }
- //
- // public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
- // Map<String, String> properties) {
- // filteredNotifiers.remove(notifier);
- // }
-
- // protected class FilteredNotifier {
- // private final ExecutionProcessNotifier notifier;
- // private final String processId;
- //
- // public FilteredNotifier(ExecutionProcessNotifier notifier,
- // Map<String, String> properties) {
- // super();
- // this.notifier = notifier;
- // if (properties == null)
- // properties = new HashMap<String, String>();
- // if (properties.containsKey(SLC_PROCESS_ID))
- // processId = properties.get(SLC_PROCESS_ID);
- // else
- // processId = null;
- // }
- //
- // /**
- // * Whether event from this process should be received by this listener.
- // */
- // public Boolean receiveFrom(ExecutionProcess process) {
- // if (processId != null)
- // if (process.getUuid().equals(processId))
- // return true;
- // else
- // return false;
- // return true;
- // }
- //
- // @Override
- // public int hashCode() {
- // return notifier.hashCode();
- // }
- //
- // @Override
- // public boolean equals(Object obj) {
- // if (obj instanceof FilteredNotifier) {
- // FilteredNotifier fn = (FilteredNotifier) obj;
- // return notifier.equals(fn.notifier);
- // } else if (obj instanceof ExecutionProcessNotifier) {
- // ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj;
- // return notifier.equals(epn);
- // } else
- // return false;
- // }
- //
- // public ExecutionProcessNotifier getNotifier() {
- // return notifier;
- // }
- //
- // }
-}
*/
package org.argeo.slc.core.execution;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.argeo.slc.DefaultNameVersion;
-import org.argeo.slc.NameVersion;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.SlcAgent;
-
/** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent {
- // private final static Log log = LogFactory.getLog(DefaultAgent.class);
- /** UTF-8 charset for encoding. */
- private final static String UTF8 = "UTF-8";
-
- private String agentUuid = null;
- private ExecutionModulesManager modulesManager;
-
- private ThreadGroup processesThreadGroup;
- private Map<String, ProcessThread> runningProcesses = Collections
- .synchronizedMap(new HashMap<String, ProcessThread>());
-
- private String defaultModulePrefix = null;
-
- /*
- * LIFECYCLE
- */
- /** Initialization */
- public void init() {
- agentUuid = initAgentUuid();
- processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
- + agentUuid);
- }
-
- /** Clean up (needs to be called by overriding method) */
- public void destroy() {
- }
-
- /**
- * Called during initialization in order to determines the agent UUID. To be
- * overridden. By default creates a new one per instance.
- */
- protected String initAgentUuid() {
- return UUID.randomUUID().toString();
- }
-
- /*
- * SLC AGENT
- */
- public void process(ExecutionProcess process) {
- ProcessThread processThread = createProcessThread(processesThreadGroup,
- modulesManager, process);
- processThread.start();
- runningProcesses.put(process.getUuid(), processThread);
-
- // clean up old processes
- Iterator<ProcessThread> it = runningProcesses.values().iterator();
- while (it.hasNext()) {
- ProcessThread pThread = it.next();
- if (!pThread.isAlive())
- it.remove();
- }
- }
-
- public String process(List<URI> uris) {
- DefaultProcess process = new DefaultProcess();
- for (URI uri : uris) {
- String[] path = uri.getPath().split("/");
- if (path.length < 3)
- throw new SlcException("Badly formatted URI: " + uri);
- NameVersion nameVersion = new DefaultNameVersion(path[1]);
- StringBuilder flow = new StringBuilder();
- for (int i = 2; i < path.length; i++)
- flow.append('/').append(path[i]);
-
- Map<String, Object> values = getQueryMap(uri.getQuery());
- // Get execution module descriptor
- ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
- nameVersion.getName(), nameVersion.getVersion());
- process.getRealizedFlows().add(
- emd.asRealizedFlow(flow.toString(), values));
- }
- process(process);
- return process.getUuid();
- }
-
- public void kill(String processUuid) {
- if (runningProcesses.containsKey(processUuid)) {
- runningProcesses.get(processUuid).interrupt();
- } else {
- // assume is finished
- }
- }
-
- public void waitFor(String processUuid, Long millis) {
- if (runningProcesses.containsKey(processUuid)) {
- try {
- if (millis != null)
- runningProcesses.get(processUuid).join(millis);
- else
- runningProcesses.get(processUuid).join();
- } catch (InterruptedException e) {
- // silent
- }
- } else {
- // assume is finished
- }
- }
-
- /** Creates the thread which will coordinate the execution for this agent. */
- protected ProcessThread createProcessThread(
- ThreadGroup processesThreadGroup,
- ExecutionModulesManager modulesManager, ExecutionProcess process) {
- ProcessThread processThread = new ProcessThread(processesThreadGroup,
- modulesManager, process);
- return processThread;
- }
-
- public ExecutionModuleDescriptor getExecutionModuleDescriptor(
- String moduleName, String moduleVersion) {
- // Get execution module descriptor
- ExecutionModuleDescriptor emd;
- try {
- modulesManager
- .start(new DefaultNameVersion(moduleName, moduleVersion));
- emd = modulesManager.getExecutionModuleDescriptor(moduleName,
- moduleVersion);
- } catch (SlcException e) {
- if (defaultModulePrefix != null) {
- moduleName = defaultModulePrefix + "." + moduleName;
- modulesManager.start(new DefaultNameVersion(moduleName,
- moduleVersion));
- emd = modulesManager.getExecutionModuleDescriptor(moduleName,
- moduleVersion);
- } else
- throw e;
- }
- return emd;
- }
-
- public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
- return modulesManager.listExecutionModules();
- }
+@Deprecated
+public class DefaultAgent extends org.argeo.slc.runtime.DefaultAgent {
- public boolean ping() {
- return true;
+ public DefaultAgent() {
+ super();
}
- /*
- * UTILITIES
- */
- /**
- * @param query
- * can be null
- */
- static Map<String, Object> getQueryMap(String query) {
- Map<String, Object> map = new LinkedHashMap<String, Object>();
- if (query == null)
- return map;
- String[] params = query.split("&");
- for (String param : params) {
- String[] arr = param.split("=");
- String name = arr[0];
- Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
- try {
- map.put(URLDecoder.decode(name, UTF8),
- URLDecoder.decode(value.toString(), UTF8));
- } catch (UnsupportedEncodingException e) {
- throw new SlcException("Cannot decode '" + param + "'", e);
- }
- }
- return map;
- }
-
- /*
- * BEAN
- */
- public void setModulesManager(ExecutionModulesManager modulesManager) {
- this.modulesManager = modulesManager;
- }
-
- public void setDefaultModulePrefix(String defaultModulePrefix) {
- this.defaultModulePrefix = defaultModulePrefix;
- }
-
- public String getAgentUuid() {
- return agentUuid;
- }
-
- @Override
- public String toString() {
- return "Agent #" + getAgentUuid();
- }
}
package org.argeo.slc.core.execution;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.DefaultNameVersion;
-import org.argeo.slc.NameVersion;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.execution.ExecutionSpec;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
-import org.argeo.slc.execution.SlcAgent;
-import org.argeo.slc.execution.SlcAgentCli;
-
/**
* Authenticates thread and executes synchronously a command line execution.
* Reference implementation of args to URIs algorithm.
*/
-public class DefaultAgentCli implements SlcAgentCli {
- private final static Log log = LogFactory.getLog(DefaultAgentCli.class);
-
- private final static String UTF8 = "UTF-8";
- private SlcAgent agent;
-// private AuthenticationManager authenticationManager;
-
- private Long timeout = 24 * 60 * 60 * 1000l;
-
- public String process(String[] args) {
-// if (SecurityContextHolder.getContext().getAuthentication() == null) {
-// OsAuthenticationToken oat = new OsAuthenticationToken();
-// Authentication authentication = authenticationManager
-// .authenticate(oat);
-// SecurityContextHolder.getContext()
-// .setAuthentication(authentication);
-// }
-
- if (args.length > 0 && args[0].equals("help")) {
- StringBuilder buf = new StringBuilder();
- help(args, buf);
- log.info("\n" + buf);
- return buf.toString();
- } else {
- List<URI> uris = asURIs(args);
- String processUuid = agent.process(uris);
- agent.waitFor(processUuid, timeout);
- return processUuid;
- }
- }
-
- protected void help(String[] rawArgs, StringBuilder buf) {
- String[] args = Arrays.copyOfRange(rawArgs, 1, rawArgs.length);
- if (args.length == 0) {// modules
- for (ExecutionModuleDescriptor emd : agent
- .listExecutionModuleDescriptors()) {
- appendModule(emd, buf);
- }
- } else if (args.length == 1 && !args[0].contains("/")) {// single module
- NameVersion nameVersion = new DefaultNameVersion(args[0]);
- ExecutionModuleDescriptor emd = agent.getExecutionModuleDescriptor(
- nameVersion.getName(), nameVersion.getVersion());
- appendModule(emd, buf);
-
- // flows
- for (ExecutionFlowDescriptor efd : emd.getExecutionFlows()) {
- buf.append(" ").append(efd.getName());
- if (efd.getDescription() != null
- && !efd.getDescription().trim().equals(""))
- buf.append(" : ").append(" ").append(efd.getDescription());
- buf.append('\n');
- }
- return;
- } else {
- List<URI> uris = asURIs(args);
- for (URI uri : uris) {
- appendUriHelp(uri, buf);
- }
- }
- }
-
- protected void appendUriHelp(URI uri, StringBuilder buf) {
- String[] path = uri.getPath().split("/");
- NameVersion nameVersion = new DefaultNameVersion(path[1]);
- ExecutionModuleDescriptor emd = agent.getExecutionModuleDescriptor(
- nameVersion.getName(), nameVersion.getVersion());
-
- StringBuilder flow = new StringBuilder();
- for (int i = 2; i < path.length; i++)
- flow.append('/').append(path[i]);
- String flowPath = flow.toString();
- ExecutionFlowDescriptor efd = findExecutionFlowDescriptor(emd, flowPath);
- if (efd == null)
- throw new SlcException("Flow " + uri + " not found");
-
- appendModule(emd, buf);
-
- buf.append(" ").append(efd.getName());
- if (efd.getDescription() != null
- && !efd.getDescription().trim().equals(""))
- buf.append(" : ").append(" ").append(efd.getDescription());
- buf.append('\n');
- Map<String, Object> values = DefaultAgent.getQueryMap(uri.getQuery());
- ExecutionSpec spec = efd.getExecutionSpec();
- for (String attrKey : spec.getAttributes().keySet()) {
- ExecutionSpecAttribute esa = spec.getAttributes().get(attrKey);
- buf.append(" --").append(attrKey);
- if (values.containsKey(attrKey))
- buf.append(" ").append(values.get(attrKey));
- if (esa.getValue() != null)
- buf.append(" (").append(esa.getValue()).append(')');
- buf.append('\n');
- }
- }
+@Deprecated
+public class DefaultAgentCli extends org.argeo.slc.runtime.DefaultAgentCli {
- private void appendModule(ExecutionModuleDescriptor emd, StringBuilder buf) {
- buf.append("# ").append(emd.getName());
- if (emd.getDescription() != null
- && !emd.getDescription().trim().equals(""))
- buf.append(" : ").append(emd.getDescription());
- if (emd.getVersion() != null)
- buf.append(" (v").append(emd.getVersion()).append(")");
- buf.append('\n');
+ public DefaultAgentCli() {
+ super();
}
-
- public static List<URI> asURIs(String[] args) {
- try {
- List<URI> uris = new ArrayList<URI>();
- List<String> leftOvers = new ArrayList<String>();
-
- Boolean hasArgs = false;
- String currKey = null;
- StringBuilder currUri = null;
- Iterator<String> argIt = Arrays.asList(args).iterator();
- while (argIt.hasNext()) {
- String arg = argIt.next();
- if (!arg.startsWith("-")) {
- if (currKey != null) {// value
- currUri.append(URLEncoder.encode(arg, UTF8));
- currKey = null;
- } else { // module
- if (currUri != null) {
- uris.add(new URI(currUri.toString()));
- }
- currUri = new StringBuilder("flow:");
-
- String currModule = arg;
- currUri.append('/').append(currModule);
- if (!arg.contains("/")) {
- // flow path not in arg go to next arg
- if (!argIt.hasNext())
- throw new SlcException("No flow found");
- String currFlow = argIt.next();
- if (!currFlow.startsWith("/"))
- currFlow = "/" + currFlow;
- currUri.append(currFlow);
- }
- }
- } else {
- if (currUri == null) {// first args
- leftOvers.add(arg);
- } else {
- String key;
- if (arg.startsWith("--"))
- key = arg.substring(2);
- else if (arg.startsWith("-"))
- key = arg.substring(1);
- else {
- throw new SlcException("Cannot intepret key: "
- + arg);
- }
-
- if (!hasArgs) {
- currUri.append('?');
- hasArgs = true;
- } else {
- currUri.append('&');
- }
-
- // deal with boolean keys
- if (currKey != null) {// value
- currUri.append(URLEncoder.encode("true", UTF8));
- currKey = null;
- }
-
- currKey = key;
- currUri.append(URLEncoder.encode(key, UTF8))
- .append('=');
- }
- }
- }
- if (currUri != null)
- uris.add(new URI(currUri.toString()));
- return uris;
- } catch (Exception e) {
- throw new SlcException("Cannot convert " + Arrays.toString(args)
- + " to flow URI", e);
- }
- }
-
- private ExecutionFlowDescriptor findExecutionFlowDescriptor(
- ExecutionModuleDescriptor emd, String flowPath) {
- ExecutionFlowDescriptor flowDescriptor = null;
- for (ExecutionFlowDescriptor efd : emd.getExecutionFlows()) {
- String name = efd.getName();
- // normalize name as flow path
- if (!name.startsWith("/"))
- name = "/" + name;
- if (name.endsWith("/"))
- name = name.substring(0, name.length() - 1);
- if (name.equals(flowPath)) {
- flowDescriptor = efd;
- break;
- }
- }
- return flowDescriptor;
- }
-
- public void setAgent(SlcAgent agent) {
- this.agent = agent;
- }
-
-// public void setAuthenticationManager(
-// AuthenticationManager authenticationManager) {
-// this.authenticationManager = authenticationManager;
-// }
-
- public void setTimeout(Long timeout) {
- this.timeout = timeout;
- }
-
}
*/
package org.argeo.slc.core.execution;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionContext;
-import org.argeo.slc.execution.ExecutionFlow;
import org.argeo.slc.execution.ExecutionSpec;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
-import org.springframework.validation.MapBindingResult;
/** Default implementation of an execution flow. */
-public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean,
- BeanNameAware {
- private final static Log log = LogFactory
- .getLog(DefaultExecutionFlow.class);
-
- private final ExecutionSpec executionSpec;
- private String name = null;
- private Map<String, Object> parameters = new HashMap<String, Object>();
- private List<Runnable> executables = new ArrayList<Runnable>();
-
- private String path;
-
- private Boolean failOnError = true;
-
- // Only needed if stacked execution flows are used
- private ExecutionContext executionContext = null;
-
+@Deprecated
+public class DefaultExecutionFlow extends org.argeo.slc.runtime.DefaultExecutionFlow
+ implements InitializingBean, BeanNameAware {
public DefaultExecutionFlow() {
- this.executionSpec = new DefaultExecutionSpec();
+ super();
}
- public DefaultExecutionFlow(ExecutionSpec executionSpec) {
- this.executionSpec = executionSpec;
+ public DefaultExecutionFlow(ExecutionSpec executionSpec, Map<String, Object> parameters) {
+ super(executionSpec, parameters);
}
- public DefaultExecutionFlow(ExecutionSpec executionSpec,
- Map<String, Object> parameters) {
- // be sure to have an execution spec
- this.executionSpec = (executionSpec == null) ? new DefaultExecutionSpec()
- : executionSpec;
-
- // only parameters contained in the executionSpec can be set
- for (String parameter : parameters.keySet()) {
- if (!executionSpec.getAttributes().containsKey(parameter)) {
- throw new SlcException("Parameter " + parameter
- + " is not defined in the ExecutionSpec");
- }
- }
-
- // set the parameters
- this.parameters.putAll(parameters);
-
- // check that all the required parameters are defined
- MapBindingResult errors = new MapBindingResult(parameters, "execution#"
- + getName());
- for (String key : executionSpec.getAttributes().keySet()) {
- ExecutionSpecAttribute attr = executionSpec.getAttributes()
- .get(key);
-
- if (attr.getIsImmutable() && !isSetAsParameter(key)) {
- errors.rejectValue(key, "Immutable but not set");
- break;
- }
-
- if (attr.getIsConstant() && !isSetAsParameter(key)) {
- errors.rejectValue(key, "Constant but not set as parameter");
- break;
- }
-
- if (attr.getIsHidden() && !isSetAsParameter(key)) {
- errors.rejectValue(key, "Hidden but not set as parameter");
- break;
- }
- }
-
- if (errors.hasErrors())
- throw new SlcException("Could not prepare execution flow: "
- + errors.toString());
-
- }
-
- public void run() {
- try {
- for (Runnable executable : executables) {
- if (Thread.interrupted()) {
- log.error("Flow '" + getName() + "' killed before '"
- + executable + "'");
- Thread.currentThread().interrupt();
- return;
- // throw new ThreadDeath();
- }
- this.doExecuteRunnable(executable);
- }
- } catch (RuntimeException e) {
- if (Thread.interrupted()) {
- log.error("Flow '" + getName()
- + "' killed while receiving an unrelated exception", e);
- Thread.currentThread().interrupt();
- return;
- // throw new ThreadDeath();
- }
- if (failOnError)
- throw e;
- else {
- log.error("Execution flow failed,"
- + " but process did not fail"
- + " because failOnError property"
- + " is set to false: " + e);
- if (log.isTraceEnabled())
- e.printStackTrace();
- }
- }
- }
-
- /**
- * List sub-runnables that would be executed if run() method would be
- * called.
- */
- public Iterator<Runnable> runnables() {
- return executables.iterator();
- }
-
- /**
- * If there is one and only one runnable wrapped return it, throw an
- * exeception otherwise.
- */
- public Runnable getRunnable() {
- if (executables.size() == 1)
- return executables.get(0);
- else
- throw new SlcException("There are " + executables.size()
- + " runnables in flow " + getName());
- }
-
- public void doExecuteRunnable(Runnable runnable) {
- try {
- if (executionContext != null)
- if (runnable instanceof ExecutionFlow)
- executionContext.beforeFlow((ExecutionFlow) runnable);
- runnable.run();
- } finally {
- if (executionContext != null)
- if (runnable instanceof ExecutionFlow)
- executionContext.afterFlow((ExecutionFlow) runnable);
- }
+ public DefaultExecutionFlow(ExecutionSpec executionSpec) {
+ super(executionSpec);
}
public void afterPropertiesSet() throws Exception {
- if (path == null) {
- if (name.charAt(0) == '/') {
- path = name.substring(0, name.lastIndexOf('/'));
- }
- }
-
- if (path != null) {
- for (Runnable executable : executables) {
- if (executable instanceof DefaultExecutionFlow) {
- // so we don't need to have DefaultExecutionFlow
- // implementing StructureAware
- // FIXME: probably has side effects
- DefaultExecutionFlow flow = (DefaultExecutionFlow) executable;
- String newPath = path + '/' + flow.getName();
- flow.setPath(newPath);
- log.warn(newPath + " was forcibly set on " + flow);
- }
- }
- }
+ init();
}
public void setBeanName(String name) {
- this.name = name;
- }
-
- public void setExecutables(List<Runnable> executables) {
- this.executables = executables;
- }
-
- public void setParameters(Map<String, Object> attributes) {
- this.parameters = attributes;
- }
-
- public String getName() {
- return name;
- }
-
- public ExecutionSpec getExecutionSpec() {
- return executionSpec;
- }
-
- public Object getParameter(String parameterName) {
- // Verify that there is a spec attribute
- ExecutionSpecAttribute specAttr = null;
- if (executionSpec.getAttributes().containsKey(parameterName)) {
- specAttr = executionSpec.getAttributes().get(parameterName);
- } else {
- throw new SlcException("Key " + parameterName
- + " is not defined in the specifications of " + toString());
- }
-
- if (parameters.containsKey(parameterName)) {
- Object paramValue = parameters.get(parameterName);
- return paramValue;
- } else {
- if (specAttr.getValue() != null) {
- return specAttr.getValue();
- }
- }
- throw new SlcException("Key " + parameterName
- + " is not set as parameter in " + toString());
- }
-
- public Boolean isSetAsParameter(String key) {
- return parameters.containsKey(key)
- || (executionSpec.getAttributes().containsKey(key) && executionSpec
- .getAttributes().get(key).getValue() != null);
- }
-
- @Override
- public String toString() {
- return new StringBuffer("Execution flow ").append(name).toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- return ((ExecutionFlow) obj).getName().equals(name);
+ setName(name);
}
-
- @Override
- public int hashCode() {
- return name.hashCode();
- }
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public Boolean getFailOnError() {
- return failOnError;
- }
-
- public void setFailOnError(Boolean failOnError) {
- this.failOnError = failOnError;
- }
-
- public void setExecutionContext(ExecutionContext executionContext) {
- this.executionContext = executionContext;
- }
-
}
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.execution.ExecutionSpec;
import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.FlowConfigurationException;
import org.argeo.slc.execution.RefSpecAttribute;
import org.argeo.slc.execution.RefValue;
import org.argeo.slc.primitive.PrimitiveSpecAttribute;
*/
package org.argeo.slc.core.execution;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.execution.ExecutionSpec;
import org.argeo.slc.execution.ExecutionSpecAttribute;
import org.argeo.slc.execution.RefSpecAttribute;
import org.argeo.slc.execution.RefValueChoice;
import org.springframework.context.ConfigurableApplicationContext;
/** Spring based implementation of execution specifications. */
-public class DefaultExecutionSpec implements ExecutionSpec, BeanNameAware,
- ApplicationContextAware, InitializingBean, Serializable {
+@Deprecated
+public class DefaultExecutionSpec extends org.argeo.slc.runtime.DefaultExecutionSpec
+ implements BeanNameAware, ApplicationContextAware, InitializingBean {
private static final long serialVersionUID = 5159882223926926539L;
- private final static Log log = LogFactory
- .getLog(DefaultExecutionSpec.class);
+ private final static Log log = LogFactory.getLog(DefaultExecutionSpec.class);
private transient ApplicationContext applicationContext;
- private String description;
- private Map<String, ExecutionSpecAttribute> attributes = new HashMap<String, ExecutionSpecAttribute>();
-
- private String name = INTERNAL_NAME;
-
- public Map<String, ExecutionSpecAttribute> getAttributes() {
- return attributes;
- }
-
- public void setAttributes(Map<String, ExecutionSpecAttribute> attributes) {
- this.attributes = attributes;
+ public DefaultExecutionSpec() {
+ super();
}
public void setBeanName(String name) {
- this.name = name;
- }
-
- /**
- * The Spring bean name (only relevant for specs declared has high-level
- * beans)
- */
- public String getName() {
- return name;
- }
-
- public boolean equals(Object obj) {
- return ((ExecutionSpec) obj).getName().equals(name);
- }
-
- /**
- * The Spring bean description (only relevant for specs declared has
- * high-level beans)
- */
- public String getDescription() {
- return description;
+ setName(name);
}
private ConfigurableListableBeanFactory getBeanFactory() {
- return ((ConfigurableApplicationContext) applicationContext)
- .getBeanFactory();
+ return ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
}
public void setApplicationContext(ApplicationContext applicationContext) {
}
public void afterPropertiesSet() throws Exception {
- if (description == null) {
+ if (getDescription() == null) {
try {
- description = getBeanFactory().getBeanDefinition(name)
- .getDescription();
+ setDescription(getBeanFactory().getBeanDefinition(getName()).getDescription());
} catch (NoSuchBeanDefinitionException e) {
// silent
}
}
- for (String key : attributes.keySet()) {
- ExecutionSpecAttribute attr = attributes.get(key);
+ for (String key : getAttributes().keySet()) {
+ ExecutionSpecAttribute attr = getAttributes().get(key);
if (attr instanceof RefSpecAttribute) {
RefSpecAttribute rsa = (RefSpecAttribute) attr;
if (rsa.getChoices() == null) {
rsa.setChoices(choices);
}
if (log.isTraceEnabled())
- log.debug("Spec attr " + key + " has "
- + rsa.getChoices().size() + " choices");
+ log.debug("Spec attr " + key + " has " + rsa.getChoices().size() + " choices");
}
}
}
/**
* Generates a list of ref value choices based on the bean available in the
- * application ocntext.
+ * application context.
*/
protected List<RefValueChoice> buildRefValueChoices(RefSpecAttribute rsa) {
List<RefValueChoice> choices = new ArrayList<RefValueChoice>();
if (applicationContext == null) {
- log.warn("No application context declared,"
- + " cannot scan ref value choices.");
+ log.warn("No application context declared," + " cannot scan ref value choices.");
return choices;
}
- beanNames: for (String beanName : getBeanFactory().getBeanNamesForType(
- rsa.getTargetClass(), true, false)) {
+ beanNames: for (String beanName : getBeanFactory().getBeanNamesForType(rsa.getTargetClass(), true, false)) {
// Since Spring 3, systemProperties is implicitly defined but has no
// bean definition
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.core.execution;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Stack;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionFlow;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
-import org.argeo.slc.execution.ExecutionStack;
-
-/** Canonical implementation of an execution stack. */
-public class DefaultExecutionStack implements ExecutionStack {
-
- private final static Log log = LogFactory
- .getLog(DefaultExecutionStack.class);
-
- private final Stack<ExecutionFlowRuntime> stack = new Stack<ExecutionFlowRuntime>();
-
- public synchronized void enterFlow(ExecutionFlow executionFlow) {
- ExecutionFlowRuntime runtime = new ExecutionFlowRuntime(executionFlow);
- stack.push(runtime);
-
- Map<String, ExecutionSpecAttribute> specAttrs = executionFlow
- .getExecutionSpec().getAttributes();
- for (String key : specAttrs.keySet()) {
- if (executionFlow.isSetAsParameter(key)) {
- runtime.getLocalVariables().put(key,
- executionFlow.getParameter(key));
- }
- }
- }
-
- public synchronized String getCurrentStackLevelUuid() {
- return stack.peek().getUuid();
- }
-
- public synchronized Integer getStackSize() {
- return stack.size();
- }
-
- /**
- * Looks for a set variable in the stack, starting at the upper flows
- *
- * @return the variable or <code>null</code> if not found
- */
- public synchronized Object findLocalVariable(String key) {
- Object obj = null;
- for (int i = 0; i < stack.size(); i++) {
- if (stack.get(i).getLocalVariables().containsKey(key)) {
- obj = stack.get(i).getLocalVariables().get(key);
- break;
- }
- }
- return obj;
- }
-
- public synchronized void leaveFlow(ExecutionFlow executionFlow) {
- ExecutionFlowRuntime leftEf = stack.pop();
-
- if (!leftEf.getExecutionFlow().getName()
- .equals(executionFlow.getName()))
- throw new SlcException("Asked to leave " + executionFlow
- + " but last is " + leftEf);
-
- leftEf.getScopedObjects().clear();
- leftEf.getLocalVariables().clear();
- }
-
- public synchronized void addScopedObject(String name, Object obj) {
- ExecutionFlowRuntime runtime = stack.peek();
- // TODO: check that the object is not set yet ?
- if (log.isDebugEnabled()) {
- Object existing = findScopedObject(name);
- if (existing != null)
- log.warn("Scoped object " + name + " of type " + obj.getClass()
- + " already registered in " + runtime);
- }
- runtime.getScopedObjects().put(name, obj);
- }
-
- /** @return </code>null<code> if not found */
- public synchronized Object findScopedObject(String name) {
- Object obj = null;
- for (int i = stack.size() - 1; i >= 0; i--) {
- if (stack.get(i).getScopedObjects().containsKey(name)) {
- obj = stack.get(i).getScopedObjects().get(name);
- break;
- }
- }
- return obj;
- }
-
- protected static class ExecutionFlowRuntime {
- private final ExecutionFlow executionFlow;
- private final Map<String, Object> scopedObjects = new HashMap<String, Object>();
- private final Map<String, Object> localVariables = new HashMap<String, Object>();
- private final String uuid = UUID.randomUUID().toString();
-
- public ExecutionFlowRuntime(ExecutionFlow executionFlow) {
- this.executionFlow = executionFlow;
- }
-
- public ExecutionFlow getExecutionFlow() {
- return executionFlow;
- }
-
- public Map<String, Object> getScopedObjects() {
- return scopedObjects;
- }
-
- public String getUuid() {
- return uuid;
- }
-
- public Map<String, Object> getLocalVariables() {
- return localVariables;
- }
-
- @Override
- public String toString() {
- return "Stack Level #" + uuid;
- }
-
- }
-}
+++ /dev/null
-package org.argeo.slc.core.execution;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.execution.RealizedFlow;
-
-/** Canonical implementation of an {@link ExecutionProcess} as a bean. */
-public class DefaultProcess implements ExecutionProcess {
- private String uuid = UUID.randomUUID().toString();
- private String status = ExecutionProcess.NEW;
-
- private List<ExecutionStep> steps = new ArrayList<ExecutionStep>();
- private List<RealizedFlow> realizedFlows = new ArrayList<RealizedFlow>();
-
- public String getUuid() {
- return uuid;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public void addSteps(List<ExecutionStep> steps) {
- steps.addAll(steps);
- }
-
- public List<RealizedFlow> getRealizedFlows() {
- return realizedFlows;
- }
-
- public List<ExecutionStep> getSteps() {
- return steps;
- }
-
- public void setSteps(List<ExecutionStep> steps) {
- this.steps = steps;
- }
-
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- public void setRealizedFlows(List<RealizedFlow> realizedFlows) {
- this.realizedFlows = realizedFlows;
- }
-
-}
import org.argeo.slc.SlcException;\r
import org.argeo.slc.execution.ExecutionContext;\r
import org.argeo.slc.execution.ExecutionFlow;\r
+import org.argeo.slc.runtime.InstantiationManager;\r
import org.springframework.beans.BeansException;\r
import org.springframework.beans.MutablePropertyValues;\r
import org.springframework.beans.PropertyValue;\r
import org.argeo.slc.execution.ExecutionFlow;
import org.argeo.slc.execution.ExecutionSpec;
import org.argeo.slc.execution.ExecutionStack;
+import org.argeo.slc.runtime.ExecutionThread;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.Scope;
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.core.execution;
-
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.security.auth.Subject;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.execution.RealizedFlow;
-
-/** Thread of a single execution */
-public class ExecutionThread extends Thread {
- public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade";
- private final static Log log = LogFactory.getLog(ExecutionThread.class);
-
- private ExecutionModulesManager executionModulesManager;
- private final RealizedFlow realizedFlow;
- private final AccessControlContext accessControlContext;
-
- private List<Runnable> destructionCallbacks = new ArrayList<Runnable>();
-
- public ExecutionThread(ProcessThreadGroup processThreadGroup, ExecutionModulesManager executionModulesManager,
- RealizedFlow realizedFlow) {
- super(processThreadGroup, "Flow " + realizedFlow.getFlowDescriptor().getName());
- this.realizedFlow = realizedFlow;
- this.executionModulesManager = executionModulesManager;
- accessControlContext = AccessController.getContext();
- }
-
- public void run() {
- // authenticate thread
- // Authentication authentication = getProcessThreadGroup()
- // .getAuthentication();
- // if (authentication == null)
- // throw new SlcException("Can only execute authenticated threads");
- // SecurityContextHolder.getContext().setAuthentication(authentication);
-
- // Retrieve execution flow descriptor
- ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow.getFlowDescriptor();
- String flowName = executionFlowDescriptor.getName();
-
- getProcessThreadGroup().dispatchAddStep(
- new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_START, "Flow " + flowName));
-
- try {
- Subject subject = Subject.getSubject(accessControlContext);
- try {
- Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-
- @Override
- public Void run() throws Exception {
- String autoUpgrade = System.getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE);
- if (autoUpgrade != null && autoUpgrade.equals("true"))
- executionModulesManager.upgrade(realizedFlow.getModuleNameVersion());
- executionModulesManager.start(realizedFlow.getModuleNameVersion());
- //
- // START FLOW
- //
- executionModulesManager.execute(realizedFlow);
- // END FLOW
- return null;
- }
-
- });
- } catch (PrivilegedActionException privilegedActionException) {
- throw (Exception) privilegedActionException.getCause();
- }
- } catch (FlowConfigurationException e) {
- String msg = "Configuration problem with flow " + flowName + ":\n" + e.getMessage();
- log.error(msg);
- getProcessThreadGroup().dispatchAddStep(
- new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
- } catch (Exception e) {
- // TODO: re-throw exception ?
- String msg = "Execution of flow " + flowName + " failed.";
- log.error(msg, e);
- getProcessThreadGroup().dispatchAddStep(
- new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
- } finally {
- getProcessThreadGroup().dispatchAddStep(
- new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_END, "Flow " + flowName));
- processDestructionCallbacks();
- }
- }
-
- private synchronized void processDestructionCallbacks() {
- for (int i = destructionCallbacks.size() - 1; i >= 0; i--) {
- try {
- destructionCallbacks.get(i).run();
- } catch (Exception e) {
- log.warn("Could not process destruction callback " + i + " in thread " + getName(), e);
- }
- }
- }
-
- /**
- * Gather object destruction callback to be called in reverse order at the
- * end of the thread
- */
- synchronized void registerDestructionCallback(String name, Runnable callback) {
- destructionCallbacks.add(callback);
- }
-
- protected ProcessThreadGroup getProcessThreadGroup() {
- return (ProcessThreadGroup) getThreadGroup();
- }
-}
\ No newline at end of file
+++ /dev/null
-package org.argeo.slc.core.execution;
-
-import org.argeo.slc.SlcException;
-
-/** The stack trace of such exceptions does not need to be displayed */
-public class FlowConfigurationException extends SlcException {
- private static final long serialVersionUID = 8456260596346797321L;
-
- public FlowConfigurationException(String message) {
- super(message);
- }
-}
+++ /dev/null
-/*\r
- * Copyright (C) 2007-2012 Argeo GmbH\r
- *\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.argeo.slc.core.execution;\r
-\r
-import java.util.Stack;\r
-\r
-import org.apache.commons.logging.Log;\r
-import org.apache.commons.logging.LogFactory;\r
-import org.argeo.slc.SlcException;\r
-import org.argeo.slc.execution.ExecutionFlow;\r
-import org.argeo.slc.execution.ExecutionSpecAttribute;\r
-import org.argeo.slc.execution.RefSpecAttribute;\r
-import org.argeo.slc.primitive.PrimitiveSpecAttribute;\r
-import org.argeo.slc.primitive.PrimitiveUtils;\r
-\r
-/** Manage parameters that need to be set during the instantiation of a flow */\r
-public class InstantiationManager {\r
-\r
- private final static Log log = LogFactory\r
- .getLog(InstantiationManager.class);\r
-\r
- private ThreadLocal<Stack<ExecutionFlow>> flowStack = new ThreadLocal<Stack<ExecutionFlow>>();\r
-\r
- public Object createRef(String name) {\r
-\r
- if ((flowStack.get() == null) || flowStack.get().empty()) {\r
- throw new SlcException("No flow is currently initializing."\r
- + " Declare ParameterRef as inner beans or prototypes.");\r
- }\r
-\r
- return getInitializingFlowParameter(name);\r
- }\r
-\r
- public void flowInitializationStarted(ExecutionFlow flow, String flowName) {\r
- // set the flow name if it is DefaultExecutionFlow\r
- if (flow instanceof DefaultExecutionFlow) {\r
- ((DefaultExecutionFlow) flow).setBeanName(flowName);\r
- }\r
-\r
- if (log.isTraceEnabled())\r
- log.trace("Start initialization of " + flow.hashCode() + " ("\r
- + flow + " - " + flow.getClass() + ")");\r
-\r
- // log.info("# flowInitializationStarted " + flowName);\r
- // create a stack for this thread if there is none\r
- if (flowStack.get() == null) {\r
- flowStack.set(new Stack<ExecutionFlow>());\r
- }\r
- flowStack.get().push(flow);\r
- }\r
-\r
- public void flowInitializationFinished(ExecutionFlow flow, String flowName) {\r
- if (log.isTraceEnabled())\r
- log.trace("Finish initialization of " + flow.hashCode() + " ("\r
- + flow + " - " + flow.getClass() + ")");\r
-\r
- if (flowStack.get() != null) {\r
- ExecutionFlow registeredFlow = flowStack.get().pop();\r
- if (registeredFlow != null) {\r
- if (!flow.getName().equals(registeredFlow.getName()))\r
- throw new SlcException("Current flow is " + flow);\r
- // log.info("# flowInitializationFinished " + flowName);\r
- // initializingFlow.set(null);\r
- }\r
- } else {\r
- // happens for flows imported as services\r
- log.warn("flowInitializationFinished - Flow Stack is null");\r
- }\r
- }\r
-\r
- protected ExecutionFlow findInitializingFlowWithParameter(String key) {\r
- if ((flowStack.get() == null) || flowStack.get().empty())\r
- throw new SlcException("No initializing flow available.");\r
-\r
- // first look in the outer flow (that may override parameters)\r
- for (int i = 0; i < flowStack.get().size(); i++) {\r
- if (flowStack.get().elementAt(i).isSetAsParameter(key)) {\r
- return flowStack.get().elementAt(i);\r
- }\r
- }\r
- throw new SlcException("Key " + key + " is not set as parameter in "\r
- + flowStack.get().firstElement().toString() + " (stack size="\r
- + flowStack.get().size() + ")");\r
-\r
- }\r
-\r
- public Object getInitializingFlowParameter(String key) {\r
- return findInitializingFlowWithParameter(key).getParameter(key);\r
- }\r
-\r
- public Class<?> getInitializingFlowParameterClass(String key) {\r
- ExecutionSpecAttribute attr = findInitializingFlowWithParameter(key)\r
- .getExecutionSpec().getAttributes().get(key);\r
- if (attr instanceof RefSpecAttribute)\r
- return ((RefSpecAttribute) attr).getTargetClass();\r
- else if (attr instanceof PrimitiveSpecAttribute) {\r
- String type = ((PrimitiveSpecAttribute) attr).getType();\r
- Class<?> clss = PrimitiveUtils.typeAsClass(type);\r
- if (clss == null)\r
- throw new SlcException("Cannot convert type " + type\r
- + " to class.");\r
- return clss;\r
- } else\r
- return null;\r
- }\r
-\r
- public Boolean isInFlowInitialization() {\r
- return (flowStack.get() != null) && !flowStack.get().empty();\r
- }\r
-}\r
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.runtime.InstantiationManager;
import org.springframework.beans.factory.FactoryBean;
public class ParameterRef implements FactoryBean<Object> {
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.core.execution;
-
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.security.auth.Subject;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.execution.RealizedFlow;
-
-/**
- * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
- * sequential {@link ExecutionThread}s.
- */
-public class ProcessThread extends Thread {
- private final static Log log = LogFactory.getLog(ProcessThread.class);
-
- private final ExecutionModulesManager executionModulesManager;
- private final ExecutionProcess process;
- private final ProcessThreadGroup processThreadGroup;
-
- private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
-
- // private Boolean hadAnError = false;
- private Boolean killed = false;
-
- private final AccessControlContext accessControlContext;
-
- public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
- ExecutionProcess process) {
- super(processesThreadGroup, "SLC Process #" + process.getUuid());
- this.executionModulesManager = executionModulesManager;
- this.process = process;
- processThreadGroup = new ProcessThreadGroup(process);
- accessControlContext = AccessController.getContext();
- }
-
- public final void run() {
- // authenticate thread
- // Authentication authentication = getProcessThreadGroup()
- // .getAuthentication();
- // if (authentication == null)
- // throw new SlcException("Can only execute authenticated threads");
- // SecurityContextHolder.getContext().setAuthentication(authentication);
-
- log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
-
- // Start logging
- new LoggingThread().start();
-
- process.setStatus(ExecutionProcess.RUNNING);
- try {
- Subject subject = Subject.getSubject(accessControlContext);
- try {
- Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-
- @Override
- public Void run() throws Exception {
- process();
- return null;
- }
-
- });
- } catch (PrivilegedActionException privilegedActionException) {
- Throwable cause = privilegedActionException.getCause();
- if (cause instanceof InterruptedException)
- throw (InterruptedException) cause;
- else
- throw new SlcException("Cannot process", cause);
- }
- // process();
- } catch (InterruptedException e) {
- die();
- return;
- } catch (Exception e) {
- String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
- log.error(msg, e);
- getProcessThreadGroup()
- .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
- }
-
- // waits for all execution threads to complete (in case they were
- // started asynchronously)
- for (ExecutionThread executionThread : executionThreads) {
- if (executionThread.isAlive()) {
- try {
- executionThread.join();
- } catch (InterruptedException e) {
- die();
- return;
- }
- }
- }
-
- computeFinalStatus();
- }
-
- /** Make sure this is called BEFORE all the threads are interrupted. */
- private void computeFinalStatus() {
- // String oldStatus = process.getStatus();
- // TODO: error management at flow level?
- if (killed)
- process.setStatus(ExecutionProcess.KILLED);
- else if (processThreadGroup.hadAnError())
- process.setStatus(ExecutionProcess.ERROR);
- else
- process.setStatus(ExecutionProcess.COMPLETED);
- // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
- // process.getStatus());
- log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
- }
-
- /** Called when being killed */
- private synchronized void die() {
- killed = true;
- computeFinalStatus();
- for (ExecutionThread executionThread : executionThreads) {
- try {
- executionThread.interrupt();
- } catch (Exception e) {
- log.error("Cannot interrupt " + executionThread);
- }
- }
- processThreadGroup.interrupt();
- }
-
- /**
- * Implementation specific execution. To be overridden in order to deal with
- * custom process types. Default expects an {@link SlcExecution}.
- */
- protected void process() throws InterruptedException {
- List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
- flowsToProcess.addAll(process.getRealizedFlows());
- while (flowsToProcess.size() > 0) {
- RealizedFlow realizedFlow = flowsToProcess.remove(0);
- execute(realizedFlow, true);
- }
- }
-
- /** @return the (distinct) thread used for this execution */
- protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
- if (killed)
- return;
-
- ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
- executionThreads.add(thread);
- thread.start();
-
- if (synchronous)
- thread.join();
-
- return;
- }
-
- // public void notifyError() {
- // hadAnError = true;
- // }
- //
- // public synchronized void flowCompleted() {
- // // notifyAll();
- // }
-
- public ExecutionProcess getProcess() {
- return process;
- }
-
- public ProcessThreadGroup getProcessThreadGroup() {
- return processThreadGroup;
- }
-
- public ExecutionModulesManager getExecutionModulesManager() {
- return executionModulesManager;
- }
-
- private class LoggingThread extends Thread {
-
- public LoggingThread() {
- super("SLC Process Logger #" + process.getUuid());
- }
-
- public void run() {
- boolean run = true;
- while (run) {
- List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
- processThreadGroup.getSteps().drainTo(newSteps);
- if (newSteps.size() > 0) {
- // System.out.println(steps.size() + " steps");
- process.addSteps(newSteps);
- }
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
-
- if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
- run = false;
- }
- }
-
- }
-}
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.core.execution;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionStep;
-
-/** The thread group attached to a given {@link SlcExecution}. */
-public class ProcessThreadGroup extends ThreadGroup {
-// private final Authentication authentication;
- private final static Integer STEPS_BUFFER_CAPACITY = 5000;
-
- private BlockingQueue<ExecutionStep> steps = new ArrayBlockingQueue<ExecutionStep>(
- STEPS_BUFFER_CAPACITY);
-
- private Boolean hadAnError = false;
-
- public ProcessThreadGroup(ExecutionProcess executionProcess) {
- super("SLC Process #" + executionProcess.getUuid() + " thread group");
-// this.authentication = SecurityContextHolder.getContext()
-// .getAuthentication();
- }
-
-// public Authentication getAuthentication() {
-// return authentication;
-// }
-
- public void dispatchAddStep(ExecutionStep step) {
- // ExecutionProcess slcProcess = processThread.getProcess();
- // List<ExecutionStep> steps = new ArrayList<ExecutionStep>();
- // steps.add(step);
- // TODO clarify why we don't dispatch steps, must be a reason
- // dispatchAddSteps(steps);
- // slcProcess.addSteps(steps);
- if (step.getType().equals(ExecutionStep.ERROR))
- hadAnError = true;
- this.steps.add(step);
- }
-
- // public void dispatchAddSteps(List<ExecutionStep> steps) {
- // ExecutionProcess slcProcess = processThread.getProcess();
- // executionModulesManager.dispatchAddSteps(slcProcess, steps);
- // }
-
- public BlockingQueue<ExecutionStep> getSteps() {
- return steps;
- }
-
- public Boolean hadAnError() {
- return hadAnError;
- }
-}
</bean>
- <bean id="executionStack" class="org.argeo.slc.core.execution.DefaultExecutionStack"
+ <bean id="executionStack" class="org.argeo.slc.runtime.DefaultExecutionStack"
scope="execution">
<aop:scoped-proxy proxy-target-class="false" />
</bean>
- <bean id="instantiationManager" class="org.argeo.slc.core.execution.InstantiationManager" />
+ <bean id="instantiationManager" class="org.argeo.slc.runtime.InstantiationManager" />
<bean class="org.argeo.slc.core.execution.ExecutionParameterPostProcessor">
<property name="executionContext" ref="executionContext" />
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
- <bean id="specAttr.primitive" class="org.argeo.slc.core.execution.PrimitiveSpecAttribute"
+ <bean id="specAttr.primitive" class="org.argeo.slc.primitive.PrimitiveSpecAttribute"
abstract="true" />
<bean id="specAttr.resource" class="org.argeo.slc.core.execution.ResourceSpecAttribute"
abstract="true" />
- <bean id="specAttr.ref" class="org.argeo.slc.core.execution.RefSpecAttribute"
+ <bean id="specAttr.ref" class="org.argeo.slc.execution.RefSpecAttribute"
abstract="true" />
<bean id="slcTemplate.simpleFlow" class="org.argeo.slc.core.execution.DefaultExecutionFlow"
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.jcr.execution;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.UUID;
-
-import javax.jcr.Node;
-import javax.jcr.Repository;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.security.Privilege;
-
-import org.argeo.jcr.JcrUtils;
-import org.argeo.slc.SlcConstants;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.SlcNames;
-import org.argeo.slc.SlcTypes;
-import org.argeo.slc.core.execution.DefaultAgent;
-import org.argeo.slc.core.execution.ProcessThread;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.jcr.SlcJcrConstants;
-
-/** SLC VM agent synchronizing with a JCR repository. */
-public class JcrAgent extends DefaultAgent implements SlcNames {
- // final static String ROLE_REMOTE = "ROLE_REMOTE";
- final static String NODE_REPO_URI = "argeo.node.repo.uri";
-
- private Repository repository;
-
- private String agentNodeName = "default";
-
- /*
- * LIFECYCLE
- */
- protected String initAgentUuid() {
- Session session = null;
- try {
- session = repository.login();
-
- String agentFactoryPath = getAgentFactoryPath();
- Node vmAgentFactoryNode = JcrUtils.mkdirsSafe(session, agentFactoryPath, SlcTypes.SLC_AGENT_FACTORY);
- JcrUtils.addPrivilege(session, SlcJcrConstants.SLC_BASE_PATH, SlcConstants.ROLE_SLC, Privilege.JCR_ALL);
- if (!vmAgentFactoryNode.hasNode(agentNodeName)) {
- String uuid = UUID.randomUUID().toString();
- Node agentNode = vmAgentFactoryNode.addNode(agentNodeName, SlcTypes.SLC_AGENT);
- agentNode.setProperty(SLC_UUID, uuid);
- }
- session.save();
- return vmAgentFactoryNode.getNode(agentNodeName).getProperty(SLC_UUID).getString();
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- throw new SlcException("Cannot find JCR agent UUID", e);
- } finally {
- JcrUtils.logoutQuietly(session);
- }
- }
-
- @Override
- public void destroy() {
- super.destroy();
- }
-
- /*
- * SLC AGENT
- */
- @Override
- protected ProcessThread createProcessThread(ThreadGroup processesThreadGroup,
- ExecutionModulesManager modulesManager, ExecutionProcess process) {
- if (process instanceof JcrExecutionProcess)
- return new JcrProcessThread(processesThreadGroup, modulesManager, (JcrExecutionProcess) process);
- else
- return super.createProcessThread(processesThreadGroup, modulesManager, process);
- }
-
- /*
- * UTILITIES
- */
- public String getNodePath() {
- return getAgentFactoryPath() + '/' + getAgentNodeName();
- }
-
- public String getAgentFactoryPath() {
- try {
- Boolean isRemote = System.getProperty(NODE_REPO_URI) != null;
- String agentFactoryPath;
- if (isRemote) {
- InetAddress localhost = InetAddress.getLocalHost();
- agentFactoryPath = SlcJcrConstants.AGENTS_BASE_PATH + "/" + localhost.getCanonicalHostName();
-
- if (agentFactoryPath.equals(SlcJcrConstants.VM_AGENT_FACTORY_PATH))
- throw new SlcException("Unsupported hostname " + localhost.getCanonicalHostName());
- } else {// local
- agentFactoryPath = SlcJcrConstants.VM_AGENT_FACTORY_PATH;
- }
- return agentFactoryPath;
- } catch (UnknownHostException e) {
- throw new SlcException("Cannot find agent factory base path", e);
- }
- }
-
- /*
- * BEAN
- */
- public String getAgentNodeName() {
- return agentNodeName;
- }
-
- public void setRepository(Repository repository) {
- this.repository = repository;
- }
-
- public void setAgentNodeName(String agentNodeName) {
- this.agentNodeName = agentNodeName;
- }
-}
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.jcr.execution;
-
-import javax.jcr.Session;
-
-import org.argeo.slc.attachment.Attachment;
-import org.argeo.slc.core.attachment.AttachmentUploader;
-import org.springframework.core.io.Resource;
-
-/** JCR based attachment uploader */
-public class JcrAttachmentUploader implements AttachmentUploader {
- private Session session;
-
- public void upload(Attachment attachment, Resource resource) {
- session.toString();
- // not yet implemented, need to review the interface
- }
-
- public void setSession(Session session) {
- this.session = session;
- }
-
-}
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.jcr.execution;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jcr.Node;
-import javax.jcr.NodeIterator;
-import javax.jcr.Property;
-import javax.jcr.Repository;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.nodetype.NodeType;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.jcr.JcrUtils;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.SlcNames;
-import org.argeo.slc.SlcTypes;
-import org.argeo.slc.deploy.ModuleDescriptor;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.execution.ExecutionModulesListener;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionSpec;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
-import org.argeo.slc.execution.RefSpecAttribute;
-import org.argeo.slc.execution.RefValueChoice;
-import org.argeo.slc.jcr.SlcJcrUtils;
-import org.argeo.slc.primitive.PrimitiveSpecAttribute;
-import org.argeo.slc.primitive.PrimitiveValue;
-
-/**
- * Synchronizes the local execution runtime with a JCR repository. For the time
- * being the state is completely reset from one start to another.
- */
-public class JcrExecutionModulesListener implements ExecutionModulesListener,
- SlcNames {
- private final static String SLC_EXECUTION_MODULES_PROPERTY = "slc.executionModules";
-
- private final static Log log = LogFactory
- .getLog(JcrExecutionModulesListener.class);
- private JcrAgent agent;
-
- private ExecutionModulesManager modulesManager;
-
- private Repository repository;
- /**
- * We don't use a thread bound session because many different threads will
- * call this critical component and we don't want to login each time. We
- * therefore rather protect access to this session via synchronized.
- */
- private Session session;
-
- /*
- * LIFECYCLE
- */
- public void init() {
- try {
- session = repository.login();
- clearAgent();
- if (modulesManager != null) {
- Node agentNode = session.getNode(agent.getNodePath());
-
- List<ModuleDescriptor> moduleDescriptors = modulesManager
- .listModules();
-
- // scan SLC-ExecutionModule metadata
- for (ModuleDescriptor md : moduleDescriptors) {
- if (md.getMetadata().containsKey(
- ExecutionModuleDescriptor.SLC_EXECUTION_MODULE)) {
- String moduleNodeName = SlcJcrUtils
- .getModuleNodeName(md);
- Node moduleNode = agentNode.hasNode(moduleNodeName) ? agentNode
- .getNode(moduleNodeName) : agentNode
- .addNode(moduleNodeName);
- moduleNode.addMixin(SlcTypes.SLC_EXECUTION_MODULE);
- moduleNode.setProperty(SLC_NAME, md.getName());
- moduleNode.setProperty(SLC_VERSION, md.getVersion());
- moduleNode.setProperty(Property.JCR_TITLE,
- md.getTitle());
- moduleNode.setProperty(Property.JCR_DESCRIPTION,
- md.getDescription());
- moduleNode.setProperty(SLC_STARTED, md.getStarted());
- }
- }
-
- // scan execution modules property
- String executionModules = System
- .getProperty(SLC_EXECUTION_MODULES_PROPERTY);
- if (executionModules != null) {
- for (String executionModule : executionModules.split(",")) {
- allModules: for (ModuleDescriptor md : moduleDescriptors) {
- String moduleNodeName = SlcJcrUtils
- .getModuleNodeName(md);
- if (md.getName().equals(executionModule)) {
- Node moduleNode = agentNode
- .hasNode(moduleNodeName) ? agentNode
- .getNode(moduleNodeName) : agentNode
- .addNode(moduleNodeName);
- moduleNode
- .addMixin(SlcTypes.SLC_EXECUTION_MODULE);
- moduleNode.setProperty(SLC_NAME, md.getName());
- moduleNode.setProperty(SLC_VERSION,
- md.getVersion());
- moduleNode.setProperty(Property.JCR_TITLE,
- md.getTitle());
- moduleNode.setProperty(
- Property.JCR_DESCRIPTION,
- md.getDescription());
- moduleNode.setProperty(SLC_STARTED,
- md.getStarted());
- break allModules;
- }
- }
- }
-
- // save if needed
- if (session.hasPendingChanges())
- session.save();
- }
- }
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- JcrUtils.logoutQuietly(session);
- throw new SlcException("Cannot initialize modules", e);
- }
- }
-
- public void destroy() {
- clearAgent();
- JcrUtils.logoutQuietly(session);
- }
-
- protected synchronized void clearAgent() {
- try {
- Node agentNode = session.getNode(agent.getNodePath());
- for (NodeIterator nit = agentNode.getNodes(); nit.hasNext();)
- nit.nextNode().remove();
- session.save();
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- throw new SlcException("Cannot clear agent " + agent, e);
- }
- }
-
- /*
- * EXECUTION MODULES LISTENER
- */
-
- public synchronized void executionModuleAdded(
- ModuleDescriptor moduleDescriptor) {
- syncExecutionModule(moduleDescriptor);
- }
-
- protected void syncExecutionModule(ModuleDescriptor moduleDescriptor) {
- try {
- Node agentNode = session.getNode(agent.getNodePath());
- String moduleNodeName = SlcJcrUtils
- .getModuleNodeName(moduleDescriptor);
- Node moduleNode = agentNode.hasNode(moduleNodeName) ? agentNode
- .getNode(moduleNodeName) : agentNode
- .addNode(moduleNodeName);
- moduleNode.addMixin(SlcTypes.SLC_EXECUTION_MODULE);
- moduleNode.setProperty(SLC_NAME, moduleDescriptor.getName());
- moduleNode.setProperty(SLC_VERSION, moduleDescriptor.getVersion());
- moduleNode.setProperty(Property.JCR_TITLE,
- moduleDescriptor.getTitle());
- moduleNode.setProperty(Property.JCR_DESCRIPTION,
- moduleDescriptor.getDescription());
- moduleNode.setProperty(SLC_STARTED, moduleDescriptor.getStarted());
- session.save();
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- throw new SlcException("Cannot sync module " + moduleDescriptor, e);
- }
- }
-
- public synchronized void executionModuleRemoved(
- ModuleDescriptor moduleDescriptor) {
- try {
- String moduleName = SlcJcrUtils.getModuleNodeName(moduleDescriptor);
- Node agentNode = session.getNode(agent.getNodePath());
- if (agentNode.hasNode(moduleName)) {
- Node moduleNode = agentNode.getNode(moduleName);
- for (NodeIterator nit = moduleNode.getNodes(); nit.hasNext();) {
- nit.nextNode().remove();
- }
- moduleNode.setProperty(SLC_STARTED, false);
- }
- session.save();
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- throw new SlcException("Cannot remove module " + moduleDescriptor,
- e);
- }
- }
-
- public synchronized void executionFlowAdded(ModuleDescriptor module,
- ExecutionFlowDescriptor efd) {
- try {
- Node agentNode = session.getNode(agent.getNodePath());
- Node moduleNode = agentNode.getNode(SlcJcrUtils
- .getModuleNodeName(module));
- String relativePath = getExecutionFlowRelativePath(efd);
- @SuppressWarnings("unused")
- Node flowNode = null;
- if (!moduleNode.hasNode(relativePath)) {
- flowNode = createExecutionFlowNode(moduleNode, relativePath,
- efd);
- session.save();
- } else {
- flowNode = moduleNode.getNode(relativePath);
- }
-
- if (log.isTraceEnabled())
- log.trace("Flow " + efd + " added to JCR");
- } catch (RepositoryException e) {
- JcrUtils.discardQuietly(session);
- throw new SlcException("Cannot add flow " + efd + " from module "
- + module, e);
- }
-
- }
-
- protected Node createExecutionFlowNode(Node moduleNode,
- String relativePath, ExecutionFlowDescriptor efd)
- throws RepositoryException {
- Node flowNode = null;
- List<String> pathTokens = Arrays.asList(relativePath.split("/"));
-
- Iterator<String> names = pathTokens.iterator();
- // create intermediary paths
- Node currNode = moduleNode;
- while (names.hasNext()) {
- String name = names.next();
- if (currNode.hasNode(name))
- currNode = currNode.getNode(name);
- else {
- if (names.hasNext())
- currNode = currNode.addNode(name);
- else
- flowNode = currNode.addNode(name,
- SlcTypes.SLC_EXECUTION_FLOW);
- }
- }
-
- // name, description
- flowNode.setProperty(SLC_NAME, efd.getName());
- String endName = pathTokens.get(pathTokens.size() - 1);
- flowNode.setProperty(Property.JCR_TITLE, endName);
- if (efd.getDescription() != null
- && !efd.getDescription().trim().equals("")) {
- flowNode.setProperty(Property.JCR_DESCRIPTION, efd.getDescription());
- } else {
- flowNode.setProperty(Property.JCR_DESCRIPTION, endName);
- }
-
- // execution spec
- ExecutionSpec executionSpec = efd.getExecutionSpec();
- String esName = executionSpec.getName();
- if (esName == null || esName.equals(ExecutionSpec.INTERNAL_NAME)
- || esName.contains("#")/* automatically generated bean name */) {
- // internal spec node
- mapExecutionSpec(flowNode, executionSpec);
- } else {
- // reference spec node
- Node executionSpecsNode = moduleNode.hasNode(SLC_EXECUTION_SPECS) ? moduleNode
- .getNode(SLC_EXECUTION_SPECS) : moduleNode
- .addNode(SLC_EXECUTION_SPECS);
- Node executionSpecNode = executionSpecsNode.addNode(esName,
- SlcTypes.SLC_EXECUTION_SPEC);
- executionSpecNode.setProperty(SLC_NAME, esName);
- executionSpecNode.setProperty(Property.JCR_TITLE, esName);
- if (executionSpec.getDescription() != null
- && !executionSpec.getDescription().trim().equals(""))
- executionSpecNode.setProperty(Property.JCR_DESCRIPTION,
- executionSpec.getDescription());
- mapExecutionSpec(executionSpecNode, executionSpec);
- flowNode.setProperty(SLC_SPEC, executionSpecNode);
- }
-
- // flow values
- for (String attr : efd.getValues().keySet()) {
- ExecutionSpecAttribute esa = executionSpec.getAttributes()
- .get(attr);
- if (esa instanceof PrimitiveSpecAttribute) {
- PrimitiveSpecAttribute psa = (PrimitiveSpecAttribute) esa;
- // if spec reference there will be no node at this stage
- Node valueNode = JcrUtils.getOrAdd(flowNode, attr);
- valueNode.setProperty(SLC_TYPE, psa.getType());
- SlcJcrUtils.setPrimitiveAsProperty(valueNode, SLC_VALUE,
- (PrimitiveValue) efd.getValues().get(attr));
- }
- }
-
- return flowNode;
- }
-
- /**
- * Base can be either an execution spec node, or an execution flow node (in
- * case the execution spec is internal)
- */
- protected void mapExecutionSpec(Node baseNode, ExecutionSpec executionSpec)
- throws RepositoryException {
- for (String attrName : executionSpec.getAttributes().keySet()) {
- ExecutionSpecAttribute esa = executionSpec.getAttributes().get(
- attrName);
- Node attrNode = baseNode.addNode(attrName);
- // booleans
- attrNode.addMixin(SlcTypes.SLC_EXECUTION_SPEC_ATTRIBUTE);
- attrNode.setProperty(SLC_IS_IMMUTABLE, esa.getIsImmutable());
- attrNode.setProperty(SLC_IS_CONSTANT, esa.getIsConstant());
- attrNode.setProperty(SLC_IS_HIDDEN, esa.getIsHidden());
-
- if (esa instanceof PrimitiveSpecAttribute) {
- attrNode.addMixin(SlcTypes.SLC_PRIMITIVE_SPEC_ATTRIBUTE);
- PrimitiveSpecAttribute psa = (PrimitiveSpecAttribute) esa;
- SlcJcrUtils.setPrimitiveAsProperty(attrNode, SLC_VALUE, psa);
- attrNode.setProperty(SLC_TYPE, psa.getType());
- } else if (esa instanceof RefSpecAttribute) {
- attrNode.addMixin(SlcTypes.SLC_REF_SPEC_ATTRIBUTE);
- RefSpecAttribute rsa = (RefSpecAttribute) esa;
- attrNode.setProperty(SLC_TYPE, rsa.getTargetClassName());
- Object value = rsa.getValue();
- if (rsa.getChoices() != null) {
- Integer index = null;
- int count = 0;
- for (RefValueChoice choice : rsa.getChoices()) {
- String name = choice.getName();
- if (value != null && name.equals(value.toString()))
- index = count;
- Node choiceNode = attrNode.addNode(choice.getName());
- choiceNode.addMixin(NodeType.MIX_TITLE);
- choiceNode.setProperty(Property.JCR_TITLE,
- choice.getName());
- if (choice.getDescription() != null
- && !choice.getDescription().trim().equals(""))
- choiceNode.setProperty(Property.JCR_DESCRIPTION,
- choice.getDescription());
- count++;
- }
-
- if (index != null)
- attrNode.setProperty(SLC_VALUE, index);
- }
- }
- }
- }
-
- public synchronized void executionFlowRemoved(ModuleDescriptor module,
- ExecutionFlowDescriptor executionFlow) {
- try {
- Node agentNode = session.getNode(agent.getNodePath());
- Node moduleNode = agentNode.getNode(SlcJcrUtils
- .getModuleNodeName(module));
- String relativePath = getExecutionFlowRelativePath(executionFlow);
- if (moduleNode.hasNode(relativePath))
- moduleNode.getNode(relativePath).remove();
- agentNode.getSession().save();
- } catch (RepositoryException e) {
- throw new SlcException("Cannot remove flow " + executionFlow
- + " from module " + module, e);
- }
- }
-
- /*
- * UTILITIES
- */
- /** @return the relative path, never starts with '/' */
- @SuppressWarnings("deprecation")
- protected String getExecutionFlowRelativePath(
- ExecutionFlowDescriptor executionFlow) {
- String relativePath = executionFlow.getPath() == null ? executionFlow
- .getName() : executionFlow.getPath() + '/'
- + executionFlow.getName();
- // we assume that it is more than one char long
- if (relativePath.charAt(0) == '/')
- relativePath = relativePath.substring(1);
- // FIXME quick hack to avoid duplicate '/'
- relativePath = relativePath.replaceAll("//", "/");
- return relativePath;
- }
-
- /*
- * BEAN
- */
- public void setAgent(JcrAgent agent) {
- this.agent = agent;
- }
-
- public void setRepository(Repository repository) {
- this.repository = repository;
- }
-
- public void setModulesManager(ExecutionModulesManager modulesManager) {
- this.modulesManager = modulesManager;
- }
-
-}
+++ /dev/null
-/*
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.jcr.execution;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-
-import javax.jcr.Node;
-import javax.jcr.NodeIterator;
-import javax.jcr.Property;
-import javax.jcr.Repository;
-import javax.jcr.RepositoryException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.jcr.JcrUtils;
-import org.argeo.slc.NameVersion;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.SlcNames;
-import org.argeo.slc.SlcTypes;
-import org.argeo.slc.core.execution.ProcessThread;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.execution.RealizedFlow;
-import org.argeo.slc.jcr.SlcJcrUtils;
-
-/** Execution process implementation based on a JCR node. */
-public class JcrExecutionProcess implements ExecutionProcess, SlcNames {
- private final static Log log = LogFactory.getLog(JcrExecutionProcess.class);
- private final Node node;
-
- private Long nextLogLine = 1l;
-
- public JcrExecutionProcess(Node node) {
- this.node = node;
- }
-
- public synchronized String getUuid() {
- try {
- return node.getProperty(SLC_UUID).getString();
- } catch (RepositoryException e) {
- throw new SlcException("Cannot get uuid for " + node, e);
- }
- }
-
- public synchronized String getStatus() {
- try {
- return node.getProperty(SLC_STATUS).getString();
- } catch (RepositoryException e) {
- log.error("Cannot get status: " + e);
- // we should re-throw exception because this information can
- // probably used for monitoring in case there are already unexpected
- // exceptions
- return UNKOWN;
- }
- }
-
- public synchronized void setStatus(String status) {
- try {
- node.setProperty(SLC_STATUS, status);
- // last modified properties needs to be manually updated
- // see https://issues.apache.org/jira/browse/JCR-2233
- JcrUtils.updateLastModified(node);
- node.getSession().save();
- } catch (RepositoryException e) {
- JcrUtils.discardUnderlyingSessionQuietly(node);
- // we should re-throw exception because this information can
- // probably used for monitoring in case there are already unexpected
- // exceptions
- log.error("Cannot set status " + status + ": " + e);
- }
- }
-
- /**
- * Synchronized in order to make sure that there is no concurrent
- * modification of {@link #nextLogLine}.
- */
- public synchronized void addSteps(List<ExecutionStep> steps) {
- try {
- steps: for (ExecutionStep step : steps) {
- String type;
- if (step.getType().equals(ExecutionStep.TRACE))
- type = SlcTypes.SLC_LOG_TRACE;
- else if (step.getType().equals(ExecutionStep.DEBUG))
- type = SlcTypes.SLC_LOG_DEBUG;
- else if (step.getType().equals(ExecutionStep.INFO))
- type = SlcTypes.SLC_LOG_INFO;
- else if (step.getType().equals(ExecutionStep.WARNING))
- type = SlcTypes.SLC_LOG_WARNING;
- else if (step.getType().equals(ExecutionStep.ERROR))
- type = SlcTypes.SLC_LOG_ERROR;
- else
- // skip
- continue steps;
-
- String relPath = SLC_LOG + '/'
- + step.getThread().replace('/', '_') + '/'
- + step.getLocation().replace('.', '/');
- String path = node.getPath() + '/' + relPath;
- // clean special character
- // TODO factorize in JcrUtils
- path = path.replace('@', '_');
-
- Node location = JcrUtils.mkdirs(node.getSession(), path);
- Node logEntry = location.addNode(Long.toString(nextLogLine),
- type);
- logEntry.setProperty(SLC_MESSAGE, step.getLog());
- Calendar calendar = new GregorianCalendar();
- calendar.setTime(step.getTimestamp());
- logEntry.setProperty(SLC_TIMESTAMP, calendar);
-
- // System.out.println("Logged " + logEntry.getPath());
-
- nextLogLine++;
- }
-
- // last modified properties needs to be manually updated
- // see https://issues.apache.org/jira/browse/JCR-2233
- JcrUtils.updateLastModified(node);
-
- node.getSession().save();
- } catch (Exception e) {
- JcrUtils.discardUnderlyingSessionQuietly(node);
- e.printStackTrace();
- }
- }
-
- // public Node getNode() {
- // return node;
- // }
-
- public List<RealizedFlow> getRealizedFlows() {
- try {
- List<RealizedFlow> realizedFlows = new ArrayList<RealizedFlow>();
- Node rootRealizedFlowNode = node.getNode(SLC_FLOW);
- // we just manage one level for the time being
- NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
- while (nit.hasNext()) {
- Node realizedFlowNode = nit.nextNode();
-
- if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
- String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
- .getProperty(Property.JCR_PATH).getString();
- NameVersion moduleNameVersion = SlcJcrUtils
- .moduleNameVersion(flowPath);
- ((ProcessThread) Thread.currentThread())
- .getExecutionModulesManager().start(
- moduleNameVersion);
- }
-
- RealizedFlow realizedFlow = new JcrRealizedFlow(
- realizedFlowNode);
- if (realizedFlow != null)
- realizedFlows.add(realizedFlow);
- }
- return realizedFlows;
- } catch (RepositoryException e) {
- throw new SlcException("Cannot get realized flows", e);
- }
- }
-
- public String getNodePath() {
- try {
- return node.getPath();
- } catch (RepositoryException e) {
- throw new SlcException("Cannot get process node path for " + node,
- e);
- }
- }
-
- public Repository getRepository() {
- try {
- return node.getSession().getRepository();
- } catch (RepositoryException e) {
- throw new SlcException("Cannot get process JCR repository for "
- + node, e);
- }
- }
-}
+++ /dev/null
-/*
-
- * Copyright (C) 2007-2012 Argeo GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.argeo.slc.jcr.execution;
-
-import java.util.List;
-
-import javax.jcr.Node;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-
-import org.argeo.jcr.JcrUtils;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.SlcNames;
-import org.argeo.slc.core.execution.ProcessThread;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.RealizedFlow;
-
-/** Where the actual execution takes place */
-public class JcrProcessThread extends ProcessThread implements SlcNames {
-
- public JcrProcessThread(ThreadGroup processesThreadGroup,
- ExecutionModulesManager executionModulesManager,
- JcrExecutionProcess process) {
- super(processesThreadGroup, executionModulesManager, process);
- }
-
- /** Overridden in order to set progress status on realized flow nodes. */
- @Override
- protected void process() throws InterruptedException {
- Session session = null;
- if (getProcess() instanceof JcrExecutionProcess)
- try {
- session = ((JcrExecutionProcess) getProcess()).getRepository()
- .login();
-
- List<RealizedFlow> realizedFlows = getProcess()
- .getRealizedFlows();
- for (RealizedFlow realizedFlow : realizedFlows) {
- Node realizedFlowNode = session
- .getNode(((JcrRealizedFlow) realizedFlow).getPath());
- setFlowStatus(realizedFlowNode, ExecutionProcess.RUNNING);
-
- try {
- //
- // EXECUTE THE FLOW
- //
- execute(realizedFlow, true);
-
- setFlowStatus(realizedFlowNode,
- ExecutionProcess.COMPLETED);
- } catch (RepositoryException e) {
- throw e;
- } catch (InterruptedException e) {
- setFlowStatus(realizedFlowNode, ExecutionProcess.KILLED);
- throw e;
- } catch (RuntimeException e) {
- setFlowStatus(realizedFlowNode, ExecutionProcess.ERROR);
- throw e;
- }
- }
- } catch (RepositoryException e) {
- throw new SlcException("Cannot process "
- + getJcrExecutionProcess().getNodePath(), e);
- } finally {
- JcrUtils.logoutQuietly(session);
- }
- else
- super.process();
- }
-
- protected void setFlowStatus(Node realizedFlowNode, String status)
- throws RepositoryException {
- realizedFlowNode.setProperty(SLC_STATUS, status);
- realizedFlowNode.getSession().save();
- }
-
- protected JcrExecutionProcess getJcrExecutionProcess() {
- return (JcrExecutionProcess) getProcess();
- }
-}
+++ /dev/null
-package org.argeo.slc.jcr.execution;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jcr.Node;
-import javax.jcr.NodeIterator;
-import javax.jcr.Property;
-import javax.jcr.RepositoryException;
-
-import org.argeo.slc.SlcException;
-import org.argeo.slc.SlcNames;
-import org.argeo.slc.SlcTypes;
-import org.argeo.slc.core.execution.DefaultExecutionSpec;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
-import org.argeo.slc.execution.RealizedFlow;
-import org.argeo.slc.execution.RefSpecAttribute;
-import org.argeo.slc.jcr.SlcJcrUtils;
-import org.argeo.slc.primitive.PrimitiveSpecAttribute;
-import org.argeo.slc.primitive.PrimitiveUtils;
-
-public class JcrRealizedFlow extends RealizedFlow implements SlcNames {
- private static final long serialVersionUID = -3709453850260712001L;
- private String path;
-
- public JcrRealizedFlow(Node node) {
- try {
- this.path = node.getPath();
- loadFromNode(node);
- } catch (RepositoryException e) {
- throw new SlcException("Cannot initialize from " + node, e);
- }
- }
-
- protected void loadFromNode(Node realizedFlowNode)
- throws RepositoryException {
- if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
- String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
- .getProperty(Property.JCR_PATH).getString();
- // TODO: convert to local path if remote
- // FIXME start related module
- Node flowNode = realizedFlowNode.getSession().getNode(flowPath);
- String flowName = flowNode.getProperty(SLC_NAME).getString();
- String description = null;
- if (flowNode.hasProperty(Property.JCR_DESCRIPTION))
- description = flowNode.getProperty(Property.JCR_DESCRIPTION)
- .getString();
-
- Node executionModuleNode = flowNode.getSession().getNode(
- SlcJcrUtils.modulePath(flowPath));
- String executionModuleName = executionModuleNode.getProperty(
- SLC_NAME).getString();
- String executionModuleVersion = executionModuleNode.getProperty(
- SLC_VERSION).getString();
-
- RealizedFlow realizedFlow = this;
- realizedFlow.setModuleName(executionModuleName);
- realizedFlow.setModuleVersion(executionModuleVersion);
-
- // retrieve execution spec
- DefaultExecutionSpec executionSpec = new DefaultExecutionSpec();
- Map<String, ExecutionSpecAttribute> attrs = readExecutionSpecAttributes(realizedFlowNode);
- executionSpec.setAttributes(attrs);
-
- // set execution spec name
- if (flowNode.hasProperty(SlcNames.SLC_SPEC)) {
- Node executionSpecNode = flowNode.getProperty(SLC_SPEC)
- .getNode();
- executionSpec.setBeanName(executionSpecNode.getProperty(
- SLC_NAME).getString());
- }
-
- // explicitly retrieve values
- Map<String, Object> values = new HashMap<String, Object>();
- for (String attrName : attrs.keySet()) {
- ExecutionSpecAttribute attr = attrs.get(attrName);
- Object value = attr.getValue();
- values.put(attrName, value);
- }
-
- ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(flowName,
- description, values, executionSpec);
- realizedFlow.setFlowDescriptor(efd);
- } else {
- throw new SlcException("Unsupported realized flow "
- + realizedFlowNode);
- }
- }
-
- protected Map<String, ExecutionSpecAttribute> readExecutionSpecAttributes(
- Node node) {
- try {
- Map<String, ExecutionSpecAttribute> attrs = new HashMap<String, ExecutionSpecAttribute>();
- for (NodeIterator nit = node.getNodes(); nit.hasNext();) {
- Node specAttrNode = nit.nextNode();
- if (specAttrNode
- .isNodeType(SlcTypes.SLC_PRIMITIVE_SPEC_ATTRIBUTE)) {
- String type = specAttrNode.getProperty(SLC_TYPE)
- .getString();
- Object value = null;
- if (specAttrNode.hasProperty(SLC_VALUE)) {
- String valueStr = specAttrNode.getProperty(SLC_VALUE)
- .getString();
- value = PrimitiveUtils.convert(type, valueStr);
- }
- PrimitiveSpecAttribute specAttr = new PrimitiveSpecAttribute(
- type, value);
- attrs.put(specAttrNode.getName(), specAttr);
- } else if (specAttrNode
- .isNodeType(SlcTypes.SLC_REF_SPEC_ATTRIBUTE)) {
- if (!specAttrNode.hasProperty(SLC_VALUE)) {
- continue;
- }
- Integer value = (int) specAttrNode.getProperty(SLC_VALUE)
- .getLong();
- RefSpecAttribute specAttr = new RefSpecAttribute();
- NodeIterator children = specAttrNode.getNodes();
- int index = 0;
- String id = null;
- while (children.hasNext()) {
- Node child = children.nextNode();
- if (index == value)
- id = child.getName();
- index++;
- }
- specAttr.setValue(id);
- attrs.put(specAttrNode.getName(), specAttr);
- }
- // throw new SlcException("Unsupported spec attribute "
- // + specAttrNode);
- }
- return attrs;
- } catch (RepositoryException e) {
- throw new SlcException("Cannot read spec attributes from " + node,
- e);
- }
- }
-
- public String getPath() {
- return path;
- }
-}
import org.argeo.slc.DefaultNameVersion;
import org.argeo.slc.NameVersion;
import org.argeo.slc.SlcException;
-import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
import org.argeo.slc.deploy.Module;
import org.argeo.slc.deploy.ModuleDescriptor;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.execution.ExecutionModulesListener;
import org.argeo.slc.execution.RealizedFlow;
+import org.argeo.slc.runtime.AbstractExecutionModulesManager;
import org.eclipse.gemini.blueprint.service.importer.OsgiServiceLifecycleListener;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleEvent;
<name>SLC Third Party Support</name>
<dependencies>
<!-- SLC Runtime -->
+ <dependency>
+ <groupId>org.argeo.slc</groupId>
+ <artifactId>org.argeo.slc.runtime</artifactId>
+ <version>2.1.17-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.argeo.slc</groupId>
<artifactId>org.argeo.slc.spring</artifactId>
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
-import org.argeo.slc.core.execution.ExecutionThread;
-import org.argeo.slc.core.execution.ProcessThreadGroup;
import org.argeo.slc.execution.ExecutionStep;
+import org.argeo.slc.runtime.ExecutionThread;
+import org.argeo.slc.runtime.ProcessThreadGroup;
/** Not meant to be used directly in standard log4j config */
public class SlcExecutionAppender extends AppenderSkeleton {
try {
log4jLevel = Level.toLevel(level);
} catch (Exception e) {
- System.err
- .println("Log4j level could not be set for level '"
- + level + "', resetting it to null.");
+ System.err.println("Log4j level could not be set for level '" + level + "', resetting it to null.");
e.printStackTrace();
level = null;
}
- if (log4jLevel != null
- && !event.getLevel().isGreaterOrEqual(log4jLevel)) {
+ if (log4jLevel != null && !event.getLevel().isGreaterOrEqual(log4jLevel)) {
return;
}
}
// Check whether we are within an executing process
Thread currentThread = Thread.currentThread();
if (currentThread.getThreadGroup() instanceof ProcessThreadGroup) {
- if (onlyExecutionThread
- && !(currentThread instanceof ExecutionThread))
+ if (onlyExecutionThread && !(currentThread instanceof ExecutionThread))
return;
final String type;
- if (event.getLevel().equals(Level.ERROR)
- || event.getLevel().equals(Level.FATAL))
+ if (event.getLevel().equals(Level.ERROR) || event.getLevel().equals(Level.FATAL))
type = ExecutionStep.ERROR;
else if (event.getLevel().equals(Level.WARN))
type = ExecutionStep.WARNING;
else
type = ExecutionStep.INFO;
- ExecutionStep step = new ExecutionStep(event.getLoggerName(),
- new Date(event.getTimeStamp()), type, event.getMessage()
- .toString());
+ ExecutionStep step = new ExecutionStep(event.getLoggerName(), new Date(event.getTimeStamp()), type,
+ event.getMessage().toString());
try {
dispatching.set(true);
- BlockingQueue<ExecutionStep> steps = ((ProcessThreadGroup) currentThread
- .getThreadGroup()).getSteps();
+ BlockingQueue<ExecutionStep> steps = ((ProcessThreadGroup) currentThread.getThreadGroup()).getSteps();
if (steps.remainingCapacity() == 0) {
- stdOut("WARNING: execution steps queue is full, skipping step: "
- + step);
+ stdOut("WARNING: execution steps queue is full, skipping step: " + step);
// FIXME understand why it block indefinitely: the queue
// should be emptied by the logging thread
} else {
</properties>
<modules>
<module>org.argeo.slc.api</module>
+ <module>org.argeo.slc.runtime</module>
<module>org.argeo.slc.jcr</module>
<!-- Runtime -->