import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
import org.argeo.slc.process.RealizedFlow;
import org.argeo.slc.process.SlcExecution;
private Boolean hadAnError = false;
private Boolean killed = false;
+ private final static Integer STEPS_BUFFER_CAPACITY = 10000;
+ private BlockingQueue<ExecutionStep> steps = new ArrayBlockingQueue<ExecutionStep>(
+ STEPS_BUFFER_CAPACITY);
+
public ProcessThread(ThreadGroup processesThreadGroup,
ExecutionModulesManager executionModulesManager,
ExecutionProcess process) {
log.info("\n##\n## SLC Process #" + process.getUuid()
+ " STARTED\n##\n");
+ // Start logging
+ new LoggingThread().start();
+
String oldStatus = process.getStatus();
process.setStatus(ExecutionProcess.RUNNING);
executionModulesManager.dispatchUpdateStatus(process, oldStatus,
public ExecutionModulesManager getExecutionModulesManager() {
return executionModulesManager;
}
+
+ private class LoggingThread extends Thread {
+ public void run() {
+ boolean run = true;
+ while (run) {
+ List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
+ processThreadGroup.getSteps().drainTo(newSteps);
+ if (newSteps.size() > 0) {
+ //System.out.println(steps.size() + " steps");
+ process.addSteps(newSteps);
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ if (!ProcessThread.this.isAlive()
+ && processThreadGroup.getSteps().size() == 0)
+ run = false;
+ }
+ }
+
+ }
}