package org.argeo.slc.core.execution; import java.util.ArrayList; import java.util.List; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.process.SlcExecutionNotifier; public abstract class AbstractExecutionModulesManager implements ExecutionModulesManager { private List slcExecutionNotifiers = new ArrayList(); private ThreadGroup processesThreadGroup = new ThreadGroup("Processes"); public void process(SlcExecution slcExecution) { new ProcessThread(this, slcExecution).start(); } /* protected void dispatchUpdateStatus(SlcExecution slcExecution, String oldStatus, String newStatus) { for (Iterator it = slcExecutionNotifiers .iterator(); it.hasNext();) { it.next().updateStatus(slcExecution, oldStatus, newStatus); } } protected synchronized void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step) { slcExecution.getSteps().add(step); List steps = new ArrayList(); steps.add(step); for (Iterator it = slcExecutionNotifiers .iterator(); it.hasNext();) { it.next().addSteps(slcExecution, steps); } }*/ public void setSlcExecutionNotifiers( List slcExecutionNotifiers) { this.slcExecutionNotifiers = slcExecutionNotifiers; } /* protected static void addFlowsToDescriptor(ExecutionModuleDescriptor md, Map executionFlows) { // TODO: put this in a separate configurable object for (String name : executionFlows.keySet()) { ExecutionFlow executionFlow = executionFlows.get(name); Assert.notNull(executionFlow.getName()); Assert.state(name.equals(executionFlow.getName())); ExecutionSpec executionSpec = executionFlow.getExecutionSpec(); Assert.notNull(executionSpec); Assert.notNull(executionSpec.getName()); Map values = new TreeMap(); for (String key : executionSpec.getAttributes().keySet()) { ExecutionSpecAttribute attribute = executionSpec .getAttributes().get(key); if (executionFlow.isSetAsParameter(key)) { Object value = executionFlow.getParameter(key); if (attribute instanceof PrimitiveSpecAttribute) { PrimitiveValue primitiveValue = new PrimitiveValue(); primitiveValue .setType(((PrimitiveSpecAttribute) attribute) .getType()); primitiveValue.setValue(value); values.put(key, primitiveValue); } else if (attribute instanceof RefSpecAttribute) { RefValue refValue = new RefValue(); if (value instanceof ScopedObject) { refValue.setLabel("RUNTIME " + value.getClass().getName()); } else { refValue.setLabel("STATIC " + value.getClass().getName()); } values.put(key, refValue); } else { throw new SlcException("Unkown spec attribute type " + attribute.getClass()); } } } ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name, values, executionSpec); if (executionFlow.getPath() != null) efd.setPath(executionFlow.getPath()); // Add execution spec if necessary if (!md.getExecutionSpecs().contains(executionSpec)) md.getExecutionSpecs().add(executionSpec); // Add execution flow md.getExecutionFlows().add(efd); } } */ /** * Thread of the SLC Process, starting the sub executions. private class * ProcessThread extends Thread { private final SlcExecution slcProcess; * private final ThreadGroup processThreadGroup; private final * List flowsToProcess = new ArrayList(); * * public ProcessThread(ThreadGroup processesThreadGroup, SlcExecution * slcExecution) { super(processesThreadGroup, "SLC Process #" + * slcExecution.getUuid()); this.slcProcess = slcExecution; * processThreadGroup = new ThreadGroup("SLC Process #" + * slcExecution.getUuid() + " thread group"); } * * public void run() { log.info("\n##\n## Process SLC Execution " + * slcProcess + "\n##\n"); * * slcProcess.setStatus(SlcExecution.STATUS_RUNNING); * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED, * SlcExecution.STATUS_RUNNING); * * flowsToProcess.addAll(slcProcess.getRealizedFlows()); * * while (flowsToProcess.size() > 0) { RealizedFlow flow = * flowsToProcess.remove(0); ExecutionThread thread = new * ExecutionThread(this, flow); thread.start(); * * synchronized (this) { try { wait(); } catch (InterruptedException e) { // * silent } } } * * slcProcess.setStatus(SlcExecution.STATUS_FINISHED); * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING, * SlcExecution.STATUS_FINISHED); } * * public synchronized void flowCompleted() { notifyAll(); } * * public SlcExecution getSlcProcess() { return slcProcess; } * * public ThreadGroup getProcessThreadGroup() { return processThreadGroup; } * } */ /** * Thread of a single execution private class ExecutionThread extends Thread * { private final RealizedFlow realizedFlow; private final ProcessThread * processThread; * * public ExecutionThread(ProcessThread processThread, RealizedFlow * realizedFlow) { super(processThread.getProcessThreadGroup(), "Flow " + * realizedFlow.getFlowDescriptor().getName()); this.realizedFlow = * realizedFlow; this.processThread = processThread; } * * public void run() { ExecutionFlowDescriptor executionFlowDescriptor = * realizedFlow .getFlowDescriptor(); String flowName = * executionFlowDescriptor.getName(); * * dispatchAddStep(processThread.getSlcProcess(), new * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName)); * * try { execute(realizedFlow); } catch (Exception e) { // TODO: re-throw * exception ? String msg = "Execution of flow " + flowName + " failed."; * log.error(msg, e); dispatchAddStep(processThread.getSlcProcess(), new * SlcExecutionStep(msg + " " + e.getMessage())); } finally { * processThread.flowCompleted(); * dispatchAddStep(processThread.getSlcProcess(), new * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, "Flow " + flowName)); } * } } */ public List getSlcExecutionNotifiers() { return slcExecutionNotifiers; } public ThreadGroup getProcessesThreadGroup() { return processesThreadGroup; } }