]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java
Remove eclipse project definitions
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.simple / src / main / java / org / argeo / slc / core / execution / DefaultModulesManager.java
index fdeb864a5b6b19be2b8c2346655ad692e524be5e..b843c4ff0b83aff31a2185656003320af0db661d 100644 (file)
@@ -1,15 +1,12 @@
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionContext;
 import org.argeo.slc.execution.ExecutionFlowDescriptor;
 import org.argeo.slc.execution.ExecutionModule;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
@@ -17,8 +14,7 @@ import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.process.RealizedFlow;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.process.SlcExecutionNotifier;
-import org.dbunit.operation.UpdateOperation;
-import org.springframework.util.Assert;
+import org.argeo.slc.process.SlcExecutionStep;
 
 public class DefaultModulesManager implements ExecutionModulesManager {
        private final static Log log = LogFactory
@@ -26,6 +22,7 @@ public class DefaultModulesManager implements ExecutionModulesManager {
 
        private List<ExecutionModule> executionModules = new ArrayList<ExecutionModule>();
        private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
+       private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
 
        protected ExecutionModule getExecutionModule(String moduleName,
                        String version) {
@@ -43,7 +40,9 @@ public class DefaultModulesManager implements ExecutionModulesManager {
                        String moduleName, String version) {
                ExecutionModule module = getExecutionModule(moduleName, version);
 
-               Assert.notNull(module);
+               if (module == null)
+                       throw new SlcException("Module " + moduleName + " (" + version
+                                       + ") not found");
 
                return module.getDescriptor();
        }
@@ -56,50 +55,8 @@ public class DefaultModulesManager implements ExecutionModulesManager {
                this.executionModules = executionModules;
        }
 
-       protected Map<String, Object> convertValues(
-                       ExecutionFlowDescriptor executionFlowDescriptor) {
-               // convert the values of flow.getFlowDescriptor()
-               Map<String, Object> values = executionFlowDescriptor.getValues();
-
-               Map<String, Object> convertedValues = new HashMap<String, Object>();
-
-               for (String key : values.keySet()) {
-                       Object value = values.get(key);
-                       if (value instanceof PrimitiveValue) {
-                               PrimitiveValue primitiveValue = (PrimitiveValue) value;
-
-                               // TODO: check that the class of the the primitiveValue.value
-                               // matches
-                               // the primitiveValue.type
-                               convertedValues.put(key, primitiveValue.getValue());
-                       } else if (value instanceof RefValue) {
-                               RefValue refValue = (RefValue) value;
-                               convertedValues.put(key, refValue.getLabel());
-                       }
-               }
-               return convertedValues;
-       }
-
        public void process(SlcExecution slcExecution) {
-               log.info("\n##\n## Process SLC Execution " + slcExecution + "\n##\n");
-
-               for (RealizedFlow flow : slcExecution.getRealizedFlows()) {
-                       ExecutionModule module = getExecutionModule(flow.getModuleName(),
-                                       flow.getModuleVersion());
-                       if (module != null) {
-                               ExecutionThread thread = new ExecutionThread(flow
-                                               .getFlowDescriptor(), module);
-                               thread.start();
-                       } else {
-                               throw new SlcException("ExecutionModule "
-                                               + flow.getModuleName() + ", version "
-                                               + flow.getModuleVersion() + " not found.");
-                       }
-               }
-
-               slcExecution.setStatus(SlcExecution.STATUS_RUNNING);
-               dispatchUpdateStatus(slcExecution, SlcExecution.STATUS_SCHEDULED,
-                               SlcExecution.STATUS_RUNNING);
+               new ProcessThread(processesThreadGroup, slcExecution).start();
        }
 
        protected void dispatchUpdateStatus(SlcExecution slcExecution,
@@ -110,33 +67,129 @@ public class DefaultModulesManager implements ExecutionModulesManager {
                }
        }
 
+       protected synchronized void dispatchAddStep(SlcExecution slcExecution,
+                       SlcExecutionStep step) {
+               slcExecution.getSteps().add(step);
+               List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
+               steps.add(step);
+               for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
+                               .iterator(); it.hasNext();) {
+                       it.next().addSteps(slcExecution, steps);
+               }
+       }
+
        public void setSlcExecutionNotifiers(
                        List<SlcExecutionNotifier> slcExecutionNotifiers) {
                this.slcExecutionNotifiers = slcExecutionNotifiers;
        }
 
+       /** Thread of the SLC Process, starting the sub executions. */
+       private class ProcessThread extends Thread {
+               private final SlcExecution slcProcess;
+               private final ThreadGroup processThreadGroup;
+               private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
+
+               public ProcessThread(ThreadGroup processesThreadGroup,
+                               SlcExecution slcExecution) {
+                       super(processesThreadGroup, "SLC Process #"
+                                       + slcExecution.getUuid());
+                       this.slcProcess = slcExecution;
+                       processThreadGroup = new ThreadGroup("SLC Process #"
+                                       + slcExecution.getUuid() + " thread group");
+               }
+
+               public void run() {
+                       log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
+
+                       // FIXME: hack to let the SlcExecution be registered on server
+                       try {
+                               Thread.sleep(500);
+                       } catch (InterruptedException e1) {
+                               // silent
+                       }
+
+                       slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
+                       dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
+                                       SlcExecution.STATUS_RUNNING);
+
+                       flowsToProcess.addAll(slcProcess.getRealizedFlows());
+
+                       while (flowsToProcess.size() > 0) {
+                               RealizedFlow flow = flowsToProcess.remove(0);
+                               ExecutionModule module = getExecutionModule(flow
+                                               .getModuleName(), flow.getModuleVersion());
+                               if (module != null) {
+                                       ExecutionThread thread = new ExecutionThread(this, flow
+                                                       .getFlowDescriptor(), module);
+                                       thread.start();
+                               } else {
+                                       throw new SlcException("ExecutionModule "
+                                                       + flow.getModuleName() + ", version "
+                                                       + flow.getModuleVersion() + " not found.");
+                               }
+
+                               synchronized (this) {
+                                       try {
+                                               wait();
+                                       } catch (InterruptedException e) {
+                                               // silent
+                                       }
+                               }
+                       }
+
+                       slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
+                       dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
+                                       SlcExecution.STATUS_FINISHED);
+               }
+
+               public synchronized void flowCompleted() {
+                       notifyAll();
+               }
+
+               public SlcExecution getSlcProcess() {
+                       return slcProcess;
+               }
+
+               public ThreadGroup getProcessThreadGroup() {
+                       return processThreadGroup;
+               }
+       }
+
+       /** Thread of a single execution */
        private class ExecutionThread extends Thread {
                private final ExecutionFlowDescriptor executionFlowDescriptor;
                private final ExecutionModule executionModule;
+               private final ProcessThread processThread;
 
-               public ExecutionThread(ExecutionFlowDescriptor executionFlowDescriptor,
+               public ExecutionThread(ProcessThread processThread,
+                               ExecutionFlowDescriptor executionFlowDescriptor,
                                ExecutionModule executionModule) {
-                       super("SLC Execution #" /* + executionContext.getUuid() */);
+                       super(processThread.getProcessThreadGroup(), "Flow "
+                                       + executionFlowDescriptor.getName());
                        this.executionFlowDescriptor = executionFlowDescriptor;
                        this.executionModule = executionModule;
+                       this.processThread = processThread;
                }
 
                public void run() {
-                       ExecutionContext executionContext = executionModule
-                                       .getExecutionContext();
-                       executionContext
-                                       .addVariables(convertValues(executionFlowDescriptor));
+                       dispatchAddStep(processThread.getSlcProcess(),
+                                       new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
+                                                       "Flow " + executionFlowDescriptor.getName()));
+
                        try {
                                executionModule.execute(executionFlowDescriptor);
                        } catch (Exception e) {
                                // TODO: re-throw exception ?
-                               log.error("Execution " + executionContext.getUuid()
-                                               + " failed.", e);
+                               String msg = "Execution of flow "
+                                               + executionFlowDescriptor.getName() + " failed.";
+                               log.error(msg, e);
+                               dispatchAddStep(processThread.getSlcProcess(),
+                                               new SlcExecutionStep(msg + " " + e.getMessage()));
+                       } finally {
+                               processThread.flowCompleted();
+                               dispatchAddStep(processThread.getSlcProcess(),
+                                               new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
+                                                               "Flow " + executionFlowDescriptor.getName()));
                        }
                }
        }