]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Save current state even if not completely stable
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
index c11e875b8083df0d627c51dba32587360bdfed22..1d333845d1e9bb1f7caa4996207b1b5c44533fb4 100644 (file)
@@ -21,12 +21,15 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 
@@ -44,6 +47,10 @@ public class ProcessThread extends Thread {
        private Boolean hadAnError = false;
        private Boolean killed = false;
 
+       private final static Integer STEPS_BUFFER_CAPACITY = 10000;
+       private BlockingQueue<ExecutionStep> steps = new ArrayBlockingQueue<ExecutionStep>(
+                       STEPS_BUFFER_CAPACITY);
+
        public ProcessThread(ThreadGroup processesThreadGroup,
                        ExecutionModulesManager executionModulesManager,
                        ExecutionProcess process) {
@@ -58,6 +65,9 @@ public class ProcessThread extends Thread {
                log.info("\n##\n## SLC Process #" + process.getUuid()
                                + " STARTED\n##\n");
 
+               // Start logging
+               new LoggingThread().start();
+
                String oldStatus = process.getStatus();
                process.setStatus(ExecutionProcess.RUNNING);
                executionModulesManager.dispatchUpdateStatus(process, oldStatus,
@@ -170,4 +180,29 @@ public class ProcessThread extends Thread {
        public ExecutionModulesManager getExecutionModulesManager() {
                return executionModulesManager;
        }
+
+       private class LoggingThread extends Thread {
+               public void run() {
+                       boolean run = true;
+                       while (run) {
+                               List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
+                               processThreadGroup.getSteps().drainTo(newSteps);
+                               if (newSteps.size() > 0) {
+                                       //System.out.println(steps.size() + " steps");
+                                       process.addSteps(newSteps);
+                               }
+
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                                       break;
+                               }
+
+                               if (!ProcessThread.this.isAlive()
+                                               && processThreadGroup.getSteps().size() == 0)
+                                       run = false;
+                       }
+               }
+
+       }
 }