package org.argeo.slc.core.execution;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionFlow;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
+
+import org.argeo.slc.core.execution.internal.ProcessThread;
import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionSpec;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
-import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.process.SlcExecutionNotifier;
-import org.argeo.slc.process.SlcExecutionStep;
-import org.springframework.aop.scope.ScopedObject;
-import org.springframework.util.Assert;
public abstract class AbstractExecutionModulesManager implements
ExecutionModulesManager {
- private final static Log log = LogFactory
- .getLog(AbstractExecutionModulesManager.class);
-
private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
public void process(SlcExecution slcExecution) {
- new ProcessThread(processesThreadGroup, slcExecution).start();
+ new ProcessThread(this, slcExecution).start();
}
-
+/*
protected void dispatchUpdateStatus(SlcExecution slcExecution,
String oldStatus, String newStatus) {
for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
.iterator(); it.hasNext();) {
it.next().addSteps(slcExecution, steps);
}
- }
+ }*/
public void setSlcExecutionNotifiers(
List<SlcExecutionNotifier> slcExecutionNotifiers) {
this.slcExecutionNotifiers = slcExecutionNotifiers;
}
-
- protected static ExecutionModuleDescriptor createDescriptor(
- String moduleName, String moduleVersion,
+/*
+ protected static void addFlowsToDescriptor(ExecutionModuleDescriptor md,
Map<String, ExecutionFlow> executionFlows) {
// TODO: put this in a separate configurable object
- ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
- md.setName(moduleName);
- md.setVersion(moduleVersion);
-
for (String name : executionFlows.keySet()) {
ExecutionFlow executionFlow = executionFlows.get(name);
// Add execution flow
md.getExecutionFlows().add(efd);
}
-
- return md;
}
-
- /** Thread of the SLC Process, starting the sub executions. */
- private class ProcessThread extends Thread {
- private final SlcExecution slcProcess;
- private final ThreadGroup processThreadGroup;
- private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
-
- public ProcessThread(ThreadGroup processesThreadGroup,
- SlcExecution slcExecution) {
- super(processesThreadGroup, "SLC Process #"
- + slcExecution.getUuid());
- this.slcProcess = slcExecution;
- processThreadGroup = new ThreadGroup("SLC Process #"
- + slcExecution.getUuid() + " thread group");
- }
-
- public void run() {
- log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
-
- slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
- dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
- SlcExecution.STATUS_RUNNING);
-
- flowsToProcess.addAll(slcProcess.getRealizedFlows());
-
- while (flowsToProcess.size() > 0) {
- RealizedFlow flow = flowsToProcess.remove(0);
- ExecutionThread thread = new ExecutionThread(this, flow);
- thread.start();
-
- synchronized (this) {
- try {
- wait();
- } catch (InterruptedException e) {
- // silent
- }
- }
- }
-
- slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
- dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
- SlcExecution.STATUS_FINISHED);
- }
-
- public synchronized void flowCompleted() {
- notifyAll();
- }
-
- public SlcExecution getSlcProcess() {
- return slcProcess;
- }
-
- public ThreadGroup getProcessThreadGroup() {
- return processThreadGroup;
- }
+*/
+ /**
+ * Thread of the SLC Process, starting the sub executions. private class
+ * ProcessThread extends Thread { private final SlcExecution slcProcess;
+ * private final ThreadGroup processThreadGroup; private final
+ * List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+ *
+ * public ProcessThread(ThreadGroup processesThreadGroup, SlcExecution
+ * slcExecution) { super(processesThreadGroup, "SLC Process #" +
+ * slcExecution.getUuid()); this.slcProcess = slcExecution;
+ * processThreadGroup = new ThreadGroup("SLC Process #" +
+ * slcExecution.getUuid() + " thread group"); }
+ *
+ * public void run() { log.info("\n##\n## Process SLC Execution " +
+ * slcProcess + "\n##\n");
+ *
+ * slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
+ * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
+ * SlcExecution.STATUS_RUNNING);
+ *
+ * flowsToProcess.addAll(slcProcess.getRealizedFlows());
+ *
+ * while (flowsToProcess.size() > 0) { RealizedFlow flow =
+ * flowsToProcess.remove(0); ExecutionThread thread = new
+ * ExecutionThread(this, flow); thread.start();
+ *
+ * synchronized (this) { try { wait(); } catch (InterruptedException e) { //
+ * silent } } }
+ *
+ * slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
+ * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
+ * SlcExecution.STATUS_FINISHED); }
+ *
+ * public synchronized void flowCompleted() { notifyAll(); }
+ *
+ * public SlcExecution getSlcProcess() { return slcProcess; }
+ *
+ * public ThreadGroup getProcessThreadGroup() { return processThreadGroup; }
+ * }
+ */
+
+ /**
+ * Thread of a single execution private class ExecutionThread extends Thread
+ * { private final RealizedFlow realizedFlow; private final ProcessThread
+ * processThread;
+ *
+ * public ExecutionThread(ProcessThread processThread, RealizedFlow
+ * realizedFlow) { super(processThread.getProcessThreadGroup(), "Flow " +
+ * realizedFlow.getFlowDescriptor().getName()); this.realizedFlow =
+ * realizedFlow; this.processThread = processThread; }
+ *
+ * public void run() { ExecutionFlowDescriptor executionFlowDescriptor =
+ * realizedFlow .getFlowDescriptor(); String flowName =
+ * executionFlowDescriptor.getName();
+ *
+ * dispatchAddStep(processThread.getSlcProcess(), new
+ * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName));
+ *
+ * try { execute(realizedFlow); } catch (Exception e) { // TODO: re-throw
+ * exception ? String msg = "Execution of flow " + flowName + " failed.";
+ * log.error(msg, e); dispatchAddStep(processThread.getSlcProcess(), new
+ * SlcExecutionStep(msg + " " + e.getMessage())); } finally {
+ * processThread.flowCompleted();
+ * dispatchAddStep(processThread.getSlcProcess(), new
+ * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, "Flow " + flowName)); }
+ * } }
+ */
+
+ public List<SlcExecutionNotifier> getSlcExecutionNotifiers() {
+ return slcExecutionNotifiers;
}
- /** Thread of a single execution */
- private class ExecutionThread extends Thread {
- private final RealizedFlow realizedFlow;
- private final ProcessThread processThread;
-
- public ExecutionThread(ProcessThread processThread,
- RealizedFlow realizedFlow) {
- super(processThread.getProcessThreadGroup(), "Flow "
- + realizedFlow.getFlowDescriptor().getName());
- this.realizedFlow = realizedFlow;
- this.processThread = processThread;
- }
-
- public void run() {
- ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow
- .getFlowDescriptor();
- String flowName = executionFlowDescriptor.getName();
-
- dispatchAddStep(processThread.getSlcProcess(),
- new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
- "Flow " + flowName));
-
- try {
- execute(realizedFlow);
- } catch (Exception e) {
- // TODO: re-throw exception ?
- String msg = "Execution of flow " + flowName + " failed.";
- log.error(msg, e);
- dispatchAddStep(processThread.getSlcProcess(),
- new SlcExecutionStep(msg + " " + e.getMessage()));
- } finally {
- processThread.flowCompleted();
- dispatchAddStep(processThread.getSlcProcess(),
- new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
- "Flow " + flowName));
- }
- }
+ public ThreadGroup getProcessesThreadGroup() {
+ return processesThreadGroup;
}
}
+++ /dev/null
-package org.argeo.slc.core.execution;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
-
-public class DefaultDescriptorConverter implements
- ExecutionFlowDescriptorConverter {
-
- public Map<String, Object> convertValues(
- ExecutionFlowDescriptor executionFlowDescriptor) {
- // convert the values of flow.getFlowDescriptor()
- Map<String, Object> values = executionFlowDescriptor.getValues();
-
- Map<String, Object> convertedValues = new HashMap<String, Object>();
-
- if (values != null) {
- for (String key : values.keySet()) {
- Object value = values.get(key);
- if (value instanceof PrimitiveValue) {
- PrimitiveValue primitiveValue = (PrimitiveValue) value;
-
- // TODO: check that the class of the the
- // primitiveValue.value
- // matches
- // the primitiveValue.type
- convertedValues.put(key, primitiveValue.getValue());
- } else if (value instanceof RefValue) {
- // not yet implemented
-
-// RefValue refValue = (RefValue) value;
-// convertedValues.put(key, refValue.getLabel());
- }
- }
- }
- return convertedValues;
- }
-
-}
--- /dev/null
+package org.argeo.slc.core.execution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionFlow;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
+import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.execution.ExecutionSpec;
+import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.springframework.aop.scope.ScopedObject;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.util.Assert;
+
+public class DefaultExecutionFlowDescriptorConverter implements
+ ExecutionFlowDescriptorConverter, ApplicationContextAware {
+ private final static Log log = LogFactory
+ .getLog(DefaultExecutionFlowDescriptorConverter.class);
+
+ private ApplicationContext applicationContext;
+
+ public Map<String, Object> convertValues(
+ ExecutionFlowDescriptor executionFlowDescriptor) {
+ // convert the values of flow.getFlowDescriptor()
+ Map<String, Object> values = executionFlowDescriptor.getValues();
+
+ Map<String, Object> convertedValues = new HashMap<String, Object>();
+
+ if (values != null) {
+ for (String key : values.keySet()) {
+ Object value = values.get(key);
+ if (value instanceof PrimitiveValue) {
+ PrimitiveValue primitiveValue = (PrimitiveValue) value;
+
+ // TODO: check that the class of the the
+ // primitiveValue.value
+ // matches
+ // the primitiveValue.type
+ convertedValues.put(key, primitiveValue.getValue());
+ } else if (value instanceof RefValue) {
+ // not yet implemented
+
+ // RefValue refValue = (RefValue) value;
+ // convertedValues.put(key, refValue.getLabel());
+ }
+ }
+ }
+ return convertedValues;
+ }
+
+ public void addFlowsToDescriptor(ExecutionModuleDescriptor md,
+ Map<String, ExecutionFlow> executionFlows) {
+ for (String name : executionFlows.keySet()) {
+ ExecutionFlow executionFlow = executionFlows.get(name);
+
+ Assert.notNull(executionFlow.getName());
+ Assert.state(name.equals(executionFlow.getName()));
+
+ ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
+ Assert.notNull(executionSpec);
+ Assert.notNull(executionSpec.getName());
+
+ Map<String, Object> values = new TreeMap<String, Object>();
+ for (String key : executionSpec.getAttributes().keySet()) {
+ ExecutionSpecAttribute attribute = executionSpec
+ .getAttributes().get(key);
+
+ if (attribute instanceof PrimitiveSpecAttribute) {
+ if (executionFlow.isSetAsParameter(key)) {
+ Object value = executionFlow.getParameter(key);
+ PrimitiveValue primitiveValue = new PrimitiveValue();
+ primitiveValue
+ .setType(((PrimitiveSpecAttribute) attribute)
+ .getType());
+ primitiveValue.setValue(value);
+ values.put(key, primitiveValue);
+ } else {
+ // no need to add a primitive value if it is not set,
+ // all necessary information is in the spec
+ }
+ } else if (attribute instanceof RefSpecAttribute) {
+ values.put(key, buildRefValue((RefSpecAttribute) attribute,
+ executionFlow, key));
+ } else {
+ throw new SlcException("Unkown spec attribute type "
+ + attribute.getClass());
+ }
+
+ }
+
+ ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
+ values, executionSpec);
+ if (executionFlow.getPath() != null)
+ efd.setPath(executionFlow.getPath());
+
+ // Add execution spec if necessary
+ if (!md.getExecutionSpecs().contains(executionSpec))
+ md.getExecutionSpecs().add(executionSpec);
+
+ // Add execution flow
+ md.getExecutionFlows().add(efd);
+ }
+ }
+
+ @SuppressWarnings(value = { "unchecked" })
+ protected RefValue buildRefValue(RefSpecAttribute rsa,
+ ExecutionFlow executionFlow, String key) {
+ RefValue refValue = new RefValue();
+
+ if (executionFlow.isSetAsParameter(key)) {
+ String ref = null;
+ Object value = executionFlow.getParameter(key);
+ if (applicationContext == null) {
+ log
+ .warn("No application context declared, cannot scan ref value.");
+ ref = value.toString();
+ } else {
+
+ // look for a ref to the value
+ Map<String, Object> beans = getBeanFactory().getBeansOfType(
+ rsa.getTargetClass(), false, false);
+ // TODO: also check scoped beans
+ beans: for (String beanName : beans.keySet()) {
+ Object obj = beans.get(beanName);
+ if (value instanceof ScopedObject) {
+ // don't call methods of the target of the scope
+ if (obj instanceof ScopedObject)
+ if (value == obj) {
+ ref = beanName;
+ break beans;
+ }
+ } else {
+ if (obj.equals(value)) {
+ ref = beanName;
+ break beans;
+ }
+ }
+ }
+ }
+ if (ref == null)
+ log.warn("Cannot define reference for ref spec attribute "
+ + key);
+ refValue.setRef(ref);
+ }
+ return refValue;
+ }
+
+ private ConfigurableListableBeanFactory getBeanFactory() {
+ return ((ConfigurableApplicationContext) applicationContext)
+ .getBeanFactory();
+ }
+
+ /** Must be use within the execution application context */
+ public void setApplicationContext(ApplicationContext applicationContext)
+ throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+}
package org.argeo.slc.core.execution;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.argeo.slc.execution.ExecutionSpec;
import org.argeo.slc.execution.ExecutionSpecAttribute;
import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
-public class DefaultExecutionSpec implements ExecutionSpec, BeanNameAware {
+public class DefaultExecutionSpec implements ExecutionSpec, BeanNameAware,
+ ApplicationContextAware, InitializingBean {
+ private final static Log log = LogFactory
+ .getLog(DefaultExecutionSpec.class);
+ private ApplicationContext applicationContext;
+
+ private String description;
private Map<String, ExecutionSpecAttribute> attributes = new HashMap<String, ExecutionSpecAttribute>();
private String name = getClass().getName() + "#" + UUID.randomUUID();
return ((ExecutionSpec) obj).getName().equals(name);
}
+ public String getDescription() {
+ return description;
+ }
+
+ private ConfigurableListableBeanFactory getBeanFactory() {
+ return ((ConfigurableApplicationContext) applicationContext)
+ .getBeanFactory();
+ }
+
+ public void setApplicationContext(ApplicationContext applicationContext) {
+ this.applicationContext = applicationContext;
+ }
+
+ public void afterPropertiesSet() throws Exception {
+ if (description == null) {
+ try {
+ description = getBeanFactory().getBeanDefinition(name)
+ .getDescription();
+ } catch (NoSuchBeanDefinitionException e) {
+ // silent
+ }
+ }
+
+ for (String key : attributes.keySet()) {
+ ExecutionSpecAttribute attr = attributes.get(key);
+ if (attr instanceof RefSpecAttribute) {
+ RefSpecAttribute rsa = (RefSpecAttribute) attr;
+ if (rsa.getChoices() == null) {
+ rsa.setChoices(buildRefValueChoices(rsa));
+ }
+ }
+ }
+ }
+
+ protected List<RefValueChoice> buildRefValueChoices(RefSpecAttribute rsa) {
+ List<RefValueChoice> choices = new ArrayList<RefValueChoice>();
+ if (applicationContext == null) {
+ log.warn("No application context declared,"
+ + " cannot scan ref value choices.");
+ return choices;
+ }
+
+ for (String beanName : getBeanFactory().getBeanNamesForType(
+ rsa.getTargetClass(), true, false)) {
+ BeanDefinition bd = getBeanFactory().getBeanDefinition(beanName);
+ RefValueChoice choice = new RefValueChoice();
+ choice.setName(beanName);
+ choice.setDescription(bd.getDescription());
+ }
+ return choices;
+ }
+
}
package org.argeo.slc.core.execution;
-public class RefSpecAttribute extends AbstractSpecAttribute {
+import java.util.List;
+
+public class RefSpecAttribute extends AbstractSpecAttribute implements
+ Cloneable {
private Class<?> targetClass;
/** Read only. */
private String targetClassName;
private Object value = null;
+ private List<RefValueChoice> choices = null;
+
public Object getValue() {
return value;
}
return targetClassName;
}
+ /** @return can be null */
+ public List<RefValueChoice> getChoices() {
+ return choices;
+ }
+
+ public void setChoices(List<RefValueChoice> choices) {
+ this.choices = choices;
+ }
+
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ RefSpecAttribute rsa = new RefSpecAttribute();
+ rsa.setTargetClass(targetClass);
+ rsa.setChoices(choices);
+ return rsa;
+ }
+
}
package org.argeo.slc.core.execution;
+
public class RefValue extends AbstractExecutionValue {
- private String label;
+ private String ref;
public RefValue() {
}
- public RefValue(String label) {
+ public RefValue(String ref) {
super();
- this.label = label;
+ this.ref = ref;
}
- public String getLabel() {
- return label;
+ public String getRef() {
+ return ref;
}
- public void setLabel(String label) {
- this.label = label;
+ public void setRef(String ref) {
+ this.ref = ref;
}
}
--- /dev/null
+package org.argeo.slc.core.execution;
+
+public class RefValueChoice {
+ private String name;
+ private String description;
+
+ public RefValueChoice() {
+ }
+
+ public RefValueChoice(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+}
--- /dev/null
+package org.argeo.slc.core.execution.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionFlowDescriptor;
+import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.process.SlcExecutionNotifier;
+import org.argeo.slc.process.SlcExecutionStep;
+
+/** Thread of a single execution */
+public class ExecutionThread extends Thread {
+ private final static Log log = LogFactory.getLog(ExecutionThread.class);
+
+ private final RealizedFlow realizedFlow;
+ private final ProcessThread processThread;
+
+ public ExecutionThread(ProcessThread processThread,
+ RealizedFlow realizedFlow) {
+ super(processThread.getProcessThreadGroup(), "Flow "
+ + realizedFlow.getFlowDescriptor().getName());
+ this.realizedFlow = realizedFlow;
+ this.processThread = processThread;
+ }
+
+ public void run() {
+ ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow
+ .getFlowDescriptor();
+ String flowName = executionFlowDescriptor.getName();
+
+ dispatchAddStep(processThread.getSlcProcess(), new SlcExecutionStep(
+ SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName));
+
+ try {
+ processThread.getExecutionModulesManager().execute(realizedFlow);
+ } catch (Exception e) {
+ // TODO: re-throw exception ?
+ String msg = "Execution of flow " + flowName + " failed.";
+ log.error(msg, e);
+ dispatchAddStep(processThread.getSlcProcess(),
+ new SlcExecutionStep(msg + " " + e.getMessage()));
+ } finally {
+ processThread.flowCompleted();
+ dispatchAddStep(processThread.getSlcProcess(),
+ new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
+ "Flow " + flowName));
+ }
+ }
+
+ protected void dispatchAddStep(SlcExecution slcExecution,
+ SlcExecutionStep step) {
+ slcExecution.getSteps().add(step);
+ List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
+ steps.add(step);
+ for (Iterator<SlcExecutionNotifier> it = processThread
+ .getExecutionModulesManager().getSlcExecutionNotifiers()
+ .iterator(); it.hasNext();) {
+ it.next().addSteps(slcExecution, steps);
+ }
+ }
+
+}
--- /dev/null
+package org.argeo.slc.core.execution.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
+import org.argeo.slc.process.RealizedFlow;
+import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.process.SlcExecutionNotifier;
+
+/** Thread of the SLC Process, starting the sub executions. */
+public class ProcessThread extends Thread {
+ private final static Log log = LogFactory.getLog(ProcessThread.class);
+
+ private final AbstractExecutionModulesManager executionModulesManager;
+ private final SlcExecution slcProcess;
+ private final ThreadGroup processThreadGroup;
+ private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+
+ public ProcessThread(
+ AbstractExecutionModulesManager executionModulesManager,
+ SlcExecution slcExecution) {
+ super(executionModulesManager.getProcessesThreadGroup(),
+ "SLC Process #" + slcExecution.getUuid());
+ this.executionModulesManager = executionModulesManager;
+ this.slcProcess = slcExecution;
+ processThreadGroup = new ThreadGroup("SLC Process #"
+ + slcExecution.getUuid() + " thread group");
+ }
+
+ public void run() {
+ log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
+
+ slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
+ dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
+ SlcExecution.STATUS_RUNNING);
+
+ flowsToProcess.addAll(slcProcess.getRealizedFlows());
+
+ while (flowsToProcess.size() > 0) {
+ RealizedFlow flow = flowsToProcess.remove(0);
+ ExecutionThread thread = new ExecutionThread(this, flow);
+ thread.start();
+
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // silent
+ }
+ }
+ }
+
+ slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
+ dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
+ SlcExecution.STATUS_FINISHED);
+ }
+
+ protected void dispatchUpdateStatus(SlcExecution slcExecution,
+ String oldStatus, String newStatus) {
+ for (Iterator<SlcExecutionNotifier> it = executionModulesManager
+ .getSlcExecutionNotifiers().iterator(); it.hasNext();) {
+ it.next().updateStatus(slcExecution, oldStatus, newStatus);
+ }
+ }
+
+ public synchronized void flowCompleted() {
+ notifyAll();
+ }
+
+ public SlcExecution getSlcProcess() {
+ return slcProcess;
+ }
+
+ public ThreadGroup getProcessThreadGroup() {
+ return processThreadGroup;
+ }
+
+ public AbstractExecutionModulesManager getExecutionModulesManager() {
+ return executionModulesManager;
+ }
+}
<aop:scoped-proxy />
</bean>
+ <bean id="executionFlowDescriptorConverter"
+ class="org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter"></bean>
+
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
package org.argeo.slc.deploy;
-@SuppressWarnings("unchecked")
public interface DynamicRuntime<M extends Module> extends
ModularDeployedSystem<M> {
public void shutdown();
import java.util.List;
-@SuppressWarnings("unchecked")
public interface ModularDeployedSystem<M extends Module> extends DeployedSystem {
/** List the underlying deployed modules (in real time) */
public List<M> listModules();
--- /dev/null
+package org.argeo.slc.deploy;
+
+import java.io.Serializable;
+
+import org.argeo.slc.build.BasicNameVersion;
+
+public class ModuleDescriptor extends BasicNameVersion implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String label;
+ private String description;
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+}
--- /dev/null
+package org.argeo.slc.deploy;
+
+import java.util.List;
+
+/** Provides access to modules */
+public interface ModulesManager {
+ /** @return a full fledged module descriptor. */
+ public ModuleDescriptor getModuleDescriptor(String moduleName,
+ String version);
+
+ /**
+ * @return a list of minimal module descriptors
+ */
+ public List<ModuleDescriptor> listModules();
+}
public class ExecutionFlowDescriptor {
private String name;
+ private String description;
private String path;
private Map<String, Object> values;
private ExecutionSpec executionSpec;
this.executionSpec = executionSpec;
}
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
}
public interface ExecutionFlowDescriptorConverter {
public Map<String, Object> convertValues(
ExecutionFlowDescriptor executionFlowDescriptor);
+
+ public void addFlowsToDescriptor(ExecutionModuleDescriptor md,
+ Map<String, ExecutionFlow> executionFlows);
}
package org.argeo.slc.execution;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.argeo.slc.SlcException;
+import org.argeo.slc.deploy.ModuleDescriptor;
-public class ExecutionModuleDescriptor implements Serializable {
+public class ExecutionModuleDescriptor extends ModuleDescriptor {
private static final long serialVersionUID = 1L;
- private String name;
- private String version;
+
private List<ExecutionSpec> executionSpecs = new ArrayList<ExecutionSpec>();
private List<ExecutionFlowDescriptor> executionFlows = new ArrayList<ExecutionFlowDescriptor>();
public void setExecutionFlows(List<ExecutionFlowDescriptor> executionFlows) {
this.executionFlows = executionFlows;
}
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
}
import java.util.List;
+import org.argeo.slc.deploy.ModulesManager;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
/** Provides access to the execution modules */
-public interface ExecutionModulesManager {
+public interface ExecutionModulesManager extends ModulesManager {
/** @return a full fledged module descriptor. */
public ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version);
public Map<String, ExecutionSpecAttribute> getAttributes();
public String getName();
+
+ public String getDescription();
}
ns-prefix="slc" />\r
<field name="name" />\r
<field name="version" />\r
+ <field name="label" />\r
+ <field name="description" />\r
<field name="executionFlows" collection="arraylist"\r
type="org.argeo.slc.execution.ExecutionFlowDescriptor">\r
<bind-xml auto-naming="deriveByClass" location="execution-flows" />\r
<field name="targetClassName">\r
<bind-xml name="targetClassName" node="attribute" />\r
</field>\r
+ <field name="choices" collection="arraylist"\r
+ type="org.argeo.slc.core.execution.RefValueChoice">\r
+ <bind-xml auto-naming="deriveByClass" location="choices" />\r
+ </field>\r
</class>\r
\r
<!-- Values -->\r
<class name="org.argeo.slc.core.execution.RefValue" extends="org.argeo.slc.core.execution.AbstractExecutionValue">\r
<map-to ns-uri="http://argeo.org/projects/slc/schemas"\r
ns-prefix="slc" />\r
- <field name="label" />\r
+ <field name="ref">\r
+ <bind-xml name="ref" node="attribute" />\r
+ </field>\r
+ </class>\r
+\r
+ <class name="org.argeo.slc.core.execution.RefValueChoice">\r
+ <map-to ns-uri="http://argeo.org/projects/slc/schemas"\r
+ ns-prefix="slc" />\r
+ <field name="name">\r
+ <bind-xml name="name" node="attribute" />\r
+ </field>\r
+ <field name="description" />\r
</class>\r
</mapping>
\ No newline at end of file
import org.argeo.slc.execution.ExecutionFlowDescriptor;\r
import org.argeo.slc.execution.ExecutionModuleDescriptor;\r
import org.argeo.slc.execution.ExecutionSpec;\r
-import org.argeo.slc.msg.ObjectList;\r
import org.argeo.slc.unit.execution.ExecutionFlowDescriptorTestUtils;\r
\r
public class ExecutionModuleDescriptorCastorTest extends AbstractCastorTestCase {\r
public void testMarshUnmarsh() throws Exception {\r
ExecutionModuleDescriptor moduleDescriptor = new ExecutionModuleDescriptor();\r
- moduleDescriptor.setName("test.moodule");\r
+ moduleDescriptor.setName("test.module");\r
moduleDescriptor.setVersion("1.0.0");\r
+ moduleDescriptor.setLabel("Test Module");\r
+ moduleDescriptor.setDescription("module descriptor");\r
\r
ExecutionFlowDescriptor flowDescriptor = ExecutionFlowDescriptorTestUtils\r
.createSimpleExecutionFlowDescriptor();\r
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
-import org.argeo.slc.core.execution.DefaultDescriptorConverter;
+import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
+import org.argeo.slc.deploy.ModuleDescriptor;
import org.argeo.slc.execution.ExecutionContext;
import org.argeo.slc.execution.ExecutionFlow;
import org.argeo.slc.execution.ExecutionFlowDescriptor;
import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.process.RealizedFlow;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.Constants;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
private BundlesManager bundlesManager;
private ServiceTracker executionContexts;
- private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultDescriptorConverter();
+ private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultExecutionFlowDescriptorConverter();
public ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version) {
- return createDescriptor(moduleName, version, listFlows(moduleName,
- version));
+ ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
+ md.setName(moduleName);
+ md.setVersion(version);
+ setMetadataFromBundle(md, null);
+ getExecutionFlowDescriptorConverter(moduleName, version)
+ .addFlowsToDescriptor(md, listFlows(moduleName, version));
+ return md;
}
public List<ExecutionModuleDescriptor> listExecutionModules() {
ServiceReference[] srs = executionContexts.getServiceReferences();
for (ServiceReference sr : srs) {
- String moduleName = sr.getBundle().getSymbolicName();
- String moduleVersion = sr.getBundle().getHeaders().get(
- "Bundle-Version").toString();
ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
- md.setName(moduleName);
- md.setVersion(moduleVersion);
+ setMetadataFromBundle(md, sr.getBundle());
descriptors.add(md);
}
return descriptors;
execute(realizedFlow);
}
+ protected ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
+ String moduleName, String moduleVersion) {
+ // Check whether a descriptor converter is published by this module
+ ExecutionFlowDescriptorConverter descriptorConverter = findExecutionFlowDescriptorConverter(
+ moduleName, moduleVersion);
+ if (descriptorConverter == null)
+ return defaultDescriptorConverter;
+ else
+ return descriptorConverter;
+ }
+
public void execute(RealizedFlow realizedFlow) {
if (log.isTraceEnabled())
log.trace("Executing " + realizedFlow);
String moduleName = realizedFlow.getModuleName();
String moduleVersion = realizedFlow.getModuleVersion();
- // Check whether a descriptor converter is published by this module
- ExecutionFlowDescriptorConverter descriptorConverter = findExecutionFlowDescriptorConverter(
- moduleName, moduleVersion);
-
- final Map<? extends String, ? extends Object> variablesToAdd;
- if (descriptorConverter != null)
- variablesToAdd = descriptorConverter.convertValues(realizedFlow
- .getFlowDescriptor());
- else
- variablesToAdd = defaultDescriptorConverter
- .convertValues(realizedFlow.getFlowDescriptor());
-
+ Map<? extends String, ? extends Object> variablesToAdd = getExecutionFlowDescriptorConverter(
+ moduleName, moduleVersion).convertValues(
+ realizedFlow.getFlowDescriptor());
ExecutionContext executionContext = findExecutionContext(moduleName,
moduleVersion);
for (String key : variablesToAdd.keySet())
//
}
+ public ModuleDescriptor getModuleDescriptor(String moduleName,
+ String version) {
+ return getExecutionModuleDescriptor(moduleName, version);
+ }
+
+ public List<ModuleDescriptor> listModules() {
+ Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
+ List<ModuleDescriptor> lst = new ArrayList<ModuleDescriptor>();
+ for (Bundle bundle : bundles) {
+ ModuleDescriptor moduleDescriptor = new ModuleDescriptor();
+ setMetadataFromBundle(moduleDescriptor, bundle);
+ lst.add(moduleDescriptor);
+ }
+ return lst;
+ }
+
+ protected void setMetadataFromBundle(ModuleDescriptor md, Bundle bundle) {
+ Bundle bdl = bundle;
+ if (bdl == null) {
+ if (md.getName() == null || md.getVersion() == null)
+ throw new SlcException("Name and version not available.");
+
+ Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
+ for (Bundle b : bundles) {
+ if (b.getSymbolicName().equals(md.getName())
+ && md.getVersion().equals(
+ getHeaderSafe(b, Constants.BUNDLE_VERSION))) {
+ bdl = b;
+ break;
+ }
+ }
+
+ }
+
+ if (bdl == null)
+ throw new SlcException("Cannot find bundle.");
+
+ md.setName(bdl.getSymbolicName());
+ md.setVersion(getHeaderSafe(bdl, Constants.BUNDLE_VERSION));
+ md.setLabel(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
+ md.setDescription(getHeaderSafe(bdl, Constants.BUNDLE_DESCRIPTION));
+ }
+
+ private String getHeaderSafe(Bundle bundle, Object key) {
+ Object obj = bundle.getHeaders().get(key);
+ if (obj == null)
+ return null;
+ else
+ return obj.toString();
+ }
}
<osgi:service interface="org.argeo.slc.execution.ExecutionContext"
ref="executionContext" />
+ <osgi:service interface="org.argeo.slc.execution.ExecutionFlowDescriptorConverter"
+ ref="executionFlowDescriptorConverter" />
<bean class="org.argeo.slc.osgi.MultipleServiceExporterPostProcessor">
<property name="interfaces">
package org.argeo.slc.unit.execution;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import org.argeo.slc.core.deploy.SimpleExecutables;
import org.argeo.slc.core.execution.DefaultExecutionSpec;
import org.argeo.slc.core.execution.PrimitiveSpecAttribute;
import org.argeo.slc.core.execution.PrimitiveValue;
import org.argeo.slc.core.execution.RefSpecAttribute;
import org.argeo.slc.core.execution.RefValue;
+import org.argeo.slc.core.execution.RefValueChoice;
import org.argeo.slc.core.test.BasicTestData;
import org.argeo.slc.execution.ExecutionFlowDescriptor;
import org.argeo.slc.execution.ExecutionSpecAttribute;
public static ExecutionFlowDescriptor createSimpleExecutionFlowDescriptor() {
ExecutionFlowDescriptor flowDescriptor = new ExecutionFlowDescriptor();
flowDescriptor.setName("simpleFlow");
+ flowDescriptor.setDescription("my description");
+
Map<String, Object> values = new HashMap<String, Object>();
values.put("primitiveInteger", new PrimitiveValue(
PrimitiveSpecAttribute.TYPE_INTEGER, 100));
- values.put("ref1", new RefValue("Just a label"));
+
+ RefValue refValue = new RefValue("002");
+ values.put("ref1", refValue);
flowDescriptor.setValues(values);
flowDescriptor.setExecutionSpec(createRelatedSimpleSpec());
RefSpecAttribute ref1 = new RefSpecAttribute();
ref1.setTargetClass(BasicTestData.class);
+ ref1.setChoices(new ArrayList<RefValueChoice>());
+ ref1.getChoices().add(new RefValueChoice("001", "desc"));
+ ref1.getChoices().add(new RefValueChoice("002", null));
+ ref1.getChoices().add(new RefValueChoice("003", null));
attributes.put("ref1", ref1);
spec.setAttributes(attributes);