]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Revert explicit security context propagation
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
index 9474656fd5ac7a8be59326974f0d2414e8251cad..98d715ece3962f5eb77669d912a44ffa28124e1c 100644 (file)
@@ -17,6 +17,7 @@
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -26,10 +27,12 @@ import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 
 /** Thread of the SLC Process, starting the sub executions. */
+@SuppressWarnings("deprecation")
 public class ProcessThread extends Thread {
        private final static Log log = LogFactory.getLog(ProcessThread.class);
 
@@ -37,29 +40,50 @@ public class ProcessThread extends Thread {
        private final ExecutionProcess process;
        private final ProcessThreadGroup processThreadGroup;
 
-       private Set<ExecutionThread> executionThreads = new HashSet<ExecutionThread>();
+       private Set<ExecutionThread> executionThreads = Collections
+                       .synchronizedSet(new HashSet<ExecutionThread>());
 
        private Boolean hadAnError = false;
+       private Boolean killed = false;
 
-       public ProcessThread(ExecutionModulesManager executionModulesManager,
+       public ProcessThread(ThreadGroup processesThreadGroup,
+                       ExecutionModulesManager executionModulesManager,
                        ExecutionProcess process) {
-               super(executionModulesManager.getProcessesThreadGroup(),
-                               "SLC Process #" + process.getUuid());
+               super(processesThreadGroup, "SLC Process #" + process.getUuid());
                this.executionModulesManager = executionModulesManager;
                this.process = process;
                processThreadGroup = new ProcessThreadGroup(executionModulesManager,
                                this);
        }
 
-       public void run() {
+       public final void run() {
+               // authenticate thread
+               // Authentication authentication = getProcessThreadGroup()
+               // .getAuthentication();
+               // if (authentication == null)
+               // throw new SlcException("Can only execute authenticated threads");
+               // SecurityContextHolder.getContext().setAuthentication(authentication);
+
+               // log.info("\n##\n## SLC Process #" + process.getUuid() +
+               // " STARTED by "
+               // + authentication.getName() + "\n##\n");
                log.info("\n##\n## SLC Process #" + process.getUuid()
                                + " STARTED\n##\n");
 
-               process.setStatus(SlcExecution.RUNNING);
-               executionModulesManager.dispatchUpdateStatus(process,
-                               SlcExecution.SCHEDULED, SlcExecution.RUNNING);
+               // Start logging
+               new LoggingThread().start();
 
-               process();
+               String oldStatus = process.getStatus();
+               process.setStatus(ExecutionProcess.RUNNING);
+               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+                               ExecutionProcess.RUNNING);
+
+               try {
+                       process();
+               } catch (InterruptedException e) {
+                       die();
+                       return;
+               }
 
                // waits for all execution threads to complete (in case they were
                // started asynchronously)
@@ -68,28 +92,50 @@ public class ProcessThread extends Thread {
                                try {
                                        executionThread.join();
                                } catch (InterruptedException e) {
-                                       log.error("Execution thread " + executionThread
-                                                       + " was interrupted");
+                                       die();
+                                       return;
                                }
                        }
                }
 
+               computeFinalStatus();
+       }
+
+       /** Make sure this is called BEFORE all the threads are interrupted. */
+       private void computeFinalStatus() {
+               String oldStatus = process.getStatus();
                // TODO: error management at flow level?
-               if (hadAnError)
-                       process.setStatus(SlcExecution.ERROR);
+               if (killed)
+                       process.setStatus(ExecutionProcess.KILLED);
+               else if (hadAnError)
+                       process.setStatus(ExecutionProcess.ERROR);
                else
-                       process.setStatus(SlcExecution.COMPLETED);
-               executionModulesManager.dispatchUpdateStatus(process,
-                               SlcExecution.RUNNING, process.getStatus());
+                       process.setStatus(ExecutionProcess.COMPLETED);
+               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+                               process.getStatus());
+               log.info("\n## SLC Process #" + process.getUuid() + " "
+                               + process.getStatus() + "\n");
+       }
 
-               log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n");
+       /** Called when being killed */
+       private synchronized void die() {
+               killed = true;
+               computeFinalStatus();
+               for (ExecutionThread executionThread : executionThreads) {
+                       try {
+                               executionThread.interrupt();
+                       } catch (Exception e) {
+                               log.error("Cannot interrupt " + executionThread);
+                       }
+               }
+               processThreadGroup.interrupt();
        }
 
        /**
         * Implementation specific execution. To be overridden in order to deal with
         * custom process types. Default expects an {@link SlcExecution}.
         */
-       protected void process() {
+       protected void process() throws InterruptedException {
                if (!(process instanceof SlcExecution))
                        throw new SlcException("Unsupported process type "
                                        + process.getClass());
@@ -103,26 +149,20 @@ public class ProcessThread extends Thread {
                }
        }
 
-       protected void execute(RealizedFlow realizedFlow, Boolean synchronous) {
+       /** @return the (distinct) thread used for this execution */
+       protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
+                       throws InterruptedException {
+               if (killed)
+                       return;
+
                ExecutionThread thread = new ExecutionThread(this, realizedFlow);
                executionThreads.add(thread);
                thread.start();
 
-               if (synchronous) {
-                       try {
-                               thread.join();
-                       } catch (InterruptedException e) {
-                               log.error("Flow " + realizedFlow + " was interrupted", e);
-                       }
-               }
+               if (synchronous)
+                       thread.join();
 
-               // synchronized (this) {
-               // try {
-               // wait();
-               // } catch (InterruptedException e) {
-               // // silent
-               // }
-               // }
+               return;
        }
 
        public void notifyError() {
@@ -144,4 +184,34 @@ public class ProcessThread extends Thread {
        public ExecutionModulesManager getExecutionModulesManager() {
                return executionModulesManager;
        }
+
+       private class LoggingThread extends Thread {
+
+               public LoggingThread() {
+                       super("SLC Process Logger #" + process.getUuid());
+               }
+
+               public void run() {
+                       boolean run = true;
+                       while (run) {
+                               List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
+                               processThreadGroup.getSteps().drainTo(newSteps);
+                               if (newSteps.size() > 0) {
+                                       // System.out.println(steps.size() + " steps");
+                                       process.addSteps(newSteps);
+                               }
+
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                                       break;
+                               }
+
+                               if (!ProcessThread.this.isAlive()
+                                               && processThreadGroup.getSteps().size() == 0)
+                                       run = false;
+                       }
+               }
+
+       }
 }