]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Improve RPM Factory
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
index 98d715ece3962f5eb77669d912a44ffa28124e1c..b64639a8cd55be014788e9e1e419277248c0f857 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
+ * Copyright (C) 2007-2012 Argeo GmbH
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
@@ -28,11 +27,14 @@ import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
 import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.execution.RealizedFlow;
+import org.springframework.security.Authentication;
+import org.springframework.security.context.SecurityContextHolder;
 
-/** Thread of the SLC Process, starting the sub executions. */
-@SuppressWarnings("deprecation")
+/**
+ * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
+ * sequential {@link ExecutionThread}s.
+ */
 public class ProcessThread extends Thread {
        private final static Log log = LogFactory.getLog(ProcessThread.class);
 
@@ -43,7 +45,7 @@ public class ProcessThread extends Thread {
        private Set<ExecutionThread> executionThreads = Collections
                        .synchronizedSet(new HashSet<ExecutionThread>());
 
-       private Boolean hadAnError = false;
+       // private Boolean hadAnError = false;
        private Boolean killed = false;
 
        public ProcessThread(ThreadGroup processesThreadGroup,
@@ -52,37 +54,36 @@ public class ProcessThread extends Thread {
                super(processesThreadGroup, "SLC Process #" + process.getUuid());
                this.executionModulesManager = executionModulesManager;
                this.process = process;
-               processThreadGroup = new ProcessThreadGroup(executionModulesManager,
-                               this);
+               processThreadGroup = new ProcessThreadGroup(process);
        }
 
        public final void run() {
                // authenticate thread
-               // Authentication authentication = getProcessThreadGroup()
-               // .getAuthentication();
-               // if (authentication == null)
-               // throw new SlcException("Can only execute authenticated threads");
-               // SecurityContextHolder.getContext().setAuthentication(authentication);
-
-               // log.info("\n##\n## SLC Process #" + process.getUuid() +
-               // " STARTED by "
-               // + authentication.getName() + "\n##\n");
+               Authentication authentication = getProcessThreadGroup()
+                               .getAuthentication();
+               if (authentication == null)
+                       throw new SlcException("Can only execute authenticated threads");
+               SecurityContextHolder.getContext().setAuthentication(authentication);
+
                log.info("\n##\n## SLC Process #" + process.getUuid()
                                + " STARTED\n##\n");
 
                // Start logging
                new LoggingThread().start();
 
-               String oldStatus = process.getStatus();
                process.setStatus(ExecutionProcess.RUNNING);
-               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
-                               ExecutionProcess.RUNNING);
-
                try {
                        process();
                } catch (InterruptedException e) {
                        die();
                        return;
+               } catch (Exception e) {
+                       String msg = "Process " + getProcess().getUuid()
+                                       + " failed unexpectedly.";
+                       log.error(msg, e);
+                       getProcessThreadGroup().dispatchAddStep(
+                                       new ExecutionStep("Process", ExecutionStep.ERROR, msg + " "
+                                                       + e.getMessage()));
                }
 
                // waits for all execution threads to complete (in case they were
@@ -103,16 +104,16 @@ public class ProcessThread extends Thread {
 
        /** Make sure this is called BEFORE all the threads are interrupted. */
        private void computeFinalStatus() {
-               String oldStatus = process.getStatus();
+               // String oldStatus = process.getStatus();
                // TODO: error management at flow level?
                if (killed)
                        process.setStatus(ExecutionProcess.KILLED);
-               else if (hadAnError)
+               else if (processThreadGroup.hadAnError())
                        process.setStatus(ExecutionProcess.ERROR);
                else
                        process.setStatus(ExecutionProcess.COMPLETED);
-               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
-                               process.getStatus());
+               // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+               // process.getStatus());
                log.info("\n## SLC Process #" + process.getUuid() + " "
                                + process.getStatus() + "\n");
        }
@@ -136,13 +137,8 @@ public class ProcessThread extends Thread {
         * custom process types. Default expects an {@link SlcExecution}.
         */
        protected void process() throws InterruptedException {
-               if (!(process instanceof SlcExecution))
-                       throw new SlcException("Unsupported process type "
-                                       + process.getClass());
-               SlcExecution slcExecution = (SlcExecution) process;
                List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
-               flowsToProcess.addAll(slcExecution.getRealizedFlows());
-
+               flowsToProcess.addAll(process.getRealizedFlows());
                while (flowsToProcess.size() > 0) {
                        RealizedFlow realizedFlow = flowsToProcess.remove(0);
                        execute(realizedFlow, true);
@@ -155,7 +151,8 @@ public class ProcessThread extends Thread {
                if (killed)
                        return;
 
-               ExecutionThread thread = new ExecutionThread(this, realizedFlow);
+               ExecutionThread thread = new ExecutionThread(processThreadGroup,
+                               executionModulesManager, realizedFlow);
                executionThreads.add(thread);
                thread.start();
 
@@ -165,13 +162,13 @@ public class ProcessThread extends Thread {
                return;
        }
 
-       public void notifyError() {
-               hadAnError = true;
-       }
-
-       public synchronized void flowCompleted() {
-               // notifyAll();
-       }
+       // public void notifyError() {
+       // hadAnError = true;
+       // }
+       //
+       // public synchronized void flowCompleted() {
+       // // notifyAll();
+       // }
 
        public ExecutionProcess getProcess() {
                return process;