]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Primitive arguments working
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
index 5af3262d57cb4923a76ce2e6a738154876ff12aa..dc9e7efdd8a1233d01c1ea7f05e0d69d41c34d92 100644 (file)
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 
@@ -30,61 +34,97 @@ public class ProcessThread extends Thread {
        private final static Log log = LogFactory.getLog(ProcessThread.class);
 
        private final ExecutionModulesManager executionModulesManager;
-       private final SlcExecution slcProcess;
+       private final ExecutionProcess process;
        private final ProcessThreadGroup processThreadGroup;
-       private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+
+       private Set<ExecutionThread> executionThreads = new HashSet<ExecutionThread>();
 
        private Boolean hadAnError = false;
 
        public ProcessThread(ExecutionModulesManager executionModulesManager,
-                       SlcExecution slcExecution) {
+                       ExecutionProcess process) {
                super(executionModulesManager.getProcessesThreadGroup(),
-                               "SLC Process #" + slcExecution.getUuid());
+                               "SLC Process #" + process.getUuid());
                this.executionModulesManager = executionModulesManager;
-               this.slcProcess = slcExecution;
+               this.process = process;
                processThreadGroup = new ProcessThreadGroup(executionModulesManager,
                                this);
        }
 
        public void run() {
-               log.info("\n##\n## SLC Process #" + slcProcess.getUuid()
+               log.info("\n##\n## SLC Process #" + process.getUuid()
                                + " STARTED\n##\n");
 
-               slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
-               executionModulesManager.dispatchUpdateStatus(slcProcess,
-                               SlcExecution.STATUS_SCHEDULED, SlcExecution.STATUS_RUNNING);
+               process.setStatus(SlcExecution.RUNNING);
+               executionModulesManager.dispatchUpdateStatus(process,
+                               SlcExecution.SCHEDULED, SlcExecution.RUNNING);
+
+               process();
+
+               // waits for all execution threads to complete (in case they were
+               // started asynchronously)
+               for (ExecutionThread executionThread : executionThreads) {
+                       if (executionThread.isAlive()) {
+                               try {
+                                       executionThread.join();
+                               } catch (InterruptedException e) {
+                                       log.error("Execution thread " + executionThread
+                                                       + " was interrupted");
+                               }
+                       }
+               }
+
+               // TODO: error management at flow level?
+               if (hadAnError)
+                       process.setStatus(SlcExecution.ERROR);
+               else
+                       process.setStatus(SlcExecution.COMPLETED);
+               executionModulesManager.dispatchUpdateStatus(process,
+                               SlcExecution.RUNNING, process.getStatus());
+
+               log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n");
+       }
 
-               flowsToProcess.addAll(slcProcess.getRealizedFlows());
+       /**
+        * Implementation specific execution. To be overridden in order to deal with
+        * custom process types. Default expects an {@link SlcExecution}.
+        */
+       protected void process() {
+               if (!(process instanceof SlcExecution))
+                       throw new SlcException("Unsupported process type "
+                                       + process.getClass());
+               SlcExecution slcExecution = (SlcExecution) process;
+               List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+               flowsToProcess.addAll(slcExecution.getRealizedFlows());
 
                while (flowsToProcess.size() > 0) {
-                       RealizedFlow flow = flowsToProcess.remove(0);
-                       ExecutionThread thread = new ExecutionThread(this, flow);
-                       thread.start();
+                       RealizedFlow realizedFlow = flowsToProcess.remove(0);
+                       execute(realizedFlow, true);
+               }
+       }
+
+       /** @return the (distinct) thread used for this execution */
+       protected Thread execute(RealizedFlow realizedFlow, Boolean synchronous) {
+               ExecutionThread thread = new ExecutionThread(this, realizedFlow);
+               executionThreads.add(thread);
+               thread.start();
 
+               if (synchronous) {
                        try {
                                thread.join();
                        } catch (InterruptedException e) {
-                               log.error("Flow " + flow + " was interrupted", e);
+                               log.error("Flow " + realizedFlow + " was interrupted", e);
                        }
-
-                       // synchronized (this) {
-                       // try {
-                       // wait();
-                       // } catch (InterruptedException e) {
-                       // // silent
-                       // }
-                       // }
                }
-
-               // TODO: error management at flow level?
-               if (hadAnError)
-                       slcProcess.setStatus(SlcExecution.STATUS_ERROR);
-               else
-                       slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
-               executionModulesManager.dispatchUpdateStatus(slcProcess,
-                               SlcExecution.STATUS_RUNNING, slcProcess.getStatus());
-
-               log.info("\n## SLC Process #" + slcProcess.getUuid() + " COMPLETED\n");
+               return thread;
+
+               // synchronized (this) {
+               // try {
+               // wait();
+               // } catch (InterruptedException e) {
+               // // silent
+               // }
+               // }
        }
 
        public void notifyError() {
@@ -95,8 +135,8 @@ public class ProcessThread extends Thread {
                // notifyAll();
        }
 
-       public SlcExecution getSlcProcess() {
-               return slcProcess;
+       public ExecutionProcess getProcess() {
+               return process;
        }
 
        public ProcessThreadGroup getProcessThreadGroup() {