]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java
Clarify SLC project structure.
[gpl/argeo-slc.git] / org.argeo.slc.runtime / src / org / argeo / slc / runtime / ProcessThread.java
diff --git a/org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java b/org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java
deleted file mode 100644 (file)
index 2c4f73c..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-package org.argeo.slc.runtime;
-
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.security.auth.Subject;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.execution.RealizedFlow;
-
-/**
- * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
- * sequential {@link ExecutionThread}s.
- */
-public class ProcessThread extends Thread {
-       private final static Log log = LogFactory.getLog(ProcessThread.class);
-
-       private final ExecutionModulesManager executionModulesManager;
-       private final ExecutionProcess process;
-       private final ProcessThreadGroup processThreadGroup;
-
-       private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
-
-       // private Boolean hadAnError = false;
-       private Boolean killed = false;
-
-       private final AccessControlContext accessControlContext;
-
-       public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
-                       ExecutionProcess process) {
-               super(processesThreadGroup, "SLC Process #" + process.getUuid());
-               this.executionModulesManager = executionModulesManager;
-               this.process = process;
-               processThreadGroup = new ProcessThreadGroup(process);
-               accessControlContext = AccessController.getContext();
-       }
-
-       public final void run() {
-               // authenticate thread
-               // Authentication authentication = getProcessThreadGroup()
-               // .getAuthentication();
-               // if (authentication == null)
-               // throw new SlcException("Can only execute authenticated threads");
-               // SecurityContextHolder.getContext().setAuthentication(authentication);
-
-               log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
-
-               // Start logging
-               new LoggingThread().start();
-
-               process.setStatus(ExecutionProcess.RUNNING);
-               try {
-                       Subject subject = Subject.getSubject(accessControlContext);
-                       try {
-                               Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-
-                                       @Override
-                                       public Void run() throws Exception {
-                                               process();
-                                               return null;
-                                       }
-
-                               });
-                       } catch (PrivilegedActionException privilegedActionException) {
-                               Throwable cause = privilegedActionException.getCause();
-                               if (cause instanceof InterruptedException)
-                                       throw (InterruptedException) cause;
-                               else
-                                       throw new SlcException("Cannot process", cause);
-                       }
-                       // process();
-               } catch (InterruptedException e) {
-                       die();
-                       return;
-               } catch (Exception e) {
-                       String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
-                       log.error(msg, e);
-                       getProcessThreadGroup()
-                                       .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
-               }
-
-               // waits for all execution threads to complete (in case they were
-               // started asynchronously)
-               for (ExecutionThread executionThread : executionThreads) {
-                       if (executionThread.isAlive()) {
-                               try {
-                                       executionThread.join();
-                               } catch (InterruptedException e) {
-                                       die();
-                                       return;
-                               }
-                       }
-               }
-
-               computeFinalStatus();
-       }
-
-       /** Make sure this is called BEFORE all the threads are interrupted. */
-       private void computeFinalStatus() {
-               // String oldStatus = process.getStatus();
-               // TODO: error management at flow level?
-               if (killed)
-                       process.setStatus(ExecutionProcess.KILLED);
-               else if (processThreadGroup.hadAnError())
-                       process.setStatus(ExecutionProcess.ERROR);
-               else
-                       process.setStatus(ExecutionProcess.COMPLETED);
-               // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
-               // process.getStatus());
-               log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
-       }
-
-       /** Called when being killed */
-       private synchronized void die() {
-               killed = true;
-               computeFinalStatus();
-               for (ExecutionThread executionThread : executionThreads) {
-                       try {
-                               executionThread.interrupt();
-                       } catch (Exception e) {
-                               log.error("Cannot interrupt " + executionThread);
-                       }
-               }
-               processThreadGroup.interrupt();
-       }
-
-       /**
-        * Implementation specific execution. To be overridden in order to deal with
-        * custom process types. Default expects an {@link SlcExecution}.
-        */
-       protected void process() throws InterruptedException {
-               List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
-               flowsToProcess.addAll(process.getRealizedFlows());
-               while (flowsToProcess.size() > 0) {
-                       RealizedFlow realizedFlow = flowsToProcess.remove(0);
-                       execute(realizedFlow, true);
-               }
-       }
-
-       /** @return the (distinct) thread used for this execution */
-       protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
-               if (killed)
-                       return;
-
-               ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
-               executionThreads.add(thread);
-               thread.start();
-
-               if (synchronous)
-                       thread.join();
-
-               return;
-       }
-
-       // public void notifyError() {
-       // hadAnError = true;
-       // }
-       //
-       // public synchronized void flowCompleted() {
-       // // notifyAll();
-       // }
-
-       public ExecutionProcess getProcess() {
-               return process;
-       }
-
-       public ProcessThreadGroup getProcessThreadGroup() {
-               return processThreadGroup;
-       }
-
-       public ExecutionModulesManager getExecutionModulesManager() {
-               return executionModulesManager;
-       }
-
-       private class LoggingThread extends Thread {
-
-               public LoggingThread() {
-                       super("SLC Process Logger #" + process.getUuid());
-               }
-
-               public void run() {
-                       boolean run = true;
-                       while (run) {
-                               List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
-                               processThreadGroup.getSteps().drainTo(newSteps);
-                               if (newSteps.size() > 0) {
-                                       // System.out.println(steps.size() + " steps");
-                                       process.addSteps(newSteps);
-                               }
-
-                               try {
-                                       Thread.sleep(1000);
-                               } catch (InterruptedException e) {
-                                       break;
-                               }
-
-                               if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
-                                       run = false;
-                       }
-               }
-
-       }
-}