]> git.argeo.org Git - gpl/argeo-slc.git/commitdiff
Implement kill and process progress
authorMathieu Baudier <mbaudier@argeo.org>
Wed, 6 Jul 2011 16:39:07 +0000 (16:39 +0000)
committerMathieu Baudier <mbaudier@argeo.org>
Wed, 6 Jul 2011 16:39:07 +0000 (16:39 +0000)
git-svn-id: https://svn.argeo.org/slc/trunk@4669 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

19 files changed:
eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/SlcImages.java
eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/controllers/ProcessController.java
eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessBuilderPage.java
eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessEditor.java
eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/views/JcrProcessListView.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultExecutionFlow.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionThread.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/DefaultAgent.java
runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/SimpleAgentFactory.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionProcess.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java
runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java
runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java
runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrAgent.java
runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrProcessThread.java
runtime/org.argeo.slc.support.jcr/src/main/resources/org/argeo/slc/jcr/slc.cnd

index aaef64324448410586f24b525ed4ff321691b2c5..4b38db0b70b276b32af35952e914baba78702134 100644 (file)
@@ -17,6 +17,7 @@ public class SlcImages {
        public final static Image ERROR = img("icons/error.gif");
        public final static Image LAUNCH = img("icons/launch.gif");
        public final static Image RELAUNCH = img("icons/relaunch.gif");
+       public final static Image KILL = img("icons/kill.png");
        public final static Image REMOVE_ONE = img("icons/remove_one.gif");
        public final static Image REMOVE_ALL = img("icons/removeAll.png");
        public final static Image EXECUTION_SPECS = img("icons/executionSpecs.gif");
index 328282c54921bb094d29b7671c4954fc6cc6fb3b..d6719dce0c10975b4922015d4a7ee0a59d4d2500 100644 (file)
@@ -6,6 +6,7 @@ import java.util.Map;
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
 import javax.jcr.Property;
+import javax.jcr.RepositoryException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,34 +30,10 @@ public class ProcessController {
        public ExecutionProcess process(Node processNode) {
                JcrExecutionProcess process = new JcrExecutionProcess(processNode);
                try {
-                       // we currently only deal with single agents
-                       Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW);
-                       NodeIterator nit = realizedFlowNode.getNodes();
-                       if (nit.hasNext()) {
-                               // TODO find a better way to determine which agent to use
-                               // currently we check the agent of the first registered flow
-                               Node firstRealizedFlow = nit.nextNode();
-                               // we assume there is an nt:address
-                               String firstFlowPath = firstRealizedFlow
-                                               .getNode(SlcNames.SLC_ADDRESS)
-                                               .getProperty(Property.JCR_PATH).getString();
-                               Node flowNode = processNode.getSession().getNode(firstFlowPath);
-                               String agentFactoryPath = SlcJcrUtils
-                                               .flowAgentFactoryPath(firstFlowPath);
-                               if (!agentFactories.containsKey(agentFactoryPath))
-                                       throw new SlcException("No agent factory registered under "
-                                                       + agentFactoryPath);
-                               SlcAgentFactory agentFactory = agentFactories
-                                               .get(agentFactoryPath);
-                               Node agentNode = ((Node) flowNode
-                                               .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1));
-                               String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID)
-                                               .getString();
-
-                               // process
-                               SlcAgent slcAgent = agentFactory.getAgent(agentUuid);
-                               slcAgent.process(process);
-                       }
+                       SlcAgent slcAgent = findAgent(processNode);
+                       if (slcAgent == null)
+                               throw new SlcException("Cannot find agent for " + processNode);
+                       slcAgent.process(process);
                        return process;
                } catch (Exception e) {
                        if (!process.getStatus().equals(ExecutionProcess.ERROR))
@@ -65,6 +42,50 @@ public class ProcessController {
                }
        }
 
+       public void kill(Node processNode) {
+               JcrExecutionProcess process = new JcrExecutionProcess(processNode);
+               try {
+                       SlcAgent slcAgent = findAgent(processNode);
+                       if (slcAgent == null)
+                               throw new SlcException("Cannot find agent for " + processNode);
+                       slcAgent.kill(process);
+               } catch (Exception e) {
+                       if (!process.getStatus().equals(ExecutionProcess.ERROR))
+                               process.setStatus(ExecutionProcess.ERROR);
+                       throw new SlcException("Cannot execute " + processNode, e);
+               }
+       }
+
+       protected SlcAgent findAgent(Node processNode) throws RepositoryException {
+               // we currently only deal with single agents
+               Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW);
+               NodeIterator nit = realizedFlowNode.getNodes();
+               if (nit.hasNext()) {
+                       // TODO find a better way to determine which agent to use
+                       // currently we check the agent of the first registered flow
+                       Node firstRealizedFlow = nit.nextNode();
+                       // we assume there is an nt:address
+                       String firstFlowPath = firstRealizedFlow
+                                       .getNode(SlcNames.SLC_ADDRESS)
+                                       .getProperty(Property.JCR_PATH).getString();
+                       Node flowNode = processNode.getSession().getNode(firstFlowPath);
+                       String agentFactoryPath = SlcJcrUtils
+                                       .flowAgentFactoryPath(firstFlowPath);
+                       if (!agentFactories.containsKey(agentFactoryPath))
+                               throw new SlcException("No agent factory registered under "
+                                               + agentFactoryPath);
+                       SlcAgentFactory agentFactory = agentFactories.get(agentFactoryPath);
+                       Node agentNode = ((Node) flowNode
+                                       .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1));
+                       String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID)
+                                       .getString();
+
+                       // process
+                       return agentFactory.getAgent(agentUuid);
+               }
+               return null;
+       }
+
        public synchronized void register(SlcAgentFactory agentFactory,
                        Map<String, String> properties) {
                String path = properties.get(SlcJcrConstants.PROPERTY_PATH);
index fc0e4f1c8624fcb26f07eef0b15edfd160020472..28c2f69c611a6bd485256a70f055f1759a5cc287 100644 (file)
@@ -5,13 +5,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.UUID;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
 import javax.jcr.Property;
 import javax.jcr.RepositoryException;
-import javax.jcr.Session;
 import javax.jcr.nodetype.NodeType;
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventListener;
@@ -63,8 +61,6 @@ import org.eclipse.swt.widgets.Button;
 import org.eclipse.swt.widgets.Composite;
 import org.eclipse.swt.widgets.Label;
 import org.eclipse.swt.widgets.Table;
-import org.eclipse.ui.IWorkbenchPage;
-import org.eclipse.ui.PlatformUI;
 import org.eclipse.ui.forms.AbstractFormPart;
 import org.eclipse.ui.forms.IManagedForm;
 import org.eclipse.ui.forms.editor.FormPage;
@@ -125,6 +121,9 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
                                        Event.PROPERTY_CHANGED, processNode.getPath(), true, null,
                                        null, false);
 
+                       // make sure all controls are in line with status
+                       statusChanged();
+
                        // add initial flows
                        addInitialFlows();
 
@@ -146,7 +145,9 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
                run.addSelectionListener(new SelectionListener() {
                        public void widgetSelected(SelectionEvent e) {
                                if (isFinished(getProcessStatus())) {
-                                       relaunch();
+                                       ((ProcessEditor) getEditor()).relaunch();
+                               } else if (isRunning(getProcessStatus())) {
+                                       ((ProcessEditor) getEditor()).kill();
                                } else {
                                        ((ProcessEditor) getEditor()).process();
                                }
@@ -191,8 +192,6 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
                statusComposite.setLayout(new FillLayout());
                statusLabel = tk.createLabel(statusComposite, getProcessStatus());
 
-               // make sure all controls are in line with status
-               statusChanged();
        }
 
        protected void createBuilder(Composite parent) {
@@ -279,59 +278,27 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
        /*
         * CONTROLLERS
         */
-       /** Opens a new editor with a copy of this process */
-       protected void relaunch() {
-               try {
-                       Node duplicatedNode = duplicateProcess();
-                       IWorkbenchPage activePage = PlatformUI.getWorkbench()
-                                       .getActiveWorkbenchWindow().getActivePage();
-                       activePage.openEditor(
-                                       new ProcessEditorInput(duplicatedNode.getPath()),
-                                       ProcessEditor.ID);
-                       getEditor().close(false);
-               } catch (Exception e1) {
-                       throw new SlcException("Cannot relaunch " + processNode, e1);
-               }
-       }
-
-       /** Duplicates the process */
-       protected Node duplicateProcess() {
-               try {
-                       Session session = processNode.getSession();
-                       String uuid = UUID.randomUUID().toString();
-                       String destPath = SlcJcrUtils.createExecutionProcessPath(uuid);
-                       Node newNode = JcrUtils.mkdirs(session, destPath,
-                                       SlcTypes.SLC_PROCESS);
-                       JcrUtils.copy(processNode, newNode);
-                       // session.getWorkspace().copy(processNode.getPath(), destPath);
-                       // Node newNode = session.getNode(destPath);
-                       // make sure that we kept the mixins
-                       // newNode.addMixin(NodeType.MIX_CREATED);
-                       // newNode.addMixin(NodeType.MIX_LAST_MODIFIED);
-                       newNode.setProperty(SLC_UUID, uuid);
-                       newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED);
-                       session.save();
-                       return newNode;
-               } catch (RepositoryException e) {
-                       throw new SlcException("Cannot duplicate process", e);
-               }
-       }
-
        /** Reflects a status change */
        protected void statusChanged() {
                String status = getProcessStatus();
                statusLabel.setText(status);
                Boolean isEditable = isEditable(status);
-               run.setEnabled(isEditable);
+               run.setEnabled(status.equals(ExecutionProcess.RUNNING) || isEditable);
                remove.setEnabled(isEditable);
                clear.setEnabled(isEditable);
                // flowsViewer.getTree().setEnabled(isEditable);
-               if (status.equals(ExecutionProcess.COMPLETED)
-                               || status.equals(ExecutionProcess.ERROR)) {
+               if (status.equals(ExecutionProcess.RUNNING)) {
+                       run.setEnabled(true);
+                       run.setImage(SlcImages.KILL);
+                       run.setToolTipText("Kill");
+               } else if (isFinished(status)) {
                        run.setEnabled(true);
                        run.setImage(SlcImages.RELAUNCH);
                        run.setToolTipText("Relaunch");
                }
+
+               if (flowsViewer != null)
+                       flowsViewer.refresh();
        }
 
        /** Adds initial flows from the editor input if any */
@@ -448,14 +415,19 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
        }
 
        /** Optimization so that we don't call the node each time */
-       protected Boolean isEditable(String status) {
+       protected static Boolean isEditable(String status) {
                return status.equals(ExecutionProcess.NEW)
                                || status.equals(ExecutionProcess.INITIALIZED);
        }
 
-       protected Boolean isFinished(String status) {
+       protected static Boolean isFinished(String status) {
                return status.equals(ExecutionProcess.COMPLETED)
-                               || status.equals(ExecutionProcess.ERROR);
+                               || status.equals(ExecutionProcess.ERROR)
+                               || status.equals(ExecutionProcess.KILLED);
+       }
+
+       protected static Boolean isRunning(String status) {
+               return status.equals(ExecutionProcess.RUNNING);
        }
 
        /*
@@ -556,6 +528,18 @@ public class ProcessBuilderPage extends FormPage implements SlcNames {
                        Node node = (Node) element;
                        try {
                                if (node.isNodeType(SlcTypes.SLC_REALIZED_FLOW)) {
+                                       if (node.hasProperty(SLC_STATUS)) {
+                                               String status = node.getProperty(SLC_STATUS)
+                                                               .getString();
+                                               // TODO: factorize with process view ?
+                                               if (status.equals(ExecutionProcess.RUNNING))
+                                                       return SlcImages.PROCESS_RUNNING;
+                                               else if (status.equals(ExecutionProcess.ERROR)
+                                                               || status.equals(ExecutionProcess.KILLED))
+                                                       return SlcImages.PROCESS_ERROR;
+                                               else if (status.equals(ExecutionProcess.COMPLETED))
+                                                       return SlcImages.PROCESS_COMPLETED;
+                                       }
                                        return SlcImages.FLOW;
                                }
                        } catch (RepositoryException e) {
index b00b41298f3b545f290307beb4488ad7fcb8f06f..57b896272f97d196ceac6ba58795a61f48d62e4a 100644 (file)
@@ -6,6 +6,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import javax.jcr.Node;
+import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 
@@ -24,9 +25,12 @@ import org.argeo.slc.jcr.SlcTypes;
 import org.eclipse.core.runtime.IProgressMonitor;
 import org.eclipse.ui.IEditorInput;
 import org.eclipse.ui.IEditorSite;
+import org.eclipse.ui.IWorkbenchPage;
 import org.eclipse.ui.PartInitException;
+import org.eclipse.ui.PlatformUI;
 import org.eclipse.ui.forms.editor.FormEditor;
 
+/** Editor for an execution process. */
 public class ProcessEditor extends FormEditor implements
                ExecutionProcessNotifier, SlcTypes, SlcNames {
        public final static String ID = ClientUiPlugin.ID + ".processEditor";
@@ -96,7 +100,7 @@ public class ProcessEditor extends FormEditor implements
        }
 
        /** Actually runs the process. */
-       public void process() {
+       void process() {
                // the modifications have to be saved before execution
                try {
                        processNode.setProperty(SLC_STATUS, ExecutionProcess.SCHEDULED);
@@ -105,6 +109,9 @@ public class ProcessEditor extends FormEditor implements
                }
                doSave(null);
                try {
+                       // show log
+                       // setActivePage(logPage.getId());
+
                        ExecutionProcess process = processController.process(processNode);
                        Map<String, String> properties = new HashMap<String, String>();
                        properties.put(ExecutionModulesManager.SLC_PROCESS_ID,
@@ -115,6 +122,56 @@ public class ProcessEditor extends FormEditor implements
                }
        }
 
+       void kill() {
+               processController.kill(processNode);
+       }
+
+       /** Opens a new editor with a copy of this process */
+       void relaunch() {
+               try {
+                       Node duplicatedNode = duplicateProcess();
+                       IWorkbenchPage activePage = PlatformUI.getWorkbench()
+                                       .getActiveWorkbenchWindow().getActivePage();
+                       activePage.openEditor(
+                                       new ProcessEditorInput(duplicatedNode.getPath()),
+                                       ProcessEditor.ID);
+                       close(false);
+               } catch (Exception e1) {
+                       throw new SlcException("Cannot relaunch " + processNode, e1);
+               }
+       }
+
+       /** Duplicates the process */
+       protected Node duplicateProcess() {
+               try {
+                       Session session = processNode.getSession();
+                       String uuid = UUID.randomUUID().toString();
+                       String destPath = SlcJcrUtils.createExecutionProcessPath(uuid);
+                       Node newNode = JcrUtils.mkdirs(session, destPath,
+                                       SlcTypes.SLC_PROCESS);
+
+                       // copy node
+                       JcrUtils.copy(processNode, newNode);
+
+                       newNode.setProperty(SLC_UUID, uuid);
+                       newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED);
+
+                       // reset realized flow status
+                       Node rootRealizedFlowNode = newNode.getNode(SLC_FLOW);
+                       // we just manage one level for the time being
+                       NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
+                       while (nit.hasNext()) {
+                               nit.nextNode().setProperty(SLC_STATUS,
+                                               ExecutionProcess.INITIALIZED);
+                       }
+
+                       session.save();
+                       return newNode;
+               } catch (RepositoryException e) {
+                       throw new SlcException("Cannot duplicate process", e);
+               }
+       }
+
        @Override
        protected void addPages() {
                try {
index 6c21784819efc64aaf16bde9bc980624490edf83..f3f6f467da93645be00ba2ea0608f102e179cf59 100644 (file)
@@ -175,6 +175,8 @@ public class JcrProcessListView extends ViewPart {
                                        return SlcImages.PROCESS_COMPLETED;
                                else if (status.equals(ExecutionProcess.RUNNING))
                                        return SlcImages.PROCESS_RUNNING;
+                               else if (status.equals(ExecutionProcess.KILLED))
+                                       return SlcImages.PROCESS_ERROR;
                                else
                                        throw new SlcException("Unkown status " + status);
                        } catch (RepositoryException e) {
index 3b561392115a27d2e156493383e3d7121adacd36..b3d92ae69aad4d9bf6ae605e56569ed512d79197 100644 (file)
@@ -18,6 +18,7 @@ package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,8 +47,6 @@ public abstract class AbstractExecutionModulesManager implements
        private List<FilteredNotifier> filteredNotifiers = Collections
                        .synchronizedList(new ArrayList<FilteredNotifier>());
 
-       private ThreadGroup processesThreadGroup = new ThreadGroup("SLC Processes");
-
        protected abstract ExecutionFlow findExecutionFlow(String moduleName,
                        String moduleVersion, String flowName);
 
@@ -100,6 +99,7 @@ public abstract class AbstractExecutionModulesManager implements
                                filteredNotifier.getNotifier().updateStatus(process, oldStatus,
                                                newStatus);
                }
+
        }
 
        public void dispatchAddSteps(ExecutionProcess process,
@@ -122,6 +122,11 @@ public abstract class AbstractExecutionModulesManager implements
                filteredNotifiers.add(new FilteredNotifier(notifier, properties));
        }
 
+       public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+                       Map<String, String> properties) {
+               filteredNotifiers.remove(notifier);
+       }
+
        public void setSlcExecutionNotifiers(
                        List<SlcExecutionNotifier> slcExecutionNotifiers) {
                this.slcExecutionNotifiers = slcExecutionNotifiers;
@@ -131,10 +136,6 @@ public abstract class AbstractExecutionModulesManager implements
                return slcExecutionNotifiers;
        }
 
-       public ThreadGroup getProcessesThreadGroup() {
-               return processesThreadGroup;
-       }
-
        protected class FilteredNotifier {
                private final ExecutionProcessNotifier notifier;
                private final String processId;
@@ -143,6 +144,8 @@ public abstract class AbstractExecutionModulesManager implements
                                Map<String, String> properties) {
                        super();
                        this.notifier = notifier;
+                       if (properties == null)
+                               properties = new HashMap<String, String>();
                        if (properties.containsKey(SLC_PROCESS_ID))
                                processId = properties.get(SLC_PROCESS_ID);
                        else
@@ -168,7 +171,14 @@ public abstract class AbstractExecutionModulesManager implements
 
                @Override
                public boolean equals(Object obj) {
-                       return notifier.equals(obj);
+                       if (obj instanceof FilteredNotifier) {
+                               FilteredNotifier fn = (FilteredNotifier) obj;
+                               return notifier.equals(fn.notifier);
+                       } else if (obj instanceof ExecutionProcessNotifier) {
+                               ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj;
+                               return notifier.equals(epn);
+                       } else
+                               return false;
                }
 
                public ExecutionProcessNotifier getNotifier() {
index bf1cf99a5f5efc8e3b3da117407ce58c831b0537..afb6ddbd2b8c72f4e955ded044fb0726dbc9f22e 100644 (file)
@@ -36,6 +36,7 @@ import org.springframework.beans.factory.BeanNameAware;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.validation.MapBindingResult;
 
+/** Default implementation of an execution flow. */
 public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean,
                BeanNameAware {
        private final static Log log = LogFactory
@@ -83,13 +84,13 @@ public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean,
                        ExecutionSpecAttribute attr = executionSpec.getAttributes()
                                        .get(key);
 
-                       if (attr.getIsParameter() && !isSetAsParameter(key)) {
-                               errors.rejectValue(key, "Parameter not set");
+                       if (attr.getIsImmutable() && !isSetAsParameter(key)) {
+                               errors.rejectValue(key, "Immutable but not set");
                                break;
                        }
 
-                       if (attr.getIsFrozen() && !isSetAsParameter(key)) {
-                               errors.rejectValue(key, "Frozen but not set as parameter");
+                       if (attr.getIsConstant() && !isSetAsParameter(key)) {
+                               errors.rejectValue(key, "Constant but not set as parameter");
                                break;
                        }
 
@@ -108,9 +109,23 @@ public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean,
        public void run() {
                try {
                        for (Runnable executable : executables) {
+                               if (Thread.interrupted()) {
+                                       log.error("Flow '" + getName() + "' killed before '"
+                                                       + executable + "'");
+                                       Thread.currentThread().interrupt();
+                                       return;
+                                       // throw new ThreadDeath();
+                               }
                                this.doExecuteRunnable(executable);
                        }
                } catch (RuntimeException e) {
+                       if (Thread.interrupted()) {
+                               log.error("Flow '" + getName()
+                                               + "' killed while receiving an unrelated exception", e);
+                               Thread.currentThread().interrupt();
+                               return;
+                               // throw new ThreadDeath();
+                       }
                        if (failOnError)
                                throw e;
                        else {
index b42726dd44d553d27a35bf1222a1680bd3ed0c5d..5e288f264f0de75447afbbbfbeecff7fa82c7392 100644 (file)
@@ -42,7 +42,7 @@ public class ExecutionThread extends Thread {
        public void run() {
                if (getContextClassLoader() != null) {
                        if (log.isTraceEnabled())
-                               log.debug("Context class loader set to "
+                               log.trace("Context class loader set to "
                                                + getContextClassLoader());
                }
 
index dc9e7efdd8a1233d01c1ea7f05e0d69d41c34d92..c11e875b8083df0d627c51dba32587360bdfed22 100644 (file)
@@ -17,6 +17,7 @@
 package org.argeo.slc.core.execution;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -37,29 +38,37 @@ public class ProcessThread extends Thread {
        private final ExecutionProcess process;
        private final ProcessThreadGroup processThreadGroup;
 
-       private Set<ExecutionThread> executionThreads = new HashSet<ExecutionThread>();
+       private Set<ExecutionThread> executionThreads = Collections
+                       .synchronizedSet(new HashSet<ExecutionThread>());
 
        private Boolean hadAnError = false;
+       private Boolean killed = false;
 
-       public ProcessThread(ExecutionModulesManager executionModulesManager,
+       public ProcessThread(ThreadGroup processesThreadGroup,
+                       ExecutionModulesManager executionModulesManager,
                        ExecutionProcess process) {
-               super(executionModulesManager.getProcessesThreadGroup(),
-                               "SLC Process #" + process.getUuid());
+               super(processesThreadGroup, "SLC Process #" + process.getUuid());
                this.executionModulesManager = executionModulesManager;
                this.process = process;
                processThreadGroup = new ProcessThreadGroup(executionModulesManager,
                                this);
        }
 
-       public void run() {
+       public final void run() {
                log.info("\n##\n## SLC Process #" + process.getUuid()
                                + " STARTED\n##\n");
 
-               process.setStatus(SlcExecution.RUNNING);
-               executionModulesManager.dispatchUpdateStatus(process,
-                               SlcExecution.SCHEDULED, SlcExecution.RUNNING);
+               String oldStatus = process.getStatus();
+               process.setStatus(ExecutionProcess.RUNNING);
+               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+                               ExecutionProcess.RUNNING);
 
-               process();
+               try {
+                       process();
+               } catch (InterruptedException e) {
+                       die();
+                       return;
+               }
 
                // waits for all execution threads to complete (in case they were
                // started asynchronously)
@@ -68,28 +77,51 @@ public class ProcessThread extends Thread {
                                try {
                                        executionThread.join();
                                } catch (InterruptedException e) {
-                                       log.error("Execution thread " + executionThread
-                                                       + " was interrupted");
+                                       die();
+                                       return;
                                }
                        }
                }
 
+               computeFinalStatus();
+       }
+
+       /** Make sure this is called BEFORE all the threads are interrupted. */
+       private void computeFinalStatus() {
+               String oldStatus = process.getStatus();
                // TODO: error management at flow level?
-               if (hadAnError)
-                       process.setStatus(SlcExecution.ERROR);
+               if (killed)
+                       process.setStatus(ExecutionProcess.KILLED);
+               else if (hadAnError)
+                       process.setStatus(ExecutionProcess.ERROR);
                else
-                       process.setStatus(SlcExecution.COMPLETED);
-               executionModulesManager.dispatchUpdateStatus(process,
-                               SlcExecution.RUNNING, process.getStatus());
+                       process.setStatus(ExecutionProcess.COMPLETED);
+               executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+                               process.getStatus());
+               log.info("\n## SLC Process #" + process.getUuid() + " "
+                               + process.getStatus() + "\n");
+       }
 
-               log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n");
+       /** Called when being killed */
+       private synchronized void die() {
+               killed = true;
+               computeFinalStatus();
+               for (ExecutionThread executionThread : executionThreads) {
+                       try {
+                               executionThread.interrupt();
+                       } catch (Exception e) {
+                               log.error("Cannot interrupt " + executionThread);
+                       }
+               }
+               processThreadGroup.interrupt();
        }
 
        /**
         * Implementation specific execution. To be overridden in order to deal with
         * custom process types. Default expects an {@link SlcExecution}.
         */
-       protected void process() {
+       @SuppressWarnings("deprecation")
+       protected void process() throws InterruptedException {
                if (!(process instanceof SlcExecution))
                        throw new SlcException("Unsupported process type "
                                        + process.getClass());
@@ -104,27 +136,19 @@ public class ProcessThread extends Thread {
        }
 
        /** @return the (distinct) thread used for this execution */
-       protected Thread execute(RealizedFlow realizedFlow, Boolean synchronous) {
+       protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
+                       throws InterruptedException {
+               if (killed)
+                       return;
+
                ExecutionThread thread = new ExecutionThread(this, realizedFlow);
                executionThreads.add(thread);
                thread.start();
 
-               if (synchronous) {
-                       try {
-                               thread.join();
-                       } catch (InterruptedException e) {
-                               log.error("Flow " + realizedFlow + " was interrupted", e);
-                       }
-               }
-               return thread;
-
-               // synchronized (this) {
-               // try {
-               // wait();
-               // } catch (InterruptedException e) {
-               // // silent
-               // }
-               // }
+               if (synchronous)
+                       thread.join();
+
+               return;
        }
 
        public void notifyError() {
index 3220e910f2557957733372fa4ef766e6da51a44a..e4676c77419a28eb46a79a8b3913ba42887bbce6 100644 (file)
@@ -18,7 +18,10 @@ package org.argeo.slc.core.runtime;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.argeo.slc.SlcException;
@@ -26,15 +29,21 @@ import org.argeo.slc.core.execution.ProcessThread;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionProcessNotifier;
+import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.process.SlcExecution;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentDescriptor;
 
 /** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent {
+public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        private SlcAgentDescriptor agentDescriptor;
        private ExecutionModulesManager modulesManager;
 
+       private ThreadGroup processesThreadGroup;
+       private Map<String, ProcessThread> runningProcesses = Collections
+                       .synchronizedMap(new HashMap<String, ProcessThread>());
+
        /*
         * LIFECYCLE
         */
@@ -46,10 +55,15 @@ public class DefaultAgent implements SlcAgent {
                } catch (UnknownHostException e) {
                        throw new SlcException("Unable to create agent descriptor.", e);
                }
+               processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
+                               + agentDescriptor.getUuid());
+               modulesManager.registerProcessNotifier(this,
+                               new HashMap<String, String>());
        }
 
        public void dispose() {
-
+               modulesManager.unregisterProcessNotifier(this,
+                               new HashMap<String, String>());
        }
 
        /**
@@ -68,19 +82,29 @@ public class DefaultAgent implements SlcAgent {
        }
 
        public void process(ExecutionProcess process) {
-               ProcessThread processThread = createProcessThread(modulesManager,
-                               process);
+               ProcessThread processThread = createProcessThread(processesThreadGroup,
+                               modulesManager, process);
                processThread.start();
+               runningProcesses.put(process.getUuid(), processThread);
+               // FIXME find a way to remove them from this register
+       }
+
+       public void kill(ExecutionProcess process) {
+               String processUuid = process.getUuid();
+               if (runningProcesses.containsKey(processUuid)) {
+                       runningProcesses.get(processUuid).interrupt();
+               }
        }
 
        /** Creates the thread which will coordinate the execution for this agent. */
        protected ProcessThread createProcessThread(
+                       ThreadGroup processesThreadGroup,
                        ExecutionModulesManager modulesManager, ExecutionProcess process) {
                if (!(process instanceof SlcExecution))
                        throw new SlcException("Unsupported process type "
                                        + process.getClass());
-               ProcessThread processThread = new ProcessThread(modulesManager,
-                               (SlcExecution) process);
+               ProcessThread processThread = new ProcessThread(processesThreadGroup,
+                               modulesManager, (SlcExecution) process);
                return processThread;
        }
 
@@ -97,6 +121,21 @@ public class DefaultAgent implements SlcAgent {
                return true;
        }
 
+       /*
+        * PROCESS NOTIFIER
+        */
+       public void updateStatus(ExecutionProcess process, String oldStatus,
+                       String newStatus) {
+               if (newStatus.equals(ExecutionProcess.COMPLETED)
+                               || newStatus.equals(ExecutionProcess.ERROR)
+                               || newStatus.equals(ExecutionProcess.KILLED)) {
+                       runningProcesses.remove(process.getUuid());
+               }
+       }
+
+       public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+       }
+
        /*
         * BEAN
         */
index 6688ec35ca76571061e4a2421e34dd1b144189e0..49dff140356d1bb0a9ed336b62cb55f51d2fd879 100644 (file)
@@ -5,6 +5,7 @@ import java.util.List;
 import org.argeo.slc.runtime.SlcAgent;
 import org.argeo.slc.runtime.SlcAgentFactory;
 
+/** @deprecated old prototype, should be removed */
 public class SimpleAgentFactory implements SlcAgentFactory {
        private List<SlcAgent> agents;
 
index 88a8b2bfec6d8f55a574415b3976ac63ebba8d1b..3f927a345e9027d33be59b6d463ccee6863c47ee 100644 (file)
@@ -21,7 +21,6 @@ import java.util.Map;
 
 import org.argeo.slc.deploy.ModulesManager;
 import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecutionStep;
 
 /** Provides access to the execution modules */
 public interface ExecutionModulesManager extends ModulesManager {
@@ -38,9 +37,6 @@ public interface ExecutionModulesManager extends ModulesManager {
         */
        public List<ExecutionModuleDescriptor> listExecutionModules();
 
-       /** The thread group to which all process threads will belong. */
-       public ThreadGroup getProcessesThreadGroup();
-
        /** Synchronously finds and executes an {@link ExecutionFlow}. */
        public void execute(RealizedFlow realizedFlow);
 
@@ -58,4 +54,8 @@ public interface ExecutionModulesManager extends ModulesManager {
         */
        public void registerProcessNotifier(ExecutionProcessNotifier notifier,
                        Map<String, String> properties);
+
+       /** Unregisters a notifier */
+       public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+                       Map<String, String> properties);
 }
index 7389036ed431c4c1f9df16dcbf1405ae5cb25104..be70b0955d01738287db5967e649889e0cc1a7b9 100644 (file)
@@ -6,6 +6,11 @@ package org.argeo.slc.execution;
  * during and after the execution actually took place, providing an entry point
  * for the definition of executions, their monitoring (e.g. logging) and
  * tracking. A process can be distributed or parallelized.
+ * <br/>
+ * NEW => INITIALIZED => SCHEDULED => RUNNING<br/>
+ * RUNNING => {COMPLETED | ERROR | KILLED}<br/>
+ * {COMPLETED | ERROR | KILLED} => PURGED<br/>
+ * UNKOWN : this is a bug if this status occurs<br/>
  */
 public interface ExecutionProcess {
        /** The process is not yet usable. */
@@ -20,6 +25,8 @@ public interface ExecutionProcess {
        public final static String COMPLETED = "COMPLETED";
        /** The process failed because of an unexpected error. */
        public final static String ERROR = "ERROR";
+       /** The process was killed explicitly or through a crash. */
+       public final static String KILLED = "KILLED";
        /** The status cannot be retrieved (probably because of unexpected errors). */
        public final static String UNKOWN = "UNKOWN";
 
index d11f99922aaca768bcda82a45b73cadbe91f15d9..9b40434628017698b0426d5981a9ce9674bbca27 100644 (file)
@@ -25,6 +25,7 @@ import java.util.TreeMap;
 \r
 import org.argeo.slc.execution.ExecutionProcess;\r
 \r
+/** @deprecated use other implementations of {@link ExecutionProcess} */\r
 public class SlcExecution implements ExecutionProcess, Serializable {\r
        private static final long serialVersionUID = -7607457971382118466L;\r
 \r
index d295f20b63964b6b34e3e0702046ef7c552631cd..b7afe2269b70539222f2546adf744d4962e5375b 100644 (file)
@@ -27,6 +27,12 @@ public interface SlcAgent {
        /** Agent unique identifier */
        public String getAgentUuid();
 
+       /** Execute / take part to this process */
+       public void process(ExecutionProcess process);
+
+       /** Kills this process */
+       public void kill(ExecutionProcess process);
+
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
                        String moduleName, String version);
 
@@ -35,8 +41,6 @@ public interface SlcAgent {
        /** @deprecated Use {@link #process(ExecutionProcess)} instead. */
        public void runSlcExecution(SlcExecution slcExecution);
 
-       public void process(ExecutionProcess process);
-
        /** @return true if still alive. */
        public boolean ping();
 }
index d3027c93dba717cd308fead2bd5ccf93b5901309..8c08519371afdfe06c246c96775b3fbbaf892ef7 100644 (file)
@@ -96,6 +96,10 @@ public class JmsAgentProxy implements SlcAgent {
                }
        }
 
+       public void kill(ExecutionProcess process) {
+               throw new UnsupportedOperationException();
+       }
+
        protected Object sendReceive(AgentMC messageCreator) {
                long begin = System.currentTimeMillis();
                Object res;
index 69bdba76758bd04fc2819fb3047413f774e41bb8..89b618d1bff74b535c6cf4f3fc02ec9a3b4c6366 100644 (file)
@@ -58,8 +58,9 @@ public class JcrAgent extends DefaultAgent implements SlcAgentFactory, SlcNames
         */
        @Override
        protected ProcessThread createProcessThread(
+                       ThreadGroup processesThreadGroup,
                        ExecutionModulesManager modulesManager, ExecutionProcess process) {
-               return new JcrProcessThread(modulesManager,
+               return new JcrProcessThread(processesThreadGroup, modulesManager,
                                (JcrExecutionProcess) process);
        }
 
index 1c97322bda6f344b49db5f2fd4ceb5a1dce8b14b..eab881f73ff6ccd248a3b1ef709801cf3b134412 100644 (file)
@@ -16,6 +16,7 @@ import org.argeo.slc.core.execution.PrimitiveUtils;
 import org.argeo.slc.core.execution.ProcessThread;
 import org.argeo.slc.execution.ExecutionFlowDescriptor;
 import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
 import org.argeo.slc.execution.ExecutionSpecAttribute;
 import org.argeo.slc.jcr.SlcJcrUtils;
 import org.argeo.slc.jcr.SlcNames;
@@ -25,19 +26,47 @@ import org.argeo.slc.process.RealizedFlow;
 /** Where the actual execution takes place */
 public class JcrProcessThread extends ProcessThread implements SlcNames {
 
-       public JcrProcessThread(ExecutionModulesManager executionModulesManager,
+       public JcrProcessThread(ThreadGroup processesThreadGroup,
+                       ExecutionModulesManager executionModulesManager,
                        JcrExecutionProcess process) {
-               super(executionModulesManager, process);
+               super(processesThreadGroup, executionModulesManager, process);
        }
 
        @Override
-       protected void process() {
+       protected void process() throws InterruptedException {
                try {
-                       Node realizedFlowNode = getNode().getNode(SLC_FLOW);
+                       Node rootRealizedFlowNode = getNode().getNode(SLC_FLOW);
                        // we just manage one level for the time being
-                       NodeIterator nit = realizedFlowNode.getNodes(SLC_FLOW);
+                       NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
                        while (nit.hasNext()) {
-                               process(nit.nextNode());
+                               Node realizedFlowNode = nit.nextNode();
+
+                               // set status on realized flow
+                               realizedFlowNode.setProperty(SLC_STATUS,
+                                               ExecutionProcess.RUNNING);
+                               realizedFlowNode.getSession().save();
+                               try {
+                                       execute(realizedFlowNode);
+
+                                       // set status on realized flow
+                                       realizedFlowNode.setProperty(SLC_STATUS,
+                                                       ExecutionProcess.COMPLETED);
+                                       realizedFlowNode.getSession().save();
+                               } catch (RepositoryException e) {
+                                       throw e;
+                               } catch (InterruptedException e) {
+                                       // set status on realized flow
+                                       realizedFlowNode.setProperty(SLC_STATUS,
+                                                       ExecutionProcess.KILLED);
+                                       realizedFlowNode.getSession().save();
+                                       throw e;
+                               } catch (RuntimeException e) {
+                                       // set status on realized flow
+                                       realizedFlowNode.setProperty(SLC_STATUS,
+                                                       ExecutionProcess.ERROR);
+                                       realizedFlowNode.getSession().save();
+                                       throw e;
+                               }
                        }
                } catch (RepositoryException e) {
                        throw new ArgeoException("Cannot process " + getNode(), e);
@@ -45,7 +74,8 @@ public class JcrProcessThread extends ProcessThread implements SlcNames {
        }
 
        /** Configure the realized flows */
-       protected void process(Node realizedFlowNode) throws RepositoryException {
+       protected void execute(Node realizedFlowNode) throws RepositoryException,
+                       InterruptedException {
                if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
                        String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
                                        .getProperty(Property.JCR_PATH).getString();
@@ -100,7 +130,11 @@ public class JcrProcessThread extends ProcessThread implements SlcNames {
                                        values, executionSpec);
                        realizedFlow.setFlowDescriptor(efd);
 
+                       //
+                       // EXECUTE THE FLOW
+                       //
                        execute(realizedFlow, true);
+                       //
                }
        }
 
index d644de3ca63177e54f584600270b0fa6e146015c..0e2a5494acb603fc586b637ed552c3d5d0fdf998 100644 (file)
@@ -65,6 +65,7 @@ mixin
 // - slc:spec (STRING)
 - slc:started (DATE)
 - slc:completed (DATE)
+//- slc:status (STRING)
 + slc:address (nt:address)
 + slc:flow (slc:realizedFlow) *
 // the realized execution spec attributes