X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;ds=sidebyside;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FProcessThread.java;h=5552d664e9a5f01bd180fc9a248e270ec6e2d29c;hb=af9457b0628ba4cc625192762d0c0fe7564b9846;hp=1d333845d1e9bb1f7caa4996207b1b5c44533fb4;hpb=9b2422e7198df6f34282a805058dd5f497417318;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java index 1d333845d..5552d664e 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2010 Mathieu Baudier + * Copyright (C) 2007-2012 Argeo GmbH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.argeo.slc.core.execution; import java.util.ArrayList; @@ -21,8 +20,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,10 +27,14 @@ import org.argeo.slc.SlcException; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; import org.argeo.slc.execution.ExecutionStep; -import org.argeo.slc.process.RealizedFlow; -import org.argeo.slc.process.SlcExecution; +import org.argeo.slc.execution.RealizedFlow; +import org.springframework.security.Authentication; +import org.springframework.security.context.SecurityContextHolder; -/** Thread of the SLC Process, starting the sub executions. */ +/** + * Main thread coordinating an {@link ExecutionProcess}, launching parallel or + * sequential {@link ExecutionThread}s. + */ public class ProcessThread extends Thread { private final static Log log = LogFactory.getLog(ProcessThread.class); @@ -44,34 +45,39 @@ public class ProcessThread extends Thread { private Set executionThreads = Collections .synchronizedSet(new HashSet()); - private Boolean hadAnError = false; + // private Boolean hadAnError = false; private Boolean killed = false; - private final static Integer STEPS_BUFFER_CAPACITY = 10000; - private BlockingQueue steps = new ArrayBlockingQueue( - STEPS_BUFFER_CAPACITY); - public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager, ExecutionProcess process) { super(processesThreadGroup, "SLC Process #" + process.getUuid()); this.executionModulesManager = executionModulesManager; this.process = process; - processThreadGroup = new ProcessThreadGroup(executionModulesManager, - this); + processThreadGroup = new ProcessThreadGroup(process); } public final void run() { + // authenticate thread + Authentication authentication = getProcessThreadGroup() + .getAuthentication(); + if (authentication == null) + throw new SlcException("Can only execute authenticated threads"); + SecurityContextHolder.getContext().setAuthentication(authentication); + + // log.info("\n##\n## SLC Process #" + process.getUuid() + + // " STARTED by " + // + authentication.getName() + "\n##\n"); log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n"); // Start logging new LoggingThread().start(); - String oldStatus = process.getStatus(); + // String oldStatus = process.getStatus(); process.setStatus(ExecutionProcess.RUNNING); - executionModulesManager.dispatchUpdateStatus(process, oldStatus, - ExecutionProcess.RUNNING); + // executionModulesManager.dispatchUpdateStatus(process, oldStatus, + // ExecutionProcess.RUNNING); try { process(); @@ -98,16 +104,16 @@ public class ProcessThread extends Thread { /** Make sure this is called BEFORE all the threads are interrupted. */ private void computeFinalStatus() { - String oldStatus = process.getStatus(); + // String oldStatus = process.getStatus(); // TODO: error management at flow level? if (killed) process.setStatus(ExecutionProcess.KILLED); - else if (hadAnError) + else if (processThreadGroup.hadAnError()) process.setStatus(ExecutionProcess.ERROR); else process.setStatus(ExecutionProcess.COMPLETED); - executionModulesManager.dispatchUpdateStatus(process, oldStatus, - process.getStatus()); + // executionModulesManager.dispatchUpdateStatus(process, oldStatus, + // process.getStatus()); log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n"); } @@ -130,15 +136,9 @@ public class ProcessThread extends Thread { * Implementation specific execution. To be overridden in order to deal with * custom process types. Default expects an {@link SlcExecution}. */ - @SuppressWarnings("deprecation") protected void process() throws InterruptedException { - if (!(process instanceof SlcExecution)) - throw new SlcException("Unsupported process type " - + process.getClass()); - SlcExecution slcExecution = (SlcExecution) process; List flowsToProcess = new ArrayList(); - flowsToProcess.addAll(slcExecution.getRealizedFlows()); - + flowsToProcess.addAll(process.getRealizedFlows()); while (flowsToProcess.size() > 0) { RealizedFlow realizedFlow = flowsToProcess.remove(0); execute(realizedFlow, true); @@ -151,7 +151,8 @@ public class ProcessThread extends Thread { if (killed) return; - ExecutionThread thread = new ExecutionThread(this, realizedFlow); + ExecutionThread thread = new ExecutionThread(processThreadGroup, + executionModulesManager, realizedFlow); executionThreads.add(thread); thread.start(); @@ -161,13 +162,13 @@ public class ProcessThread extends Thread { return; } - public void notifyError() { - hadAnError = true; - } - - public synchronized void flowCompleted() { - // notifyAll(); - } +// public void notifyError() { +// hadAnError = true; +// } +// +// public synchronized void flowCompleted() { +// // notifyAll(); +// } public ExecutionProcess getProcess() { return process; @@ -182,13 +183,18 @@ public class ProcessThread extends Thread { } private class LoggingThread extends Thread { + + public LoggingThread() { + super("SLC Process Logger #" + process.getUuid()); + } + public void run() { boolean run = true; while (run) { List newSteps = new ArrayList(); processThreadGroup.getSteps().drainTo(newSteps); if (newSteps.size() > 0) { - //System.out.println(steps.size() + " steps"); + // System.out.println(steps.size() + " steps"); process.addSteps(newSteps); }