]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java
Adapt to changes in Argeo Commons
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / AbstractExecutionModulesManager.java
index 4722280d9f28663610084e58807b6fd5a40a0e8f..c4832dbc4bceb926b9cc3890d168d716a464780d 100644 (file)
+/*
+ * Copyright (C) 2007-2012 Mathieu Baudier
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionContext;
 import org.argeo.slc.execution.ExecutionFlow;
-import org.argeo.slc.execution.ExecutionFlowDescriptor;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
+import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
 import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionSpec;
-import org.argeo.slc.execution.ExecutionSpecAttribute;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionProcessNotifier;
+import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.process.SlcExecutionNotifier;
-import org.argeo.slc.process.SlcExecutionStep;
-import org.springframework.aop.scope.ScopedObject;
-import org.springframework.util.Assert;
 
+/** Provides the base feature of an execution module manager. */
+@SuppressWarnings("deprecation")
 public abstract class AbstractExecutionModulesManager implements
                ExecutionModulesManager {
        private final static Log log = LogFactory
                        .getLog(AbstractExecutionModulesManager.class);
 
        private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
-       private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
 
-       public void process(SlcExecution slcExecution) {
-               new ProcessThread(processesThreadGroup, slcExecution).start();
+       private List<FilteredNotifier> filteredNotifiers = Collections
+                       .synchronizedList(new ArrayList<FilteredNotifier>());
+
+       protected abstract ExecutionFlow findExecutionFlow(String moduleName,
+                       String moduleVersion, String flowName);
+
+       protected abstract ExecutionContext findExecutionContext(String moduleName,
+                       String moduleVersion);
+
+       protected abstract ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
+                       String moduleName, String moduleVersion);
+
+       public void execute(RealizedFlow realizedFlow) {
+               if (log.isTraceEnabled())
+                       log.trace("Executing " + realizedFlow);
+
+               String moduleName = realizedFlow.getModuleName();
+               String moduleVersion = realizedFlow.getModuleVersion();
+
+               Map<? extends String, ? extends Object> variablesToAdd = getExecutionFlowDescriptorConverter(
+                               moduleName, moduleVersion).convertValues(
+                               realizedFlow.getFlowDescriptor());
+               ExecutionContext executionContext = findExecutionContext(moduleName,
+                               moduleVersion);
+               for (String key : variablesToAdd.keySet())
+                       executionContext.setVariable(key, variablesToAdd.get(key));
+
+               ExecutionFlow flow = findExecutionFlow(moduleName, moduleVersion,
+                               realizedFlow.getFlowDescriptor().getName());
+
+               //
+               // Actually runs the flow, IN THIS THREAD
+               //
+               flow.run();
+               //
+               //
+               //
        }
 
-       protected void dispatchUpdateStatus(SlcExecution slcExecution,
+       public void dispatchUpdateStatus(ExecutionProcess process,
                        String oldStatus, String newStatus) {
-               for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
+               // generic notifiers (notified of everything)
+               for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
                                .iterator(); it.hasNext();) {
-                       it.next().updateStatus(slcExecution, oldStatus, newStatus);
+                       it.next().updateStatus(process, oldStatus, newStatus);
                }
-       }
 
-       protected synchronized void dispatchAddStep(SlcExecution slcExecution,
-                       SlcExecutionStep step) {
-               slcExecution.getSteps().add(step);
-               List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
-               steps.add(step);
-               for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
-                               .iterator(); it.hasNext();) {
-                       it.next().addSteps(slcExecution, steps);
+               // filtered notifiers
+               for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
+                               .hasNext();) {
+                       FilteredNotifier filteredNotifier = it.next();
+                       if (filteredNotifier.receiveFrom(process))
+                               filteredNotifier.getNotifier().updateStatus(process, oldStatus,
+                                               newStatus);
                }
-       }
 
-       public void setSlcExecutionNotifiers(
-                       List<SlcExecutionNotifier> slcExecutionNotifiers) {
-               this.slcExecutionNotifiers = slcExecutionNotifiers;
        }
 
-       protected static ExecutionModuleDescriptor createDescriptor(
-                       String moduleName, String moduleVersion,
-                       Map<String, ExecutionFlow> executionFlows) {
-               // TODO: put this in a separate configurable object
-               ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
-               md.setName(moduleName);
-               md.setVersion(moduleVersion);
-
-               for (String name : executionFlows.keySet()) {
-                       ExecutionFlow executionFlow = executionFlows.get(name);
-
-                       Assert.notNull(executionFlow.getName());
-                       Assert.state(name.equals(executionFlow.getName()));
-
-                       ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
-                       Assert.notNull(executionSpec);
-                       Assert.notNull(executionSpec.getName());
-
-                       Map<String, Object> values = new TreeMap<String, Object>();
-                       for (String key : executionSpec.getAttributes().keySet()) {
-                               ExecutionSpecAttribute attribute = executionSpec
-                                               .getAttributes().get(key);
-
-                               if (executionFlow.isSetAsParameter(key)) {
-                                       Object value = executionFlow.getParameter(key);
-                                       if (attribute instanceof PrimitiveSpecAttribute) {
-                                               PrimitiveValue primitiveValue = new PrimitiveValue();
-                                               primitiveValue
-                                                               .setType(((PrimitiveSpecAttribute) attribute)
-                                                                               .getType());
-                                               primitiveValue.setValue(value);
-                                               values.put(key, primitiveValue);
-                                       } else if (attribute instanceof RefSpecAttribute) {
-                                               RefValue refValue = new RefValue();
-                                               if (value instanceof ScopedObject) {
-                                                       refValue.setLabel("RUNTIME "
-                                                                       + value.getClass().getName());
-                                               } else {
-                                                       refValue.setLabel("STATIC "
-                                                                       + value.getClass().getName());
-                                               }
-                                               values.put(key, refValue);
-                                       } else {
-                                               throw new SlcException("Unkown spec attribute type "
-                                                               + attribute.getClass());
-                                       }
-                               }
-
-                       }
-
-                       ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
-                                       values, executionSpec);
-                       if (executionFlow.getPath() != null)
-                               efd.setPath(executionFlow.getPath());
-
-                       // Add execution spec if necessary
-                       if (!md.getExecutionSpecs().contains(executionSpec))
-                               md.getExecutionSpecs().add(executionSpec);
-
-                       // Add execution flow
-                       md.getExecutionFlows().add(efd);
-               }
-
-               return md;
-       }
+       public void dispatchAddSteps(ExecutionProcess process,
+                       List<ExecutionStep> steps) {
+               process.addSteps(steps);
 
-       /** Thread of the SLC Process, starting the sub executions. */
-       private class ProcessThread extends Thread {
-               private final SlcExecution slcProcess;
-               private final ThreadGroup processThreadGroup;
-               private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
-
-               public ProcessThread(ThreadGroup processesThreadGroup,
-                               SlcExecution slcExecution) {
-                       super(processesThreadGroup, "SLC Process #"
-                                       + slcExecution.getUuid());
-                       this.slcProcess = slcExecution;
-                       processThreadGroup = new ThreadGroup("SLC Process #"
-                                       + slcExecution.getUuid() + " thread group");
+               for (Iterator<SlcExecutionNotifier> it = getSlcExecutionNotifiers()
+                               .iterator(); it.hasNext();) {
+                       it.next().addSteps(process, steps);
                }
 
-               public void run() {
-                       log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
+               for (Iterator<FilteredNotifier> it = filteredNotifiers.iterator(); it
+                               .hasNext();) {
+                       FilteredNotifier filteredNotifier = it.next();
+                       if (filteredNotifier.receiveFrom(process))
+                               filteredNotifier.getNotifier().addSteps(process, steps);
+               }
+       }
 
-                       slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
-                       dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
-                                       SlcExecution.STATUS_RUNNING);
+       public void registerProcessNotifier(ExecutionProcessNotifier notifier,
+                       Map<String, String> properties) {
+               filteredNotifiers.add(new FilteredNotifier(notifier, properties));
+       }
 
-                       flowsToProcess.addAll(slcProcess.getRealizedFlows());
+       public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+                       Map<String, String> properties) {
+               filteredNotifiers.remove(notifier);
+       }
 
-                       while (flowsToProcess.size() > 0) {
-                               RealizedFlow flow = flowsToProcess.remove(0);
-                               ExecutionThread thread = new ExecutionThread(this, flow);
-                               thread.start();
+       public void setSlcExecutionNotifiers(
+                       List<SlcExecutionNotifier> slcExecutionNotifiers) {
+               this.slcExecutionNotifiers = slcExecutionNotifiers;
+       }
 
-                               synchronized (this) {
-                                       try {
-                                               wait();
-                                       } catch (InterruptedException e) {
-                                               // silent
-                                       }
-                               }
-                       }
+       private List<SlcExecutionNotifier> getSlcExecutionNotifiers() {
+               return slcExecutionNotifiers;
+       }
 
-                       slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
-                       dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
-                                       SlcExecution.STATUS_FINISHED);
+       protected class FilteredNotifier {
+               private final ExecutionProcessNotifier notifier;
+               private final String processId;
+
+               public FilteredNotifier(ExecutionProcessNotifier notifier,
+                               Map<String, String> properties) {
+                       super();
+                       this.notifier = notifier;
+                       if (properties == null)
+                               properties = new HashMap<String, String>();
+                       if (properties.containsKey(SLC_PROCESS_ID))
+                               processId = properties.get(SLC_PROCESS_ID);
+                       else
+                               processId = null;
                }
 
-               public synchronized void flowCompleted() {
-                       notifyAll();
+               /**
+                * Whether event from this process should be received by this listener.
+                */
+               public Boolean receiveFrom(ExecutionProcess process) {
+                       if (processId != null)
+                               if (process.getUuid().equals(processId))
+                                       return true;
+                               else
+                                       return false;
+                       return true;
                }
 
-               public SlcExecution getSlcProcess() {
-                       return slcProcess;
+               @Override
+               public int hashCode() {
+                       return notifier.hashCode();
                }
 
-               public ThreadGroup getProcessThreadGroup() {
-                       return processThreadGroup;
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof FilteredNotifier) {
+                               FilteredNotifier fn = (FilteredNotifier) obj;
+                               return notifier.equals(fn.notifier);
+                       } else if (obj instanceof ExecutionProcessNotifier) {
+                               ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj;
+                               return notifier.equals(epn);
+                       } else
+                               return false;
                }
-       }
 
-       /** Thread of a single execution */
-       private class ExecutionThread extends Thread {
-               private final RealizedFlow realizedFlow;
-               private final ProcessThread processThread;
-
-               public ExecutionThread(ProcessThread processThread,
-                               RealizedFlow realizedFlow) {
-                       super(processThread.getProcessThreadGroup(), "Flow "
-                                       + realizedFlow.getFlowDescriptor().getName());
-                       this.realizedFlow = realizedFlow;
-                       this.processThread = processThread;
+               public ExecutionProcessNotifier getNotifier() {
+                       return notifier;
                }
 
-               public void run() {
-                       ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow
-                                       .getFlowDescriptor();
-                       String flowName = executionFlowDescriptor.getName();
-
-                       dispatchAddStep(processThread.getSlcProcess(),
-                                       new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
-                                                       "Flow " + flowName));
-
-                       try {
-                               execute(realizedFlow);
-                       } catch (Exception e) {
-                               // TODO: re-throw exception ?
-                               String msg = "Execution of flow " + flowName + " failed.";
-                               log.error(msg, e);
-                               dispatchAddStep(processThread.getSlcProcess(),
-                                               new SlcExecutionStep(msg + " " + e.getMessage()));
-                       } finally {
-                               processThread.flowCompleted();
-                               dispatchAddStep(processThread.getSlcProcess(),
-                                               new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
-                                                               "Flow " + flowName));
-                       }
-               }
        }
-
 }