X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;ds=inline;f=org.argeo.slc.runtime%2Fsrc%2Forg%2Fargeo%2Fslc%2Fruntime%2FProcessThread.java;fp=org.argeo.slc.runtime%2Fsrc%2Forg%2Fargeo%2Fslc%2Fruntime%2FProcessThread.java;h=0000000000000000000000000000000000000000;hb=ecc22e604e47533c79de9cecdcdeacbc752cbff1;hp=2c4f73c7e88064a18924dcec42cef528ba34ca2d;hpb=e07ded4632e53f8b8869763bc1f1f4091361e76e;p=gpl%2Fargeo-slc.git diff --git a/org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java b/org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java deleted file mode 100644 index 2c4f73c7e..000000000 --- a/org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.argeo.slc.runtime; - -import java.security.AccessControlContext; -import java.security.AccessController; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import javax.security.auth.Subject; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.argeo.slc.SlcException; -import org.argeo.slc.execution.ExecutionModulesManager; -import org.argeo.slc.execution.ExecutionProcess; -import org.argeo.slc.execution.ExecutionStep; -import org.argeo.slc.execution.RealizedFlow; - -/** - * Main thread coordinating an {@link ExecutionProcess}, launching parallel or - * sequential {@link ExecutionThread}s. - */ -public class ProcessThread extends Thread { - private final static Log log = LogFactory.getLog(ProcessThread.class); - - private final ExecutionModulesManager executionModulesManager; - private final ExecutionProcess process; - private final ProcessThreadGroup processThreadGroup; - - private Set executionThreads = Collections.synchronizedSet(new HashSet()); - - // private Boolean hadAnError = false; - private Boolean killed = false; - - private final AccessControlContext accessControlContext; - - public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager, - ExecutionProcess process) { - super(processesThreadGroup, "SLC Process #" + process.getUuid()); - this.executionModulesManager = executionModulesManager; - this.process = process; - processThreadGroup = new ProcessThreadGroup(process); - accessControlContext = AccessController.getContext(); - } - - public final void run() { - // authenticate thread - // Authentication authentication = getProcessThreadGroup() - // .getAuthentication(); - // if (authentication == null) - // throw new SlcException("Can only execute authenticated threads"); - // SecurityContextHolder.getContext().setAuthentication(authentication); - - log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n"); - - // Start logging - new LoggingThread().start(); - - process.setStatus(ExecutionProcess.RUNNING); - try { - Subject subject = Subject.getSubject(accessControlContext); - try { - Subject.doAs(subject, new PrivilegedExceptionAction() { - - @Override - public Void run() throws Exception { - process(); - return null; - } - - }); - } catch (PrivilegedActionException privilegedActionException) { - Throwable cause = privilegedActionException.getCause(); - if (cause instanceof InterruptedException) - throw (InterruptedException) cause; - else - throw new SlcException("Cannot process", cause); - } - // process(); - } catch (InterruptedException e) { - die(); - return; - } catch (Exception e) { - String msg = "Process " + getProcess().getUuid() + " failed unexpectedly."; - log.error(msg, e); - getProcessThreadGroup() - .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage())); - } - - // waits for all execution threads to complete (in case they were - // started asynchronously) - for (ExecutionThread executionThread : executionThreads) { - if (executionThread.isAlive()) { - try { - executionThread.join(); - } catch (InterruptedException e) { - die(); - return; - } - } - } - - computeFinalStatus(); - } - - /** Make sure this is called BEFORE all the threads are interrupted. */ - private void computeFinalStatus() { - // String oldStatus = process.getStatus(); - // TODO: error management at flow level? - if (killed) - process.setStatus(ExecutionProcess.KILLED); - else if (processThreadGroup.hadAnError()) - process.setStatus(ExecutionProcess.ERROR); - else - process.setStatus(ExecutionProcess.COMPLETED); - // executionModulesManager.dispatchUpdateStatus(process, oldStatus, - // process.getStatus()); - log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n"); - } - - /** Called when being killed */ - private synchronized void die() { - killed = true; - computeFinalStatus(); - for (ExecutionThread executionThread : executionThreads) { - try { - executionThread.interrupt(); - } catch (Exception e) { - log.error("Cannot interrupt " + executionThread); - } - } - processThreadGroup.interrupt(); - } - - /** - * Implementation specific execution. To be overridden in order to deal with - * custom process types. Default expects an {@link SlcExecution}. - */ - protected void process() throws InterruptedException { - List flowsToProcess = new ArrayList(); - flowsToProcess.addAll(process.getRealizedFlows()); - while (flowsToProcess.size() > 0) { - RealizedFlow realizedFlow = flowsToProcess.remove(0); - execute(realizedFlow, true); - } - } - - /** @return the (distinct) thread used for this execution */ - protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException { - if (killed) - return; - - ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow); - executionThreads.add(thread); - thread.start(); - - if (synchronous) - thread.join(); - - return; - } - - // public void notifyError() { - // hadAnError = true; - // } - // - // public synchronized void flowCompleted() { - // // notifyAll(); - // } - - public ExecutionProcess getProcess() { - return process; - } - - public ProcessThreadGroup getProcessThreadGroup() { - return processThreadGroup; - } - - public ExecutionModulesManager getExecutionModulesManager() { - return executionModulesManager; - } - - private class LoggingThread extends Thread { - - public LoggingThread() { - super("SLC Process Logger #" + process.getUuid()); - } - - public void run() { - boolean run = true; - while (run) { - List newSteps = new ArrayList(); - processThreadGroup.getSteps().drainTo(newSteps); - if (newSteps.size() > 0) { - // System.out.println(steps.size() + " steps"); - process.addSteps(newSteps); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - break; - } - - if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0) - run = false; - } - } - - } -}