X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FAbstractExecutionModulesManager.java;h=1a07313f002273ff234f39c888760e9e1d685b6b;hb=af9457b0628ba4cc625192762d0c0fe7564b9846;hp=2db1eb31a2ac0632887764435e31f7d596766560;hpb=ee6c3543a0ff9403420ce6a9c647723269f14331;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java index 2db1eb31a..1a07313f0 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java @@ -1,229 +1,157 @@ +/* + * Copyright (C) 2007-2012 Argeo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.argeo.slc.core.execution; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.argeo.slc.SlcException; +import org.argeo.slc.execution.ExecutionContext; import org.argeo.slc.execution.ExecutionFlow; -import org.argeo.slc.execution.ExecutionFlowDescriptor; -import org.argeo.slc.execution.ExecutionModuleDescriptor; +import org.argeo.slc.execution.ExecutionFlowDescriptorConverter; import org.argeo.slc.execution.ExecutionModulesManager; -import org.argeo.slc.execution.ExecutionSpec; -import org.argeo.slc.execution.ExecutionSpecAttribute; -import org.argeo.slc.process.RealizedFlow; -import org.argeo.slc.process.SlcExecution; -import org.argeo.slc.process.SlcExecutionNotifier; -import org.argeo.slc.process.SlcExecutionStep; -import org.springframework.aop.scope.ScopedObject; -import org.springframework.util.Assert; +import org.argeo.slc.execution.RealizedFlow; +/** Provides the base feature of an execution module manager. */ public abstract class AbstractExecutionModulesManager implements ExecutionModulesManager { private final static Log log = LogFactory .getLog(AbstractExecutionModulesManager.class); - private List slcExecutionNotifiers = new ArrayList(); - private ThreadGroup processesThreadGroup = new ThreadGroup("Processes"); +// private List filteredNotifiers = Collections +// .synchronizedList(new ArrayList()); - public void process(SlcExecution slcExecution) { - new ProcessThread(processesThreadGroup, slcExecution).start(); - } + protected abstract ExecutionFlow findExecutionFlow(String moduleName, + String moduleVersion, String flowName); - protected void dispatchUpdateStatus(SlcExecution slcExecution, - String oldStatus, String newStatus) { - for (Iterator it = slcExecutionNotifiers - .iterator(); it.hasNext();) { - it.next().updateStatus(slcExecution, oldStatus, newStatus); - } - } + protected abstract ExecutionContext findExecutionContext(String moduleName, + String moduleVersion); - protected synchronized void dispatchAddStep(SlcExecution slcExecution, - SlcExecutionStep step) { - slcExecution.getSteps().add(step); - List steps = new ArrayList(); - steps.add(step); - for (Iterator it = slcExecutionNotifiers - .iterator(); it.hasNext();) { - it.next().addSteps(slcExecution, steps); - } - } + protected abstract ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter( + String moduleName, String moduleVersion); - public void setSlcExecutionNotifiers( - List slcExecutionNotifiers) { - this.slcExecutionNotifiers = slcExecutionNotifiers; - } + public void execute(RealizedFlow realizedFlow) { + if (log.isTraceEnabled()) + log.trace("Executing " + realizedFlow); - protected static ExecutionModuleDescriptor createDescriptor( - String moduleName, String moduleVersion, - Map executionFlows) { - // TODO: put this in a separate configurable object - ExecutionModuleDescriptor md = new ExecutionModuleDescriptor(); - md.setName(moduleName); - md.setVersion(moduleVersion); - - for (String name : executionFlows.keySet()) { - ExecutionFlow executionFlow = executionFlows.get(name); - - Assert.notNull(executionFlow.getName()); - Assert.state(name.equals(executionFlow.getName())); - - ExecutionSpec executionSpec = executionFlow.getExecutionSpec(); - Assert.notNull(executionSpec); - Assert.notNull(executionSpec.getName()); - - Map values = new TreeMap(); - for (String key : executionSpec.getAttributes().keySet()) { - ExecutionSpecAttribute attribute = executionSpec - .getAttributes().get(key); - - if (executionFlow.isSetAsParameter(key)) { - Object value = executionFlow.getParameter(key); - if (attribute instanceof PrimitiveSpecAttribute) { - PrimitiveValue primitiveValue = new PrimitiveValue(); - primitiveValue - .setType(((PrimitiveSpecAttribute) attribute) - .getType()); - primitiveValue.setValue(value); - values.put(key, primitiveValue); - } else if (attribute instanceof RefSpecAttribute) { - RefValue refValue = new RefValue(); - if (value instanceof ScopedObject) { - refValue.setLabel("RUNTIME " - + value.getClass().getName()); - } else { - refValue.setLabel("STATIC " - + value.getClass().getName()); - } - values.put(key, refValue); - } else { - throw new SlcException("Unkown spec attribute type " - + attribute.getClass()); - } - } - - } - - ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name, - values, executionSpec); - if (executionFlow.getPath() != null) - efd.setPath(executionFlow.getPath()); - - // Add execution spec if necessary - if (!md.getExecutionSpecs().contains(executionSpec)) - md.getExecutionSpecs().add(executionSpec); - - // Add execution flow - md.getExecutionFlows().add(efd); - } - - return md; - } + String moduleName = realizedFlow.getModuleName(); + String moduleVersion = realizedFlow.getModuleVersion(); - /** Thread of the SLC Process, starting the sub executions. */ - private class ProcessThread extends Thread { - private final SlcExecution slcProcess; - private final ThreadGroup processThreadGroup; - private final List flowsToProcess = new ArrayList(); - - public ProcessThread(ThreadGroup processesThreadGroup, - SlcExecution slcExecution) { - super(processesThreadGroup, "SLC Process #" - + slcExecution.getUuid()); - this.slcProcess = slcExecution; - processThreadGroup = new ThreadGroup("SLC Process #" - + slcExecution.getUuid() + " thread group"); - } - - public void run() { - log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n"); - - // FIXME: hack to let the SlcExecution be registered on server - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // silent - } - - slcProcess.setStatus(SlcExecution.STATUS_RUNNING); - dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED, - SlcExecution.STATUS_RUNNING); - - flowsToProcess.addAll(slcProcess.getRealizedFlows()); - - while (flowsToProcess.size() > 0) { - RealizedFlow flow = flowsToProcess.remove(0); - ExecutionThread thread = new ExecutionThread(this, flow); - thread.start(); - - synchronized (this) { - try { - wait(); - } catch (InterruptedException e) { - // silent - } - } - } - - slcProcess.setStatus(SlcExecution.STATUS_FINISHED); - dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING, - SlcExecution.STATUS_FINISHED); - } - - public synchronized void flowCompleted() { - notifyAll(); - } - - public SlcExecution getSlcProcess() { - return slcProcess; - } - - public ThreadGroup getProcessThreadGroup() { - return processThreadGroup; - } - } + Map variablesToAdd = getExecutionFlowDescriptorConverter( + moduleName, moduleVersion).convertValues( + realizedFlow.getFlowDescriptor()); + ExecutionContext executionContext = findExecutionContext(moduleName, + moduleVersion); + for (String key : variablesToAdd.keySet()) + executionContext.setVariable(key, variablesToAdd.get(key)); + + ExecutionFlow flow = findExecutionFlow(moduleName, moduleVersion, + realizedFlow.getFlowDescriptor().getName()); - /** Thread of a single execution */ - private class ExecutionThread extends Thread { - private final RealizedFlow realizedFlow; - private final ProcessThread processThread; - - public ExecutionThread(ProcessThread processThread, - RealizedFlow realizedFlow) { - super(processThread.getProcessThreadGroup(), "Flow " - + realizedFlow.getFlowDescriptor().getName()); - this.realizedFlow = realizedFlow; - this.processThread = processThread; - } - - public void run() { - ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow - .getFlowDescriptor(); - String flowName = executionFlowDescriptor.getName(); - - dispatchAddStep(processThread.getSlcProcess(), - new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, - "Flow " + flowName)); - - try { - execute(realizedFlow); - } catch (Exception e) { - // TODO: re-throw exception ? - String msg = "Execution of flow " + flowName + " failed."; - log.error(msg, e); - dispatchAddStep(processThread.getSlcProcess(), - new SlcExecutionStep(msg + " " + e.getMessage())); - } finally { - processThread.flowCompleted(); - dispatchAddStep(processThread.getSlcProcess(), - new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, - "Flow " + flowName)); - } - } + // + // Actually runs the flow, IN THIS THREAD + // + flow.run(); + // + // + // } +// public void dispatchUpdateStatus(ExecutionProcess process, +// String oldStatus, String newStatus) { +// // filtered notifiers +// for (Iterator it = filteredNotifiers.iterator(); it +// .hasNext();) { +// FilteredNotifier filteredNotifier = it.next(); +// if (filteredNotifier.receiveFrom(process)) +// filteredNotifier.getNotifier().updateStatus(process, oldStatus, +// newStatus); +// } +// +// } + +// public void dispatchAddSteps(ExecutionProcess process, +// List steps) { +// process.addSteps(steps); +// for (Iterator it = filteredNotifiers.iterator(); it +// .hasNext();) { +// FilteredNotifier filteredNotifier = it.next(); +// if (filteredNotifier.receiveFrom(process)) +// filteredNotifier.getNotifier().addSteps(process, steps); +// } +// } + +// public void registerProcessNotifier(ExecutionProcessNotifier notifier, +// Map properties) { +// filteredNotifiers.add(new FilteredNotifier(notifier, properties)); +// } +// +// public void unregisterProcessNotifier(ExecutionProcessNotifier notifier, +// Map properties) { +// filteredNotifiers.remove(notifier); +// } + +// protected class FilteredNotifier { +// private final ExecutionProcessNotifier notifier; +// private final String processId; +// +// public FilteredNotifier(ExecutionProcessNotifier notifier, +// Map properties) { +// super(); +// this.notifier = notifier; +// if (properties == null) +// properties = new HashMap(); +// if (properties.containsKey(SLC_PROCESS_ID)) +// processId = properties.get(SLC_PROCESS_ID); +// else +// processId = null; +// } +// +// /** +// * Whether event from this process should be received by this listener. +// */ +// public Boolean receiveFrom(ExecutionProcess process) { +// if (processId != null) +// if (process.getUuid().equals(processId)) +// return true; +// else +// return false; +// return true; +// } +// +// @Override +// public int hashCode() { +// return notifier.hashCode(); +// } +// +// @Override +// public boolean equals(Object obj) { +// if (obj instanceof FilteredNotifier) { +// FilteredNotifier fn = (FilteredNotifier) obj; +// return notifier.equals(fn.notifier); +// } else if (obj instanceof ExecutionProcessNotifier) { +// ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj; +// return notifier.equals(epn); +// } else +// return false; +// } +// +// public ExecutionProcessNotifier getNotifier() { +// return notifier; +// } +// +// } }