X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.simple%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultModulesManager.java;h=b843c4ff0b83aff31a2185656003320af0db661d;hb=719f374a748e531ae706115252978e06ad72c308;hp=b55a9c377f0b1040eac1a7097bfa57d342af1915;hpb=4810532845c8a54c04a850c7db3a9fe00e8ebc6a;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java index b55a9c377..b843c4ff0 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java @@ -1,42 +1,196 @@ package org.argeo.slc.core.execution; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.SlcException; +import org.argeo.slc.execution.ExecutionFlowDescriptor; import org.argeo.slc.execution.ExecutionModule; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; -import org.springframework.util.Assert; +import org.argeo.slc.process.RealizedFlow; +import org.argeo.slc.process.SlcExecution; +import org.argeo.slc.process.SlcExecutionNotifier; +import org.argeo.slc.process.SlcExecutionStep; public class DefaultModulesManager implements ExecutionModulesManager { + private final static Log log = LogFactory + .getLog(DefaultModulesManager.class); + private List executionModules = new ArrayList(); + private List slcExecutionNotifiers = new ArrayList(); + private ThreadGroup processesThreadGroup = new ThreadGroup("Processes"); - public ExecutionModuleDescriptor getExecutionModuleDescriptor( - String moduleName, String version) { - ExecutionModule module = null; + protected ExecutionModule getExecutionModule(String moduleName, + String version) { for (ExecutionModule moduleT : executionModules) { if (moduleT.getName().equals(moduleName)) { - // TODO: check version - module = moduleT; - break; + if (moduleT.getVersion().equals(version)) { + return moduleT; + } } } + return null; + } + + public ExecutionModuleDescriptor getExecutionModuleDescriptor( + String moduleName, String version) { + ExecutionModule module = getExecutionModule(moduleName, version); - Assert.notNull(module); + if (module == null) + throw new SlcException("Module " + moduleName + " (" + version + + ") not found"); return module.getDescriptor(); } - - public List listExecutionModules() { return executionModules; } - - public void setExecutionModules(List executionModules) { this.executionModules = executionModules; } + public void process(SlcExecution slcExecution) { + new ProcessThread(processesThreadGroup, slcExecution).start(); + } + + protected void dispatchUpdateStatus(SlcExecution slcExecution, + String oldStatus, String newStatus) { + for (Iterator it = slcExecutionNotifiers + .iterator(); it.hasNext();) { + it.next().updateStatus(slcExecution, oldStatus, newStatus); + } + } + + protected synchronized void dispatchAddStep(SlcExecution slcExecution, + SlcExecutionStep step) { + slcExecution.getSteps().add(step); + List steps = new ArrayList(); + steps.add(step); + for (Iterator it = slcExecutionNotifiers + .iterator(); it.hasNext();) { + it.next().addSteps(slcExecution, steps); + } + } + + public void setSlcExecutionNotifiers( + List slcExecutionNotifiers) { + this.slcExecutionNotifiers = slcExecutionNotifiers; + } + + /** Thread of the SLC Process, starting the sub executions. */ + private class ProcessThread extends Thread { + private final SlcExecution slcProcess; + private final ThreadGroup processThreadGroup; + private final List flowsToProcess = new ArrayList(); + + public ProcessThread(ThreadGroup processesThreadGroup, + SlcExecution slcExecution) { + super(processesThreadGroup, "SLC Process #" + + slcExecution.getUuid()); + this.slcProcess = slcExecution; + processThreadGroup = new ThreadGroup("SLC Process #" + + slcExecution.getUuid() + " thread group"); + } + + public void run() { + log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n"); + + // FIXME: hack to let the SlcExecution be registered on server + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // silent + } + + slcProcess.setStatus(SlcExecution.STATUS_RUNNING); + dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED, + SlcExecution.STATUS_RUNNING); + + flowsToProcess.addAll(slcProcess.getRealizedFlows()); + + while (flowsToProcess.size() > 0) { + RealizedFlow flow = flowsToProcess.remove(0); + ExecutionModule module = getExecutionModule(flow + .getModuleName(), flow.getModuleVersion()); + if (module != null) { + ExecutionThread thread = new ExecutionThread(this, flow + .getFlowDescriptor(), module); + thread.start(); + } else { + throw new SlcException("ExecutionModule " + + flow.getModuleName() + ", version " + + flow.getModuleVersion() + " not found."); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // silent + } + } + } + + slcProcess.setStatus(SlcExecution.STATUS_FINISHED); + dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING, + SlcExecution.STATUS_FINISHED); + } + + public synchronized void flowCompleted() { + notifyAll(); + } + + public SlcExecution getSlcProcess() { + return slcProcess; + } + + public ThreadGroup getProcessThreadGroup() { + return processThreadGroup; + } + } + + /** Thread of a single execution */ + private class ExecutionThread extends Thread { + private final ExecutionFlowDescriptor executionFlowDescriptor; + private final ExecutionModule executionModule; + private final ProcessThread processThread; + + public ExecutionThread(ProcessThread processThread, + ExecutionFlowDescriptor executionFlowDescriptor, + ExecutionModule executionModule) { + super(processThread.getProcessThreadGroup(), "Flow " + + executionFlowDescriptor.getName()); + this.executionFlowDescriptor = executionFlowDescriptor; + this.executionModule = executionModule; + this.processThread = processThread; + } + + public void run() { + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, + "Flow " + executionFlowDescriptor.getName())); + + try { + executionModule.execute(executionFlowDescriptor); + } catch (Exception e) { + // TODO: re-throw exception ? + String msg = "Execution of flow " + + executionFlowDescriptor.getName() + " failed."; + log.error(msg, e); + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(msg + " " + e.getMessage())); + } finally { + processThread.flowCompleted(); + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, + "Flow " + executionFlowDescriptor.getName())); + } + } + } }