package org.argeo.slc.core.execution;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.process.SlcExecutionNotifier;
+import org.argeo.slc.process.SlcExecutionStep;
+/** Provides the base feature of an execution module manager. */
public abstract class AbstractExecutionModulesManager implements
ExecutionModulesManager {
private final static Log log = LogFactory
//
}
+ public void dispatchUpdateStatus(SlcExecution slcExecution,
+ String oldStatus, String newStatus) {
+ for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
+ .iterator(); it.hasNext();) {
+ it.next().updateStatus(slcExecution, oldStatus, newStatus);
+ }
+ }
+
+ public void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step) {
+ List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
+ steps.add(step);
+ for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
+ .iterator(); it.hasNext();) {
+ it.next().addSteps(slcExecution, steps);
+ }
+ }
+
public void setSlcExecutionNotifiers(
List<SlcExecutionNotifier> slcExecutionNotifiers) {
this.slcExecutionNotifiers = slcExecutionNotifiers;
return processesThreadGroup;
}
- public List<ExecutionModulesListener> getExecutionModulesListeners() {
+ protected List<ExecutionModulesListener> getExecutionModulesListeners() {
return executionModulesListeners;
}
if (log.isDebugEnabled())
logStackEvent("=> ", executionFlow);
-
+
try {
// Actually execute the flow
pjp.proceed();
} finally {
if (log.isDebugEnabled())
logStackEvent("<= ", executionFlow);
-
+
executionStack.leaveFlow(executionFlow);
}
}
protected void logStackEvent(String symbol, ExecutionFlow executionFlow) {
Integer stackSize = executionStack.getStackSize();
- log.debug(depthSpaces(stackSize) + symbol + executionFlow + " #"
- + executionStack.getCurrentStackLevelUuid() + ", depth="
- + stackSize);
+ if (log.isTraceEnabled())
+ log.debug(depthSpaces(stackSize) + symbol + executionFlow + " #"
+ + executionStack.getCurrentStackLevelUuid() + ", depth="
+ + stackSize);
+ else if (log.isDebugEnabled())
+ log.debug(depthSpaces(stackSize) + symbol + executionFlow);
}
protected void logRunnableExecution(ExecutionFlow executionFlow,
Runnable runnable) {
Integer stackSize = executionStack.getStackSize();
- log.debug(depthSpaces(stackSize + 1)
- + runnable.getClass().getSimpleName() + " in " + executionFlow);
+ if (log.isDebugEnabled())
+ log.debug(depthSpaces(stackSize + 1)
+ + runnable.getClass().getSimpleName() + " in "
+ + executionFlow);
}
private String depthSpaces(int depth) {
package org.argeo.slc.core.execution;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.process.SlcExecutionNotifier;
/** Thread of the SLC Process, starting the sub executions. */
public class ProcessThread extends Thread {
private final static Log log = LogFactory.getLog(ProcessThread.class);
- private final AbstractExecutionModulesManager executionModulesManager;
+ private final ExecutionModulesManager executionModulesManager;
private final SlcExecution slcProcess;
private final ProcessThreadGroup processThreadGroup;
private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
private Boolean hadAnError = false;
- public ProcessThread(
- AbstractExecutionModulesManager executionModulesManager,
+ public ProcessThread(ExecutionModulesManager executionModulesManager,
SlcExecution slcExecution) {
super(executionModulesManager.getProcessesThreadGroup(),
"SLC Process #" + slcExecution.getUuid());
this.executionModulesManager = executionModulesManager;
this.slcProcess = slcExecution;
- processThreadGroup = new ProcessThreadGroup(this);
+ processThreadGroup = new ProcessThreadGroup(executionModulesManager,
+ this);
}
public void run() {
- log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
+ log.info("\n##\n## SLC Process " + slcProcess + " STARTED\n##");
slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
- dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
- SlcExecution.STATUS_RUNNING);
+ executionModulesManager.dispatchUpdateStatus(slcProcess,
+ SlcExecution.STATUS_SCHEDULED, SlcExecution.STATUS_RUNNING);
flowsToProcess.addAll(slcProcess.getRealizedFlows());
ExecutionThread thread = new ExecutionThread(this, flow);
thread.start();
- synchronized (this) {
- try {
- wait();
- } catch (InterruptedException e) {
- // silent
- }
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ log.error("Flow " + flow + " was interrupted", e);
}
+
+ // synchronized (this) {
+ // try {
+ // wait();
+ // } catch (InterruptedException e) {
+ // // silent
+ // }
+ // }
}
+ // TODO: error management at flow level?
if (hadAnError)
slcProcess.setStatus(SlcExecution.STATUS_ERROR);
else
slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
- dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
- slcProcess.getStatus());
- }
+ executionModulesManager.dispatchUpdateStatus(slcProcess,
+ SlcExecution.STATUS_RUNNING, slcProcess.getStatus());
- protected void dispatchUpdateStatus(SlcExecution slcExecution,
- String oldStatus, String newStatus) {
- for (Iterator<SlcExecutionNotifier> it = executionModulesManager
- .getSlcExecutionNotifiers().iterator(); it.hasNext();) {
- it.next().updateStatus(slcExecution, oldStatus, newStatus);
- }
+ log.info("## SLC Process " + slcProcess + " COMPLETED");
}
public void notifyError() {
}
public synchronized void flowCompleted() {
- notifyAll();
+ // notifyAll();
}
public SlcExecution getSlcProcess() {
return processThreadGroup;
}
- public AbstractExecutionModulesManager getExecutionModulesManager() {
+ public ExecutionModulesManager getExecutionModulesManager() {
return executionModulesManager;
}
}
package org.argeo.slc.core.execution;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
+import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.process.SlcExecutionNotifier;
import org.argeo.slc.process.SlcExecutionStep;
+/** The thread group attached to a given {@link SlcExecution}. */
public class ProcessThreadGroup extends ThreadGroup {
+ private final ExecutionModulesManager executionModulesManager;
private final ProcessThread processThread;
- public ProcessThreadGroup(ProcessThread processThread) {
+ public ProcessThreadGroup(ExecutionModulesManager executionModulesManager,
+ ProcessThread processThread) {
super("SLC Process #" + processThread.getSlcProcess().getUuid()
+ " thread group");
+ this.executionModulesManager = executionModulesManager;
this.processThread = processThread;
}
}
public void dispatchAddStep(SlcExecutionStep step) {
- processThread.getSlcProcess().getSteps().add(step);
- List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
- steps.add(step);
- for (Iterator<SlcExecutionNotifier> it = processThread
- .getExecutionModulesManager().getSlcExecutionNotifiers()
- .iterator(); it.hasNext();) {
- it.next().addSteps(processThread.getSlcProcess(), steps);
- }
+ SlcExecution slcProcess = processThread.getSlcProcess();
+ slcProcess.getSteps().add(step);
+ executionModulesManager.dispatchAddStep(slcProcess, step);
}
}
import org.argeo.slc.deploy.Module;
+/** Listen to events on execution modules. */
public interface ExecutionModulesListener {
public void executionModuleAdded(Module module,
ExecutionContext executionContext);
import org.argeo.slc.deploy.ModulesManager;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.process.SlcExecutionStep;
/** Provides access to the execution modules */
public interface ExecutionModulesManager extends ModulesManager {
*/
public List<ExecutionModuleDescriptor> listExecutionModules();
- /** Asynchronously prepare and executes an {@link SlcExecution} */
+ /** Asynchronously prepares and executes an {@link SlcExecution} */
public void process(SlcExecution slcExecution);
+ /** The thread group to which all process threads will belong. */
+ public ThreadGroup getProcessesThreadGroup();
+
/** Synchronously finds and executes an {@link ExecutionFlow}. */
public void execute(RealizedFlow realizedFlow);
+
+ /** Notify of a status update status of the {@link SlcExecution} */
+ public void dispatchUpdateStatus(SlcExecution slcExecution,
+ String oldStatus, String newStatus);
+
+ /** Notify that a step was added in an {@link SlcExecution} */
+ public void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step);
}