X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FAbstractExecutionModulesManager.java;h=e16ad65f79edc2e81bc8dd73433bd9a7eead1736;hb=f3e477b3048c639451df616cfa61564eae11efbb;hp=4e5672638fb1311b40a26e847285101f67fe24b4;hpb=a91feabac0f3f603a73936b4447599402fba71d0;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java index 4e5672638..e16ad65f7 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java @@ -1,170 +1,108 @@ +/* + * Copyright (C) 2010 Mathieu Baudier + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.argeo.slc.core.execution; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.execution.ExecutionContext; +import org.argeo.slc.execution.ExecutionFlow; +import org.argeo.slc.execution.ExecutionFlowDescriptorConverter; import org.argeo.slc.execution.ExecutionModulesManager; +import org.argeo.slc.process.RealizedFlow; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.process.SlcExecutionNotifier; +import org.argeo.slc.process.SlcExecutionStep; +/** Provides the base feature of an execution module manager. */ public abstract class AbstractExecutionModulesManager implements ExecutionModulesManager { + private final static Log log = LogFactory + .getLog(AbstractExecutionModulesManager.class); + private List slcExecutionNotifiers = new ArrayList(); + private ThreadGroup processesThreadGroup = new ThreadGroup("Processes"); + protected abstract ExecutionFlow findExecutionFlow(String moduleName, + String moduleVersion, String flowName); + + protected abstract ExecutionContext findExecutionContext(String moduleName, + String moduleVersion); + + protected abstract ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter( + String moduleName, String moduleVersion); + public void process(SlcExecution slcExecution) { new ProcessThread(this, slcExecution).start(); } -/* - protected void dispatchUpdateStatus(SlcExecution slcExecution, + + public void execute(RealizedFlow realizedFlow) { + if (log.isTraceEnabled()) + log.trace("Executing " + realizedFlow); + + String moduleName = realizedFlow.getModuleName(); + String moduleVersion = realizedFlow.getModuleVersion(); + + Map variablesToAdd = getExecutionFlowDescriptorConverter( + moduleName, moduleVersion).convertValues( + realizedFlow.getFlowDescriptor()); + ExecutionContext executionContext = findExecutionContext(moduleName, + moduleVersion); + for (String key : variablesToAdd.keySet()) + executionContext.setVariable(key, variablesToAdd.get(key)); + + ExecutionFlow flow = findExecutionFlow(moduleName, moduleVersion, + realizedFlow.getFlowDescriptor().getName()); + + // + // Actually runs the flow, IN THIS THREAD + // + flow.run(); + // + // + // + } + + public void dispatchUpdateStatus(SlcExecution slcExecution, String oldStatus, String newStatus) { - for (Iterator it = slcExecutionNotifiers + for (Iterator it = getSlcExecutionNotifiers() .iterator(); it.hasNext();) { it.next().updateStatus(slcExecution, oldStatus, newStatus); } } - protected synchronized void dispatchAddStep(SlcExecution slcExecution, - SlcExecutionStep step) { - slcExecution.getSteps().add(step); + public void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step) { List steps = new ArrayList(); steps.add(step); - for (Iterator it = slcExecutionNotifiers + for (Iterator it = getSlcExecutionNotifiers() .iterator(); it.hasNext();) { it.next().addSteps(slcExecution, steps); } - }*/ + } public void setSlcExecutionNotifiers( List slcExecutionNotifiers) { this.slcExecutionNotifiers = slcExecutionNotifiers; } -/* - protected static void addFlowsToDescriptor(ExecutionModuleDescriptor md, - Map executionFlows) { - // TODO: put this in a separate configurable object - for (String name : executionFlows.keySet()) { - ExecutionFlow executionFlow = executionFlows.get(name); - - Assert.notNull(executionFlow.getName()); - Assert.state(name.equals(executionFlow.getName())); - - ExecutionSpec executionSpec = executionFlow.getExecutionSpec(); - Assert.notNull(executionSpec); - Assert.notNull(executionSpec.getName()); - - Map values = new TreeMap(); - for (String key : executionSpec.getAttributes().keySet()) { - ExecutionSpecAttribute attribute = executionSpec - .getAttributes().get(key); - - if (executionFlow.isSetAsParameter(key)) { - Object value = executionFlow.getParameter(key); - if (attribute instanceof PrimitiveSpecAttribute) { - PrimitiveValue primitiveValue = new PrimitiveValue(); - primitiveValue - .setType(((PrimitiveSpecAttribute) attribute) - .getType()); - primitiveValue.setValue(value); - values.put(key, primitiveValue); - } else if (attribute instanceof RefSpecAttribute) { - RefValue refValue = new RefValue(); - if (value instanceof ScopedObject) { - refValue.setLabel("RUNTIME " - + value.getClass().getName()); - } else { - refValue.setLabel("STATIC " - + value.getClass().getName()); - } - values.put(key, refValue); - } else { - throw new SlcException("Unkown spec attribute type " - + attribute.getClass()); - } - } - - } - - ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name, - values, executionSpec); - if (executionFlow.getPath() != null) - efd.setPath(executionFlow.getPath()); - - // Add execution spec if necessary - if (!md.getExecutionSpecs().contains(executionSpec)) - md.getExecutionSpecs().add(executionSpec); - - // Add execution flow - md.getExecutionFlows().add(efd); - } - } -*/ - /** - * Thread of the SLC Process, starting the sub executions. private class - * ProcessThread extends Thread { private final SlcExecution slcProcess; - * private final ThreadGroup processThreadGroup; private final - * List flowsToProcess = new ArrayList(); - * - * public ProcessThread(ThreadGroup processesThreadGroup, SlcExecution - * slcExecution) { super(processesThreadGroup, "SLC Process #" + - * slcExecution.getUuid()); this.slcProcess = slcExecution; - * processThreadGroup = new ThreadGroup("SLC Process #" + - * slcExecution.getUuid() + " thread group"); } - * - * public void run() { log.info("\n##\n## Process SLC Execution " + - * slcProcess + "\n##\n"); - * - * slcProcess.setStatus(SlcExecution.STATUS_RUNNING); - * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED, - * SlcExecution.STATUS_RUNNING); - * - * flowsToProcess.addAll(slcProcess.getRealizedFlows()); - * - * while (flowsToProcess.size() > 0) { RealizedFlow flow = - * flowsToProcess.remove(0); ExecutionThread thread = new - * ExecutionThread(this, flow); thread.start(); - * - * synchronized (this) { try { wait(); } catch (InterruptedException e) { // - * silent } } } - * - * slcProcess.setStatus(SlcExecution.STATUS_FINISHED); - * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING, - * SlcExecution.STATUS_FINISHED); } - * - * public synchronized void flowCompleted() { notifyAll(); } - * - * public SlcExecution getSlcProcess() { return slcProcess; } - * - * public ThreadGroup getProcessThreadGroup() { return processThreadGroup; } - * } - */ - - /** - * Thread of a single execution private class ExecutionThread extends Thread - * { private final RealizedFlow realizedFlow; private final ProcessThread - * processThread; - * - * public ExecutionThread(ProcessThread processThread, RealizedFlow - * realizedFlow) { super(processThread.getProcessThreadGroup(), "Flow " + - * realizedFlow.getFlowDescriptor().getName()); this.realizedFlow = - * realizedFlow; this.processThread = processThread; } - * - * public void run() { ExecutionFlowDescriptor executionFlowDescriptor = - * realizedFlow .getFlowDescriptor(); String flowName = - * executionFlowDescriptor.getName(); - * - * dispatchAddStep(processThread.getSlcProcess(), new - * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName)); - * - * try { execute(realizedFlow); } catch (Exception e) { // TODO: re-throw - * exception ? String msg = "Execution of flow " + flowName + " failed."; - * log.error(msg, e); dispatchAddStep(processThread.getSlcProcess(), new - * SlcExecutionStep(msg + " " + e.getMessage())); } finally { - * processThread.flowCompleted(); - * dispatchAddStep(processThread.getSlcProcess(), new - * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, "Flow " + flowName)); } - * } } - */ public List getSlcExecutionNotifiers() { return slcExecutionNotifiers;