X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.support.simple%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultModulesManager.java;h=b843c4ff0b83aff31a2185656003320af0db661d;hb=e5249c75ef672c95d9cf5405b05cabf6083d18a6;hp=05a0e8daef263a5936a22b2d2ad43da7bfac2699;hpb=236b49051dd15775eba14d444878814509be32b7;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java index 05a0e8dae..b843c4ff0 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java @@ -14,7 +14,7 @@ import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.process.RealizedFlow; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.process.SlcExecutionNotifier; -import org.springframework.util.Assert; +import org.argeo.slc.process.SlcExecutionStep; public class DefaultModulesManager implements ExecutionModulesManager { private final static Log log = LogFactory @@ -22,6 +22,7 @@ public class DefaultModulesManager implements ExecutionModulesManager { private List executionModules = new ArrayList(); private List slcExecutionNotifiers = new ArrayList(); + private ThreadGroup processesThreadGroup = new ThreadGroup("Processes"); protected ExecutionModule getExecutionModule(String moduleName, String version) { @@ -39,8 +40,9 @@ public class DefaultModulesManager implements ExecutionModulesManager { String moduleName, String version) { ExecutionModule module = getExecutionModule(moduleName, version); - if(module==null) - throw new SlcException("Module "+moduleName+" ("+version+") not found"); + if (module == null) + throw new SlcException("Module " + moduleName + " (" + version + + ") not found"); return module.getDescriptor(); } @@ -54,7 +56,7 @@ public class DefaultModulesManager implements ExecutionModulesManager { } public void process(SlcExecution slcExecution) { - new ProcessThread(slcExecution).start(); + new ProcessThread(processesThreadGroup, slcExecution).start(); } protected void dispatchUpdateStatus(SlcExecution slcExecution, @@ -65,22 +67,39 @@ public class DefaultModulesManager implements ExecutionModulesManager { } } + protected synchronized void dispatchAddStep(SlcExecution slcExecution, + SlcExecutionStep step) { + slcExecution.getSteps().add(step); + List steps = new ArrayList(); + steps.add(step); + for (Iterator it = slcExecutionNotifiers + .iterator(); it.hasNext();) { + it.next().addSteps(slcExecution, steps); + } + } + public void setSlcExecutionNotifiers( List slcExecutionNotifiers) { this.slcExecutionNotifiers = slcExecutionNotifiers; } + /** Thread of the SLC Process, starting the sub executions. */ private class ProcessThread extends Thread { - private final SlcExecution slcExecution; + private final SlcExecution slcProcess; + private final ThreadGroup processThreadGroup; private final List flowsToProcess = new ArrayList(); - public ProcessThread(SlcExecution slcExecution) { - this.slcExecution = slcExecution; + public ProcessThread(ThreadGroup processesThreadGroup, + SlcExecution slcExecution) { + super(processesThreadGroup, "SLC Process #" + + slcExecution.getUuid()); + this.slcProcess = slcExecution; + processThreadGroup = new ThreadGroup("SLC Process #" + + slcExecution.getUuid() + " thread group"); } public void run() { - log.info("\n##\n## Process SLC Execution " + slcExecution - + "\n##\n"); + log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n"); // FIXME: hack to let the SlcExecution be registered on server try { @@ -89,11 +108,11 @@ public class DefaultModulesManager implements ExecutionModulesManager { // silent } - slcExecution.setStatus(SlcExecution.STATUS_RUNNING); - dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_SCHEDULED, + slcProcess.setStatus(SlcExecution.STATUS_RUNNING); + dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED, SlcExecution.STATUS_RUNNING); - flowsToProcess.addAll(slcExecution.getRealizedFlows()); + flowsToProcess.addAll(slcProcess.getRealizedFlows()); while (flowsToProcess.size() > 0) { RealizedFlow flow = flowsToProcess.remove(0); @@ -118,25 +137,25 @@ public class DefaultModulesManager implements ExecutionModulesManager { } } - slcExecution.setStatus(SlcExecution.STATUS_RUNNING); - dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_RUNNING, + slcProcess.setStatus(SlcExecution.STATUS_FINISHED); + dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING, SlcExecution.STATUS_FINISHED); - /* - * for (RealizedFlow flow : slcExecution.getRealizedFlows()) { - * ExecutionModule module = getExecutionModule(flow.getModuleName(), - * flow.getModuleVersion()); if (module != null) { ExecutionThread - * thread = new ExecutionThread(flow .getFlowDescriptor(), module); - * thread.start(); } else { throw new - * SlcException("ExecutionModule " + flow.getModuleName() + - * ", version " + flow.getModuleVersion() + " not found."); } } - */ } public synchronized void flowCompleted() { notifyAll(); } + + public SlcExecution getSlcProcess() { + return slcProcess; + } + + public ThreadGroup getProcessThreadGroup() { + return processThreadGroup; + } } + /** Thread of a single execution */ private class ExecutionThread extends Thread { private final ExecutionFlowDescriptor executionFlowDescriptor; private final ExecutionModule executionModule; @@ -145,21 +164,32 @@ public class DefaultModulesManager implements ExecutionModulesManager { public ExecutionThread(ProcessThread processThread, ExecutionFlowDescriptor executionFlowDescriptor, ExecutionModule executionModule) { - super("SLC Execution #" /* + executionContext.getUuid() */); + super(processThread.getProcessThreadGroup(), "Flow " + + executionFlowDescriptor.getName()); this.executionFlowDescriptor = executionFlowDescriptor; this.executionModule = executionModule; this.processThread = processThread; } public void run() { + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, + "Flow " + executionFlowDescriptor.getName())); + try { executionModule.execute(executionFlowDescriptor); } catch (Exception e) { // TODO: re-throw exception ? - log.error("Execution "/* + executionContext.getUuid() */ - + " failed.", e); + String msg = "Execution of flow " + + executionFlowDescriptor.getName() + " failed."; + log.error(msg, e); + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(msg + " " + e.getMessage())); } finally { processThread.flowCompleted(); + dispatchAddStep(processThread.getSlcProcess(), + new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, + "Flow " + executionFlowDescriptor.getName())); } } }