import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.process.SlcExecutionNotifier;
-import org.springframework.util.Assert;
+import org.argeo.slc.process.SlcExecutionStep;
public class DefaultModulesManager implements ExecutionModulesManager {
private final static Log log = LogFactory
private List<ExecutionModule> executionModules = new ArrayList<ExecutionModule>();
private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
+ private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
protected ExecutionModule getExecutionModule(String moduleName,
String version) {
String moduleName, String version) {
ExecutionModule module = getExecutionModule(moduleName, version);
- if(module==null)
- throw new SlcException("Module "+moduleName+" ("+version+") not found");
+ if (module == null)
+ throw new SlcException("Module " + moduleName + " (" + version
+ + ") not found");
return module.getDescriptor();
}
}
public void process(SlcExecution slcExecution) {
- new ProcessThread(slcExecution).start();
+ new ProcessThread(processesThreadGroup, slcExecution).start();
}
protected void dispatchUpdateStatus(SlcExecution slcExecution,
}
}
+ protected synchronized void dispatchAddStep(SlcExecution slcExecution,
+ SlcExecutionStep step) {
+ slcExecution.getSteps().add(step);
+ List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
+ steps.add(step);
+ for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
+ .iterator(); it.hasNext();) {
+ it.next().addSteps(slcExecution, steps);
+ }
+ }
+
public void setSlcExecutionNotifiers(
List<SlcExecutionNotifier> slcExecutionNotifiers) {
this.slcExecutionNotifiers = slcExecutionNotifiers;
}
+ /** Thread of the SLC Process, starting the sub executions. */
private class ProcessThread extends Thread {
- private final SlcExecution slcExecution;
+ private final SlcExecution slcProcess;
+ private final ThreadGroup processThreadGroup;
private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
- public ProcessThread(SlcExecution slcExecution) {
- this.slcExecution = slcExecution;
+ public ProcessThread(ThreadGroup processesThreadGroup,
+ SlcExecution slcExecution) {
+ super(processesThreadGroup, "SLC Process #"
+ + slcExecution.getUuid());
+ this.slcProcess = slcExecution;
+ processThreadGroup = new ThreadGroup("SLC Process #"
+ + slcExecution.getUuid() + " thread group");
}
public void run() {
- log.info("\n##\n## Process SLC Execution " + slcExecution
- + "\n##\n");
+ log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
// FIXME: hack to let the SlcExecution be registered on server
try {
// silent
}
- slcExecution.setStatus(SlcExecution.STATUS_RUNNING);
- dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_SCHEDULED,
+ slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
+ dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
SlcExecution.STATUS_RUNNING);
- flowsToProcess.addAll(slcExecution.getRealizedFlows());
+ flowsToProcess.addAll(slcProcess.getRealizedFlows());
while (flowsToProcess.size() > 0) {
RealizedFlow flow = flowsToProcess.remove(0);
}
}
- slcExecution.setStatus(SlcExecution.STATUS_RUNNING);
- dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_RUNNING,
+ slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
+ dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
SlcExecution.STATUS_FINISHED);
- /*
- * for (RealizedFlow flow : slcExecution.getRealizedFlows()) {
- * ExecutionModule module = getExecutionModule(flow.getModuleName(),
- * flow.getModuleVersion()); if (module != null) { ExecutionThread
- * thread = new ExecutionThread(flow .getFlowDescriptor(), module);
- * thread.start(); } else { throw new
- * SlcException("ExecutionModule " + flow.getModuleName() +
- * ", version " + flow.getModuleVersion() + " not found."); } }
- */
}
public synchronized void flowCompleted() {
notifyAll();
}
+
+ public SlcExecution getSlcProcess() {
+ return slcProcess;
+ }
+
+ public ThreadGroup getProcessThreadGroup() {
+ return processThreadGroup;
+ }
}
+ /** Thread of a single execution */
private class ExecutionThread extends Thread {
private final ExecutionFlowDescriptor executionFlowDescriptor;
private final ExecutionModule executionModule;
public ExecutionThread(ProcessThread processThread,
ExecutionFlowDescriptor executionFlowDescriptor,
ExecutionModule executionModule) {
- super("SLC Execution #" /* + executionContext.getUuid() */);
+ super(processThread.getProcessThreadGroup(), "Flow "
+ + executionFlowDescriptor.getName());
this.executionFlowDescriptor = executionFlowDescriptor;
this.executionModule = executionModule;
this.processThread = processThread;
}
public void run() {
+ dispatchAddStep(processThread.getSlcProcess(),
+ new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
+ "Flow " + executionFlowDescriptor.getName()));
+
try {
executionModule.execute(executionFlowDescriptor);
} catch (Exception e) {
// TODO: re-throw exception ?
- log.error("Execution "/* + executionContext.getUuid() */
- + " failed.", e);
+ String msg = "Execution of flow "
+ + executionFlowDescriptor.getName() + " failed.";
+ log.error(msg, e);
+ dispatchAddStep(processThread.getSlcProcess(),
+ new SlcExecutionStep(msg + " " + e.getMessage()));
} finally {
processThread.flowCompleted();
+ dispatchAddStep(processThread.getSlcProcess(),
+ new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
+ "Flow " + executionFlowDescriptor.getName()));
}
}
}