X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.simple%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultModulesManager.java;h=05a0e8daef263a5936a22b2d2ad43da7bfac2699;hb=2531d7501fcb56eed16a7c61b2d842bd65228f53;hp=9bb6aae6442b6247e1a9daf9a9bb67825145e67a;hpb=ae98a7536a842021e433a43427db79a18abf3a6c;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java index 9bb6aae64..05a0e8dae 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java @@ -1,21 +1,19 @@ package org.argeo.slc.core.execution; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.argeo.slc.SlcException; -import org.argeo.slc.execution.ExecutionContext; -import org.argeo.slc.execution.ExecutionFlow; import org.argeo.slc.execution.ExecutionFlowDescriptor; import org.argeo.slc.execution.ExecutionModule; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.process.RealizedFlow; import org.argeo.slc.process.SlcExecution; +import org.argeo.slc.process.SlcExecutionNotifier; import org.springframework.util.Assert; public class DefaultModulesManager implements ExecutionModulesManager { @@ -23,23 +21,26 @@ public class DefaultModulesManager implements ExecutionModulesManager { .getLog(DefaultModulesManager.class); private List executionModules = new ArrayList(); - - protected ExecutionModule getExecutionModule(String moduleName, String version) { + private List slcExecutionNotifiers = new ArrayList(); + + protected ExecutionModule getExecutionModule(String moduleName, + String version) { for (ExecutionModule moduleT : executionModules) { if (moduleT.getName().equals(moduleName)) { - if(moduleT.getVersion().equals(version)) { + if (moduleT.getVersion().equals(version)) { return moduleT; } } } return null; } - + public ExecutionModuleDescriptor getExecutionModuleDescriptor( String moduleName, String version) { ExecutionModule module = getExecutionModule(moduleName, version); - - Assert.notNull(module); + + if(module==null) + throw new SlcException("Module "+moduleName+" ("+version+") not found"); return module.getDescriptor(); } @@ -52,67 +53,114 @@ public class DefaultModulesManager implements ExecutionModulesManager { this.executionModules = executionModules; } - protected Map convertValues(ExecutionFlowDescriptor executionFlowDescriptor) { - // convert the values of flow.getFlowDescriptor() - Map values = executionFlowDescriptor.getValues(); - - Map convertedValues = new HashMap(); - - for(String key : values.keySet()) { - Object value = values.get(key); - if(value instanceof PrimitiveValue) { - PrimitiveValue primitiveValue = (PrimitiveValue) value; - - // TODO: check that the class of the the primitiveValue.value matches - // the primitiveValue.type - convertedValues.put(key, primitiveValue.getValue()); - } - else if(value instanceof RefValue) { - RefValue refValue = (RefValue) value; - convertedValues.put(key, refValue.getLabel()); - } - } - return convertedValues; - } - public void process(SlcExecution slcExecution) { - log.info("##\n## Process SLC Execution " + slcExecution+"\n##"); - - for(RealizedFlow flow : slcExecution.getRealizedFlows()) { - ExecutionModule module = getExecutionModule(flow.getModuleName(), - flow.getModuleVersion()); - if(module != null) { - ExecutionThread thread = new ExecutionThread(flow.getFlowDescriptor(), module); - thread.start(); + new ProcessThread(slcExecution).start(); + } + + protected void dispatchUpdateStatus(SlcExecution slcExecution, + String oldStatus, String newStatus) { + for (Iterator it = slcExecutionNotifiers + .iterator(); it.hasNext();) { + it.next().updateStatus(slcExecution, oldStatus, newStatus); + } + } + + public void setSlcExecutionNotifiers( + List slcExecutionNotifiers) { + this.slcExecutionNotifiers = slcExecutionNotifiers; + } + + private class ProcessThread extends Thread { + private final SlcExecution slcExecution; + private final List flowsToProcess = new ArrayList(); + + public ProcessThread(SlcExecution slcExecution) { + this.slcExecution = slcExecution; + } + + public void run() { + log.info("\n##\n## Process SLC Execution " + slcExecution + + "\n##\n"); + + // FIXME: hack to let the SlcExecution be registered on server + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // silent } - else { - throw new SlcException("ExecutionModule " + flow.getModuleName() + ", version " - + flow.getModuleVersion() + " not found."); + + slcExecution.setStatus(SlcExecution.STATUS_RUNNING); + dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_SCHEDULED, + SlcExecution.STATUS_RUNNING); + + flowsToProcess.addAll(slcExecution.getRealizedFlows()); + + while (flowsToProcess.size() > 0) { + RealizedFlow flow = flowsToProcess.remove(0); + ExecutionModule module = getExecutionModule(flow + .getModuleName(), flow.getModuleVersion()); + if (module != null) { + ExecutionThread thread = new ExecutionThread(this, flow + .getFlowDescriptor(), module); + thread.start(); + } else { + throw new SlcException("ExecutionModule " + + flow.getModuleName() + ", version " + + flow.getModuleVersion() + " not found."); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // silent + } + } } + + slcExecution.setStatus(SlcExecution.STATUS_RUNNING); + dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_RUNNING, + SlcExecution.STATUS_FINISHED); + /* + * for (RealizedFlow flow : slcExecution.getRealizedFlows()) { + * ExecutionModule module = getExecutionModule(flow.getModuleName(), + * flow.getModuleVersion()); if (module != null) { ExecutionThread + * thread = new ExecutionThread(flow .getFlowDescriptor(), module); + * thread.start(); } else { throw new + * SlcException("ExecutionModule " + flow.getModuleName() + + * ", version " + flow.getModuleVersion() + " not found."); } } + */ + } + + public synchronized void flowCompleted() { + notifyAll(); } } private class ExecutionThread extends Thread { private final ExecutionFlowDescriptor executionFlowDescriptor; private final ExecutionModule executionModule; + private final ProcessThread processThread; - public ExecutionThread(ExecutionFlowDescriptor executionFlowDescriptor, + public ExecutionThread(ProcessThread processThread, + ExecutionFlowDescriptor executionFlowDescriptor, ExecutionModule executionModule) { - super("SLC Execution #" /*+ executionContext.getUuid()*/); + super("SLC Execution #" /* + executionContext.getUuid() */); this.executionFlowDescriptor = executionFlowDescriptor; this.executionModule = executionModule; + this.processThread = processThread; } public void run() { - ExecutionContext executionContext = executionModule.getExecutionContext(); - executionContext.addVariables(convertValues(executionFlowDescriptor)); try { executionModule.execute(executionFlowDescriptor); } catch (Exception e) { - //TODO: re-throw exception ? - log.error("Execution " + executionContext.getUuid() + // TODO: re-throw exception ? + log.error("Execution "/* + executionContext.getUuid() */ + " failed.", e); + } finally { + processThread.flowCompleted(); } } - } + } }