]> git.argeo.org Git - gpl/argeo-slc.git/commitdiff
Improve execution
authorMathieu Baudier <mbaudier@argeo.org>
Sat, 16 Apr 2011 16:34:53 +0000 (16:34 +0000)
committerMathieu Baudier <mbaudier@argeo.org>
Sat, 16 Apr 2011 16:34:53 +0000 (16:34 +0000)
NEW - bug 17: Generalize agent management and registration beyond JMS
https://bugzilla.argeo.org/show_bug.cgi?id=17

git-svn-id: https://svn.argeo.org/slc/trunk@4440 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionAspect.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThreadGroup.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesListener.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java

index f471d65ca4dea76c2688e9e5984b5c49a217ed60..4e9856fe6f2745bd0d2cc5ab2ffa42c51592b633 100644 (file)
@@ -17,6 +17,7 @@
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -30,7 +31,9 @@ import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.process.SlcExecutionNotifier;
+import org.argeo.slc.process.SlcExecutionStep;
 
+/** Provides the base feature of an execution module manager. */
 public abstract class AbstractExecutionModulesManager implements
                ExecutionModulesManager {
        private final static Log log = LogFactory
@@ -81,6 +84,23 @@ public abstract class AbstractExecutionModulesManager implements
                //
        }
 
+       public void dispatchUpdateStatus(SlcExecution slcExecution,
+                       String oldStatus, String newStatus) {
+               for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
+                               .iterator(); it.hasNext();) {
+                       it.next().updateStatus(slcExecution, oldStatus, newStatus);
+               }
+       }
+
+       public void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step) {
+               List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
+               steps.add(step);
+               for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
+                               .iterator(); it.hasNext();) {
+                       it.next().addSteps(slcExecution, steps);
+               }
+       }
+
        public void setSlcExecutionNotifiers(
                        List<SlcExecutionNotifier> slcExecutionNotifiers) {
                this.slcExecutionNotifiers = slcExecutionNotifiers;
@@ -94,7 +114,7 @@ public abstract class AbstractExecutionModulesManager implements
                return processesThreadGroup;
        }
 
-       public List<ExecutionModulesListener> getExecutionModulesListeners() {
+       protected List<ExecutionModulesListener> getExecutionModulesListeners() {
                return executionModulesListeners;
        }
 
index 09e2afe5b66acfac0bef30becbdf9c5acde3ce24..e9e5b4cddd89967d62724500b61aaec332dfa24d 100644 (file)
@@ -48,14 +48,14 @@ public class ExecutionAspect {
 
                if (log.isDebugEnabled())
                        logStackEvent("=> ", executionFlow);
-               
+
                try {
                        // Actually execute the flow
                        pjp.proceed();
                } finally {
                        if (log.isDebugEnabled())
                                logStackEvent("<= ", executionFlow);
-       
+
                        executionStack.leaveFlow(executionFlow);
                }
        }
@@ -104,16 +104,21 @@ public class ExecutionAspect {
 
        protected void logStackEvent(String symbol, ExecutionFlow executionFlow) {
                Integer stackSize = executionStack.getStackSize();
-               log.debug(depthSpaces(stackSize) + symbol + executionFlow + " #"
-                               + executionStack.getCurrentStackLevelUuid() + ", depth="
-                               + stackSize);
+               if (log.isTraceEnabled())
+                       log.debug(depthSpaces(stackSize) + symbol + executionFlow + " #"
+                                       + executionStack.getCurrentStackLevelUuid() + ", depth="
+                                       + stackSize);
+               else if (log.isDebugEnabled())
+                       log.debug(depthSpaces(stackSize) + symbol + executionFlow);
        }
 
        protected void logRunnableExecution(ExecutionFlow executionFlow,
                        Runnable runnable) {
                Integer stackSize = executionStack.getStackSize();
-               log.debug(depthSpaces(stackSize + 1)
-                               + runnable.getClass().getSimpleName() + " in " + executionFlow);
+               if (log.isDebugEnabled())
+                       log.debug(depthSpaces(stackSize + 1)
+                                       + runnable.getClass().getSimpleName() + " in "
+                                       + executionFlow);
        }
 
        private String depthSpaces(int depth) {
index c119127ad20b71f1ab269321db7df1f08deb33fc..7c0e7b10db0a4d3a7111916041b1f134d3676c77 100644 (file)
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.process.SlcExecutionNotifier;
 
 /** Thread of the SLC Process, starting the sub executions. */
 public class ProcessThread extends Thread {
        private final static Log log = LogFactory.getLog(ProcessThread.class);
 
-       private final AbstractExecutionModulesManager executionModulesManager;
+       private final ExecutionModulesManager executionModulesManager;
        private final SlcExecution slcProcess;
        private final ProcessThreadGroup processThreadGroup;
        private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
 
        private Boolean hadAnError = false;
 
-       public ProcessThread(
-                       AbstractExecutionModulesManager executionModulesManager,
+       public ProcessThread(ExecutionModulesManager executionModulesManager,
                        SlcExecution slcExecution) {
                super(executionModulesManager.getProcessesThreadGroup(),
                                "SLC Process #" + slcExecution.getUuid());
                this.executionModulesManager = executionModulesManager;
                this.slcProcess = slcExecution;
-               processThreadGroup = new ProcessThreadGroup(this);
+               processThreadGroup = new ProcessThreadGroup(executionModulesManager,
+                               this);
        }
 
        public void run() {
-               log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
+               log.info("\n##\n## SLC Process " + slcProcess + " STARTED\n##");
 
                slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
-               dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
-                               SlcExecution.STATUS_RUNNING);
+               executionModulesManager.dispatchUpdateStatus(slcProcess,
+                               SlcExecution.STATUS_SCHEDULED, SlcExecution.STATUS_RUNNING);
 
                flowsToProcess.addAll(slcProcess.getRealizedFlows());
 
@@ -61,29 +60,30 @@ public class ProcessThread extends Thread {
                        ExecutionThread thread = new ExecutionThread(this, flow);
                        thread.start();
 
-                       synchronized (this) {
-                               try {
-                                       wait();
-                               } catch (InterruptedException e) {
-                                       // silent
-                               }
+                       try {
+                               thread.join();
+                       } catch (InterruptedException e) {
+                               log.error("Flow " + flow + " was interrupted", e);
                        }
+
+                       // synchronized (this) {
+                       // try {
+                       // wait();
+                       // } catch (InterruptedException e) {
+                       // // silent
+                       // }
+                       // }
                }
 
+               // TODO: error management at flow level?
                if (hadAnError)
                        slcProcess.setStatus(SlcExecution.STATUS_ERROR);
                else
                        slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
-               dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
-                               slcProcess.getStatus());
-       }
+               executionModulesManager.dispatchUpdateStatus(slcProcess,
+                               SlcExecution.STATUS_RUNNING, slcProcess.getStatus());
 
-       protected void dispatchUpdateStatus(SlcExecution slcExecution,
-                       String oldStatus, String newStatus) {
-               for (Iterator<SlcExecutionNotifier> it = executionModulesManager
-                               .getSlcExecutionNotifiers().iterator(); it.hasNext();) {
-                       it.next().updateStatus(slcExecution, oldStatus, newStatus);
-               }
+               log.info("## SLC Process " + slcProcess + " COMPLETED");
        }
 
        public void notifyError() {
@@ -91,7 +91,7 @@ public class ProcessThread extends Thread {
        }
 
        public synchronized void flowCompleted() {
-               notifyAll();
+               // notifyAll();
        }
 
        public SlcExecution getSlcProcess() {
@@ -102,7 +102,7 @@ public class ProcessThread extends Thread {
                return processThreadGroup;
        }
 
-       public AbstractExecutionModulesManager getExecutionModulesManager() {
+       public ExecutionModulesManager getExecutionModulesManager() {
                return executionModulesManager;
        }
 }
index f7e2bea1c810c500e625bf5aa672014f39e7e69b..1574b97d6761a1a62318d2a050bf11b1a693aacc 100644 (file)
 
 package org.argeo.slc.core.execution;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
+import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.process.SlcExecutionNotifier;
 import org.argeo.slc.process.SlcExecutionStep;
 
+/** The thread group attached to a given {@link SlcExecution}. */
 public class ProcessThreadGroup extends ThreadGroup {
+       private final ExecutionModulesManager executionModulesManager;
        private final ProcessThread processThread;
 
-       public ProcessThreadGroup(ProcessThread processThread) {
+       public ProcessThreadGroup(ExecutionModulesManager executionModulesManager,
+                       ProcessThread processThread) {
                super("SLC Process #" + processThread.getSlcProcess().getUuid()
                                + " thread group");
+               this.executionModulesManager = executionModulesManager;
                this.processThread = processThread;
        }
 
@@ -38,14 +38,9 @@ public class ProcessThreadGroup extends ThreadGroup {
        }
 
        public void dispatchAddStep(SlcExecutionStep step) {
-               processThread.getSlcProcess().getSteps().add(step);
-               List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
-               steps.add(step);
-               for (Iterator<SlcExecutionNotifier> it = processThread
-                               .getExecutionModulesManager().getSlcExecutionNotifiers()
-                               .iterator(); it.hasNext();) {
-                       it.next().addSteps(processThread.getSlcProcess(), steps);
-               }
+               SlcExecution slcProcess = processThread.getSlcProcess();
+               slcProcess.getSteps().add(step);
+               executionModulesManager.dispatchAddStep(slcProcess, step);
        }
 
 }
index c515e107305f050c2b9fbe46eaefe83b9d67ffab..54f5cb3e6a49677d52cf26af8ed1095e6b684f77 100644 (file)
@@ -18,6 +18,7 @@ package org.argeo.slc.execution;
 
 import org.argeo.slc.deploy.Module;
 
+/** Listen to events on execution modules. */
 public interface ExecutionModulesListener {
        public void executionModuleAdded(Module module,
                        ExecutionContext executionContext);
index c1b9e47a3255714bd527e6823f6ddded53985063..26890ad05a8c1870dce466c7d4cdf8863c140b1c 100644 (file)
@@ -21,6 +21,7 @@ import java.util.List;
 import org.argeo.slc.deploy.ModulesManager;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
+import org.argeo.slc.process.SlcExecutionStep;
 
 /** Provides access to the execution modules */
 public interface ExecutionModulesManager extends ModulesManager {
@@ -34,9 +35,19 @@ public interface ExecutionModulesManager extends ModulesManager {
         */
        public List<ExecutionModuleDescriptor> listExecutionModules();
 
-       /** Asynchronously prepare and executes an {@link SlcExecution} */
+       /** Asynchronously prepares and executes an {@link SlcExecution} */
        public void process(SlcExecution slcExecution);
 
+       /** The thread group to which all process threads will belong. */
+       public ThreadGroup getProcessesThreadGroup();
+
        /** Synchronously finds and executes an {@link ExecutionFlow}. */
        public void execute(RealizedFlow realizedFlow);
+
+       /** Notify of a status update status of the {@link SlcExecution} */
+       public void dispatchUpdateStatus(SlcExecution slcExecution,
+                       String oldStatus, String newStatus);
+
+       /** Notify that a step was added in an {@link SlcExecution} */
+       public void dispatchAddStep(SlcExecution slcExecution, SlcExecutionStep step);
 }