From: Mathieu Baudier Date: Wed, 6 Jul 2011 16:39:07 +0000 (+0000) Subject: Implement kill and process progress X-Git-Tag: argeo-slc-2.1.7~909 X-Git-Url: http://git.argeo.org/?a=commitdiff_plain;h=24d560ee846fda5d7954d44f83cb22ab449dbe61;p=gpl%2Fargeo-slc.git Implement kill and process progress git-svn-id: https://svn.argeo.org/slc/trunk@4669 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- diff --git a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/SlcImages.java b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/SlcImages.java index aaef64324..4b38db0b7 100644 --- a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/SlcImages.java +++ b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/SlcImages.java @@ -17,6 +17,7 @@ public class SlcImages { public final static Image ERROR = img("icons/error.gif"); public final static Image LAUNCH = img("icons/launch.gif"); public final static Image RELAUNCH = img("icons/relaunch.gif"); + public final static Image KILL = img("icons/kill.png"); public final static Image REMOVE_ONE = img("icons/remove_one.gif"); public final static Image REMOVE_ALL = img("icons/removeAll.png"); public final static Image EXECUTION_SPECS = img("icons/executionSpecs.gif"); diff --git a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/controllers/ProcessController.java b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/controllers/ProcessController.java index 328282c54..d6719dce0 100644 --- a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/controllers/ProcessController.java +++ b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/controllers/ProcessController.java @@ -6,6 +6,7 @@ import java.util.Map; import javax.jcr.Node; import javax.jcr.NodeIterator; import javax.jcr.Property; +import javax.jcr.RepositoryException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,34 +30,10 @@ public class ProcessController { public ExecutionProcess process(Node processNode) { JcrExecutionProcess process = new JcrExecutionProcess(processNode); try { - // we currently only deal with single agents - Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW); - NodeIterator nit = realizedFlowNode.getNodes(); - if (nit.hasNext()) { - // TODO find a better way to determine which agent to use - // currently we check the agent of the first registered flow - Node firstRealizedFlow = nit.nextNode(); - // we assume there is an nt:address - String firstFlowPath = firstRealizedFlow - .getNode(SlcNames.SLC_ADDRESS) - .getProperty(Property.JCR_PATH).getString(); - Node flowNode = processNode.getSession().getNode(firstFlowPath); - String agentFactoryPath = SlcJcrUtils - .flowAgentFactoryPath(firstFlowPath); - if (!agentFactories.containsKey(agentFactoryPath)) - throw new SlcException("No agent factory registered under " - + agentFactoryPath); - SlcAgentFactory agentFactory = agentFactories - .get(agentFactoryPath); - Node agentNode = ((Node) flowNode - .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1)); - String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID) - .getString(); - - // process - SlcAgent slcAgent = agentFactory.getAgent(agentUuid); - slcAgent.process(process); - } + SlcAgent slcAgent = findAgent(processNode); + if (slcAgent == null) + throw new SlcException("Cannot find agent for " + processNode); + slcAgent.process(process); return process; } catch (Exception e) { if (!process.getStatus().equals(ExecutionProcess.ERROR)) @@ -65,6 +42,50 @@ public class ProcessController { } } + public void kill(Node processNode) { + JcrExecutionProcess process = new JcrExecutionProcess(processNode); + try { + SlcAgent slcAgent = findAgent(processNode); + if (slcAgent == null) + throw new SlcException("Cannot find agent for " + processNode); + slcAgent.kill(process); + } catch (Exception e) { + if (!process.getStatus().equals(ExecutionProcess.ERROR)) + process.setStatus(ExecutionProcess.ERROR); + throw new SlcException("Cannot execute " + processNode, e); + } + } + + protected SlcAgent findAgent(Node processNode) throws RepositoryException { + // we currently only deal with single agents + Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW); + NodeIterator nit = realizedFlowNode.getNodes(); + if (nit.hasNext()) { + // TODO find a better way to determine which agent to use + // currently we check the agent of the first registered flow + Node firstRealizedFlow = nit.nextNode(); + // we assume there is an nt:address + String firstFlowPath = firstRealizedFlow + .getNode(SlcNames.SLC_ADDRESS) + .getProperty(Property.JCR_PATH).getString(); + Node flowNode = processNode.getSession().getNode(firstFlowPath); + String agentFactoryPath = SlcJcrUtils + .flowAgentFactoryPath(firstFlowPath); + if (!agentFactories.containsKey(agentFactoryPath)) + throw new SlcException("No agent factory registered under " + + agentFactoryPath); + SlcAgentFactory agentFactory = agentFactories.get(agentFactoryPath); + Node agentNode = ((Node) flowNode + .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1)); + String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID) + .getString(); + + // process + return agentFactory.getAgent(agentUuid); + } + return null; + } + public synchronized void register(SlcAgentFactory agentFactory, Map properties) { String path = properties.get(SlcJcrConstants.PROPERTY_PATH); diff --git a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessBuilderPage.java b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessBuilderPage.java index fc0e4f1c8..28c2f69c6 100644 --- a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessBuilderPage.java +++ b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessBuilderPage.java @@ -5,13 +5,11 @@ import java.util.Iterator; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; -import java.util.UUID; import javax.jcr.Node; import javax.jcr.NodeIterator; import javax.jcr.Property; import javax.jcr.RepositoryException; -import javax.jcr.Session; import javax.jcr.nodetype.NodeType; import javax.jcr.observation.Event; import javax.jcr.observation.EventListener; @@ -63,8 +61,6 @@ import org.eclipse.swt.widgets.Button; import org.eclipse.swt.widgets.Composite; import org.eclipse.swt.widgets.Label; import org.eclipse.swt.widgets.Table; -import org.eclipse.ui.IWorkbenchPage; -import org.eclipse.ui.PlatformUI; import org.eclipse.ui.forms.AbstractFormPart; import org.eclipse.ui.forms.IManagedForm; import org.eclipse.ui.forms.editor.FormPage; @@ -125,6 +121,9 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { Event.PROPERTY_CHANGED, processNode.getPath(), true, null, null, false); + // make sure all controls are in line with status + statusChanged(); + // add initial flows addInitialFlows(); @@ -146,7 +145,9 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { run.addSelectionListener(new SelectionListener() { public void widgetSelected(SelectionEvent e) { if (isFinished(getProcessStatus())) { - relaunch(); + ((ProcessEditor) getEditor()).relaunch(); + } else if (isRunning(getProcessStatus())) { + ((ProcessEditor) getEditor()).kill(); } else { ((ProcessEditor) getEditor()).process(); } @@ -191,8 +192,6 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { statusComposite.setLayout(new FillLayout()); statusLabel = tk.createLabel(statusComposite, getProcessStatus()); - // make sure all controls are in line with status - statusChanged(); } protected void createBuilder(Composite parent) { @@ -279,59 +278,27 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { /* * CONTROLLERS */ - /** Opens a new editor with a copy of this process */ - protected void relaunch() { - try { - Node duplicatedNode = duplicateProcess(); - IWorkbenchPage activePage = PlatformUI.getWorkbench() - .getActiveWorkbenchWindow().getActivePage(); - activePage.openEditor( - new ProcessEditorInput(duplicatedNode.getPath()), - ProcessEditor.ID); - getEditor().close(false); - } catch (Exception e1) { - throw new SlcException("Cannot relaunch " + processNode, e1); - } - } - - /** Duplicates the process */ - protected Node duplicateProcess() { - try { - Session session = processNode.getSession(); - String uuid = UUID.randomUUID().toString(); - String destPath = SlcJcrUtils.createExecutionProcessPath(uuid); - Node newNode = JcrUtils.mkdirs(session, destPath, - SlcTypes.SLC_PROCESS); - JcrUtils.copy(processNode, newNode); - // session.getWorkspace().copy(processNode.getPath(), destPath); - // Node newNode = session.getNode(destPath); - // make sure that we kept the mixins - // newNode.addMixin(NodeType.MIX_CREATED); - // newNode.addMixin(NodeType.MIX_LAST_MODIFIED); - newNode.setProperty(SLC_UUID, uuid); - newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED); - session.save(); - return newNode; - } catch (RepositoryException e) { - throw new SlcException("Cannot duplicate process", e); - } - } - /** Reflects a status change */ protected void statusChanged() { String status = getProcessStatus(); statusLabel.setText(status); Boolean isEditable = isEditable(status); - run.setEnabled(isEditable); + run.setEnabled(status.equals(ExecutionProcess.RUNNING) || isEditable); remove.setEnabled(isEditable); clear.setEnabled(isEditable); // flowsViewer.getTree().setEnabled(isEditable); - if (status.equals(ExecutionProcess.COMPLETED) - || status.equals(ExecutionProcess.ERROR)) { + if (status.equals(ExecutionProcess.RUNNING)) { + run.setEnabled(true); + run.setImage(SlcImages.KILL); + run.setToolTipText("Kill"); + } else if (isFinished(status)) { run.setEnabled(true); run.setImage(SlcImages.RELAUNCH); run.setToolTipText("Relaunch"); } + + if (flowsViewer != null) + flowsViewer.refresh(); } /** Adds initial flows from the editor input if any */ @@ -448,14 +415,19 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { } /** Optimization so that we don't call the node each time */ - protected Boolean isEditable(String status) { + protected static Boolean isEditable(String status) { return status.equals(ExecutionProcess.NEW) || status.equals(ExecutionProcess.INITIALIZED); } - protected Boolean isFinished(String status) { + protected static Boolean isFinished(String status) { return status.equals(ExecutionProcess.COMPLETED) - || status.equals(ExecutionProcess.ERROR); + || status.equals(ExecutionProcess.ERROR) + || status.equals(ExecutionProcess.KILLED); + } + + protected static Boolean isRunning(String status) { + return status.equals(ExecutionProcess.RUNNING); } /* @@ -556,6 +528,18 @@ public class ProcessBuilderPage extends FormPage implements SlcNames { Node node = (Node) element; try { if (node.isNodeType(SlcTypes.SLC_REALIZED_FLOW)) { + if (node.hasProperty(SLC_STATUS)) { + String status = node.getProperty(SLC_STATUS) + .getString(); + // TODO: factorize with process view ? + if (status.equals(ExecutionProcess.RUNNING)) + return SlcImages.PROCESS_RUNNING; + else if (status.equals(ExecutionProcess.ERROR) + || status.equals(ExecutionProcess.KILLED)) + return SlcImages.PROCESS_ERROR; + else if (status.equals(ExecutionProcess.COMPLETED)) + return SlcImages.PROCESS_COMPLETED; + } return SlcImages.FLOW; } } catch (RepositoryException e) { diff --git a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessEditor.java b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessEditor.java index b00b41298..57b896272 100644 --- a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessEditor.java +++ b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/editors/ProcessEditor.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.UUID; import javax.jcr.Node; +import javax.jcr.NodeIterator; import javax.jcr.RepositoryException; import javax.jcr.Session; @@ -24,9 +25,12 @@ import org.argeo.slc.jcr.SlcTypes; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.ui.IEditorInput; import org.eclipse.ui.IEditorSite; +import org.eclipse.ui.IWorkbenchPage; import org.eclipse.ui.PartInitException; +import org.eclipse.ui.PlatformUI; import org.eclipse.ui.forms.editor.FormEditor; +/** Editor for an execution process. */ public class ProcessEditor extends FormEditor implements ExecutionProcessNotifier, SlcTypes, SlcNames { public final static String ID = ClientUiPlugin.ID + ".processEditor"; @@ -96,7 +100,7 @@ public class ProcessEditor extends FormEditor implements } /** Actually runs the process. */ - public void process() { + void process() { // the modifications have to be saved before execution try { processNode.setProperty(SLC_STATUS, ExecutionProcess.SCHEDULED); @@ -105,6 +109,9 @@ public class ProcessEditor extends FormEditor implements } doSave(null); try { + // show log + // setActivePage(logPage.getId()); + ExecutionProcess process = processController.process(processNode); Map properties = new HashMap(); properties.put(ExecutionModulesManager.SLC_PROCESS_ID, @@ -115,6 +122,56 @@ public class ProcessEditor extends FormEditor implements } } + void kill() { + processController.kill(processNode); + } + + /** Opens a new editor with a copy of this process */ + void relaunch() { + try { + Node duplicatedNode = duplicateProcess(); + IWorkbenchPage activePage = PlatformUI.getWorkbench() + .getActiveWorkbenchWindow().getActivePage(); + activePage.openEditor( + new ProcessEditorInput(duplicatedNode.getPath()), + ProcessEditor.ID); + close(false); + } catch (Exception e1) { + throw new SlcException("Cannot relaunch " + processNode, e1); + } + } + + /** Duplicates the process */ + protected Node duplicateProcess() { + try { + Session session = processNode.getSession(); + String uuid = UUID.randomUUID().toString(); + String destPath = SlcJcrUtils.createExecutionProcessPath(uuid); + Node newNode = JcrUtils.mkdirs(session, destPath, + SlcTypes.SLC_PROCESS); + + // copy node + JcrUtils.copy(processNode, newNode); + + newNode.setProperty(SLC_UUID, uuid); + newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED); + + // reset realized flow status + Node rootRealizedFlowNode = newNode.getNode(SLC_FLOW); + // we just manage one level for the time being + NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW); + while (nit.hasNext()) { + nit.nextNode().setProperty(SLC_STATUS, + ExecutionProcess.INITIALIZED); + } + + session.save(); + return newNode; + } catch (RepositoryException e) { + throw new SlcException("Cannot duplicate process", e); + } + } + @Override protected void addPages() { try { diff --git a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/views/JcrProcessListView.java b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/views/JcrProcessListView.java index 6c2178481..f3f6f467d 100644 --- a/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/views/JcrProcessListView.java +++ b/eclipse/plugins/org.argeo.slc.client.ui/src/main/java/org/argeo/slc/client/ui/views/JcrProcessListView.java @@ -175,6 +175,8 @@ public class JcrProcessListView extends ViewPart { return SlcImages.PROCESS_COMPLETED; else if (status.equals(ExecutionProcess.RUNNING)) return SlcImages.PROCESS_RUNNING; + else if (status.equals(ExecutionProcess.KILLED)) + return SlcImages.PROCESS_ERROR; else throw new SlcException("Unkown status " + status); } catch (RepositoryException e) { diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java index 3b5613921..b3d92ae69 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java @@ -18,6 +18,7 @@ package org.argeo.slc.core.execution; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,8 +47,6 @@ public abstract class AbstractExecutionModulesManager implements private List filteredNotifiers = Collections .synchronizedList(new ArrayList()); - private ThreadGroup processesThreadGroup = new ThreadGroup("SLC Processes"); - protected abstract ExecutionFlow findExecutionFlow(String moduleName, String moduleVersion, String flowName); @@ -100,6 +99,7 @@ public abstract class AbstractExecutionModulesManager implements filteredNotifier.getNotifier().updateStatus(process, oldStatus, newStatus); } + } public void dispatchAddSteps(ExecutionProcess process, @@ -122,6 +122,11 @@ public abstract class AbstractExecutionModulesManager implements filteredNotifiers.add(new FilteredNotifier(notifier, properties)); } + public void unregisterProcessNotifier(ExecutionProcessNotifier notifier, + Map properties) { + filteredNotifiers.remove(notifier); + } + public void setSlcExecutionNotifiers( List slcExecutionNotifiers) { this.slcExecutionNotifiers = slcExecutionNotifiers; @@ -131,10 +136,6 @@ public abstract class AbstractExecutionModulesManager implements return slcExecutionNotifiers; } - public ThreadGroup getProcessesThreadGroup() { - return processesThreadGroup; - } - protected class FilteredNotifier { private final ExecutionProcessNotifier notifier; private final String processId; @@ -143,6 +144,8 @@ public abstract class AbstractExecutionModulesManager implements Map properties) { super(); this.notifier = notifier; + if (properties == null) + properties = new HashMap(); if (properties.containsKey(SLC_PROCESS_ID)) processId = properties.get(SLC_PROCESS_ID); else @@ -168,7 +171,14 @@ public abstract class AbstractExecutionModulesManager implements @Override public boolean equals(Object obj) { - return notifier.equals(obj); + if (obj instanceof FilteredNotifier) { + FilteredNotifier fn = (FilteredNotifier) obj; + return notifier.equals(fn.notifier); + } else if (obj instanceof ExecutionProcessNotifier) { + ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj; + return notifier.equals(epn); + } else + return false; } public ExecutionProcessNotifier getNotifier() { diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultExecutionFlow.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultExecutionFlow.java index bf1cf99a5..afb6ddbd2 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultExecutionFlow.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultExecutionFlow.java @@ -36,6 +36,7 @@ import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.validation.MapBindingResult; +/** Default implementation of an execution flow. */ public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean, BeanNameAware { private final static Log log = LogFactory @@ -83,13 +84,13 @@ public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean, ExecutionSpecAttribute attr = executionSpec.getAttributes() .get(key); - if (attr.getIsParameter() && !isSetAsParameter(key)) { - errors.rejectValue(key, "Parameter not set"); + if (attr.getIsImmutable() && !isSetAsParameter(key)) { + errors.rejectValue(key, "Immutable but not set"); break; } - if (attr.getIsFrozen() && !isSetAsParameter(key)) { - errors.rejectValue(key, "Frozen but not set as parameter"); + if (attr.getIsConstant() && !isSetAsParameter(key)) { + errors.rejectValue(key, "Constant but not set as parameter"); break; } @@ -108,9 +109,23 @@ public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean, public void run() { try { for (Runnable executable : executables) { + if (Thread.interrupted()) { + log.error("Flow '" + getName() + "' killed before '" + + executable + "'"); + Thread.currentThread().interrupt(); + return; + // throw new ThreadDeath(); + } this.doExecuteRunnable(executable); } } catch (RuntimeException e) { + if (Thread.interrupted()) { + log.error("Flow '" + getName() + + "' killed while receiving an unrelated exception", e); + Thread.currentThread().interrupt(); + return; + // throw new ThreadDeath(); + } if (failOnError) throw e; else { diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionThread.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionThread.java index b42726dd4..5e288f264 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionThread.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ExecutionThread.java @@ -42,7 +42,7 @@ public class ExecutionThread extends Thread { public void run() { if (getContextClassLoader() != null) { if (log.isTraceEnabled()) - log.debug("Context class loader set to " + log.trace("Context class loader set to " + getContextClassLoader()); } diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java index dc9e7efdd..c11e875b8 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java @@ -17,6 +17,7 @@ package org.argeo.slc.core.execution; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -37,29 +38,37 @@ public class ProcessThread extends Thread { private final ExecutionProcess process; private final ProcessThreadGroup processThreadGroup; - private Set executionThreads = new HashSet(); + private Set executionThreads = Collections + .synchronizedSet(new HashSet()); private Boolean hadAnError = false; + private Boolean killed = false; - public ProcessThread(ExecutionModulesManager executionModulesManager, + public ProcessThread(ThreadGroup processesThreadGroup, + ExecutionModulesManager executionModulesManager, ExecutionProcess process) { - super(executionModulesManager.getProcessesThreadGroup(), - "SLC Process #" + process.getUuid()); + super(processesThreadGroup, "SLC Process #" + process.getUuid()); this.executionModulesManager = executionModulesManager; this.process = process; processThreadGroup = new ProcessThreadGroup(executionModulesManager, this); } - public void run() { + public final void run() { log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n"); - process.setStatus(SlcExecution.RUNNING); - executionModulesManager.dispatchUpdateStatus(process, - SlcExecution.SCHEDULED, SlcExecution.RUNNING); + String oldStatus = process.getStatus(); + process.setStatus(ExecutionProcess.RUNNING); + executionModulesManager.dispatchUpdateStatus(process, oldStatus, + ExecutionProcess.RUNNING); - process(); + try { + process(); + } catch (InterruptedException e) { + die(); + return; + } // waits for all execution threads to complete (in case they were // started asynchronously) @@ -68,28 +77,51 @@ public class ProcessThread extends Thread { try { executionThread.join(); } catch (InterruptedException e) { - log.error("Execution thread " + executionThread - + " was interrupted"); + die(); + return; } } } + computeFinalStatus(); + } + + /** Make sure this is called BEFORE all the threads are interrupted. */ + private void computeFinalStatus() { + String oldStatus = process.getStatus(); // TODO: error management at flow level? - if (hadAnError) - process.setStatus(SlcExecution.ERROR); + if (killed) + process.setStatus(ExecutionProcess.KILLED); + else if (hadAnError) + process.setStatus(ExecutionProcess.ERROR); else - process.setStatus(SlcExecution.COMPLETED); - executionModulesManager.dispatchUpdateStatus(process, - SlcExecution.RUNNING, process.getStatus()); + process.setStatus(ExecutionProcess.COMPLETED); + executionModulesManager.dispatchUpdateStatus(process, oldStatus, + process.getStatus()); + log.info("\n## SLC Process #" + process.getUuid() + " " + + process.getStatus() + "\n"); + } - log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n"); + /** Called when being killed */ + private synchronized void die() { + killed = true; + computeFinalStatus(); + for (ExecutionThread executionThread : executionThreads) { + try { + executionThread.interrupt(); + } catch (Exception e) { + log.error("Cannot interrupt " + executionThread); + } + } + processThreadGroup.interrupt(); } /** * Implementation specific execution. To be overridden in order to deal with * custom process types. Default expects an {@link SlcExecution}. */ - protected void process() { + @SuppressWarnings("deprecation") + protected void process() throws InterruptedException { if (!(process instanceof SlcExecution)) throw new SlcException("Unsupported process type " + process.getClass()); @@ -104,27 +136,19 @@ public class ProcessThread extends Thread { } /** @return the (distinct) thread used for this execution */ - protected Thread execute(RealizedFlow realizedFlow, Boolean synchronous) { + protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) + throws InterruptedException { + if (killed) + return; + ExecutionThread thread = new ExecutionThread(this, realizedFlow); executionThreads.add(thread); thread.start(); - if (synchronous) { - try { - thread.join(); - } catch (InterruptedException e) { - log.error("Flow " + realizedFlow + " was interrupted", e); - } - } - return thread; - - // synchronized (this) { - // try { - // wait(); - // } catch (InterruptedException e) { - // // silent - // } - // } + if (synchronous) + thread.join(); + + return; } public void notifyError() { diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/DefaultAgent.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/DefaultAgent.java index 3220e910f..e4676c774 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/DefaultAgent.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/DefaultAgent.java @@ -18,7 +18,10 @@ package org.argeo.slc.core.runtime; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.argeo.slc.SlcException; @@ -26,15 +29,21 @@ import org.argeo.slc.core.execution.ProcessThread; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; +import org.argeo.slc.execution.ExecutionProcessNotifier; +import org.argeo.slc.execution.ExecutionStep; import org.argeo.slc.process.SlcExecution; import org.argeo.slc.runtime.SlcAgent; import org.argeo.slc.runtime.SlcAgentDescriptor; /** Implements the base methods of an SLC agent. */ -public class DefaultAgent implements SlcAgent { +public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { private SlcAgentDescriptor agentDescriptor; private ExecutionModulesManager modulesManager; + private ThreadGroup processesThreadGroup; + private Map runningProcesses = Collections + .synchronizedMap(new HashMap()); + /* * LIFECYCLE */ @@ -46,10 +55,15 @@ public class DefaultAgent implements SlcAgent { } catch (UnknownHostException e) { throw new SlcException("Unable to create agent descriptor.", e); } + processesThreadGroup = new ThreadGroup("SLC Processes of Agent #" + + agentDescriptor.getUuid()); + modulesManager.registerProcessNotifier(this, + new HashMap()); } public void dispose() { - + modulesManager.unregisterProcessNotifier(this, + new HashMap()); } /** @@ -68,19 +82,29 @@ public class DefaultAgent implements SlcAgent { } public void process(ExecutionProcess process) { - ProcessThread processThread = createProcessThread(modulesManager, - process); + ProcessThread processThread = createProcessThread(processesThreadGroup, + modulesManager, process); processThread.start(); + runningProcesses.put(process.getUuid(), processThread); + // FIXME find a way to remove them from this register + } + + public void kill(ExecutionProcess process) { + String processUuid = process.getUuid(); + if (runningProcesses.containsKey(processUuid)) { + runningProcesses.get(processUuid).interrupt(); + } } /** Creates the thread which will coordinate the execution for this agent. */ protected ProcessThread createProcessThread( + ThreadGroup processesThreadGroup, ExecutionModulesManager modulesManager, ExecutionProcess process) { if (!(process instanceof SlcExecution)) throw new SlcException("Unsupported process type " + process.getClass()); - ProcessThread processThread = new ProcessThread(modulesManager, - (SlcExecution) process); + ProcessThread processThread = new ProcessThread(processesThreadGroup, + modulesManager, (SlcExecution) process); return processThread; } @@ -97,6 +121,21 @@ public class DefaultAgent implements SlcAgent { return true; } + /* + * PROCESS NOTIFIER + */ + public void updateStatus(ExecutionProcess process, String oldStatus, + String newStatus) { + if (newStatus.equals(ExecutionProcess.COMPLETED) + || newStatus.equals(ExecutionProcess.ERROR) + || newStatus.equals(ExecutionProcess.KILLED)) { + runningProcesses.remove(process.getUuid()); + } + } + + public void addSteps(ExecutionProcess process, List steps) { + } + /* * BEAN */ diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/SimpleAgentFactory.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/SimpleAgentFactory.java index 6688ec35c..49dff1403 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/SimpleAgentFactory.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/runtime/SimpleAgentFactory.java @@ -5,6 +5,7 @@ import java.util.List; import org.argeo.slc.runtime.SlcAgent; import org.argeo.slc.runtime.SlcAgentFactory; +/** @deprecated old prototype, should be removed */ public class SimpleAgentFactory implements SlcAgentFactory { private List agents; diff --git a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java index 88a8b2bfe..3f927a345 100644 --- a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java +++ b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionModulesManager.java @@ -21,7 +21,6 @@ import java.util.Map; import org.argeo.slc.deploy.ModulesManager; import org.argeo.slc.process.RealizedFlow; -import org.argeo.slc.process.SlcExecutionStep; /** Provides access to the execution modules */ public interface ExecutionModulesManager extends ModulesManager { @@ -38,9 +37,6 @@ public interface ExecutionModulesManager extends ModulesManager { */ public List listExecutionModules(); - /** The thread group to which all process threads will belong. */ - public ThreadGroup getProcessesThreadGroup(); - /** Synchronously finds and executes an {@link ExecutionFlow}. */ public void execute(RealizedFlow realizedFlow); @@ -58,4 +54,8 @@ public interface ExecutionModulesManager extends ModulesManager { */ public void registerProcessNotifier(ExecutionProcessNotifier notifier, Map properties); + + /** Unregisters a notifier */ + public void unregisterProcessNotifier(ExecutionProcessNotifier notifier, + Map properties); } diff --git a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionProcess.java b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionProcess.java index 7389036ed..be70b0955 100644 --- a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionProcess.java +++ b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/execution/ExecutionProcess.java @@ -6,6 +6,11 @@ package org.argeo.slc.execution; * during and after the execution actually took place, providing an entry point * for the definition of executions, their monitoring (e.g. logging) and * tracking. A process can be distributed or parallelized. + *
+ * NEW => INITIALIZED => SCHEDULED => RUNNING
+ * RUNNING => {COMPLETED | ERROR | KILLED}
+ * {COMPLETED | ERROR | KILLED} => PURGED
+ * UNKOWN : this is a bug if this status occurs
*/ public interface ExecutionProcess { /** The process is not yet usable. */ @@ -20,6 +25,8 @@ public interface ExecutionProcess { public final static String COMPLETED = "COMPLETED"; /** The process failed because of an unexpected error. */ public final static String ERROR = "ERROR"; + /** The process was killed explicitly or through a crash. */ + public final static String KILLED = "KILLED"; /** The status cannot be retrieved (probably because of unexpected errors). */ public final static String UNKOWN = "UNKOWN"; diff --git a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java index d11f99922..9b4043462 100644 --- a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java +++ b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/process/SlcExecution.java @@ -25,6 +25,7 @@ import java.util.TreeMap; import org.argeo.slc.execution.ExecutionProcess; +/** @deprecated use other implementations of {@link ExecutionProcess} */ public class SlcExecution implements ExecutionProcess, Serializable { private static final long serialVersionUID = -7607457971382118466L; diff --git a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java index d295f20b6..b7afe2269 100644 --- a/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java +++ b/runtime/org.argeo.slc.specs/src/main/java/org/argeo/slc/runtime/SlcAgent.java @@ -27,6 +27,12 @@ public interface SlcAgent { /** Agent unique identifier */ public String getAgentUuid(); + /** Execute / take part to this process */ + public void process(ExecutionProcess process); + + /** Kills this process */ + public void kill(ExecutionProcess process); + public ExecutionModuleDescriptor getExecutionModuleDescriptor( String moduleName, String version); @@ -35,8 +41,6 @@ public interface SlcAgent { /** @deprecated Use {@link #process(ExecutionProcess)} instead. */ public void runSlcExecution(SlcExecution slcExecution); - public void process(ExecutionProcess process); - /** @return true if still alive. */ public boolean ping(); } diff --git a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java index d3027c93d..8c0851937 100644 --- a/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java +++ b/runtime/org.argeo.slc.support.activemq/src/main/java/org/argeo/slc/jms/JmsAgentProxy.java @@ -96,6 +96,10 @@ public class JmsAgentProxy implements SlcAgent { } } + public void kill(ExecutionProcess process) { + throw new UnsupportedOperationException(); + } + protected Object sendReceive(AgentMC messageCreator) { long begin = System.currentTimeMillis(); Object res; diff --git a/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrAgent.java b/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrAgent.java index 69bdba767..89b618d1b 100644 --- a/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrAgent.java +++ b/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrAgent.java @@ -58,8 +58,9 @@ public class JcrAgent extends DefaultAgent implements SlcAgentFactory, SlcNames */ @Override protected ProcessThread createProcessThread( + ThreadGroup processesThreadGroup, ExecutionModulesManager modulesManager, ExecutionProcess process) { - return new JcrProcessThread(modulesManager, + return new JcrProcessThread(processesThreadGroup, modulesManager, (JcrExecutionProcess) process); } diff --git a/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrProcessThread.java b/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrProcessThread.java index 1c97322bd..eab881f73 100644 --- a/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrProcessThread.java +++ b/runtime/org.argeo.slc.support.jcr/src/main/java/org/argeo/slc/jcr/execution/JcrProcessThread.java @@ -16,6 +16,7 @@ import org.argeo.slc.core.execution.PrimitiveUtils; import org.argeo.slc.core.execution.ProcessThread; import org.argeo.slc.execution.ExecutionFlowDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; +import org.argeo.slc.execution.ExecutionProcess; import org.argeo.slc.execution.ExecutionSpecAttribute; import org.argeo.slc.jcr.SlcJcrUtils; import org.argeo.slc.jcr.SlcNames; @@ -25,19 +26,47 @@ import org.argeo.slc.process.RealizedFlow; /** Where the actual execution takes place */ public class JcrProcessThread extends ProcessThread implements SlcNames { - public JcrProcessThread(ExecutionModulesManager executionModulesManager, + public JcrProcessThread(ThreadGroup processesThreadGroup, + ExecutionModulesManager executionModulesManager, JcrExecutionProcess process) { - super(executionModulesManager, process); + super(processesThreadGroup, executionModulesManager, process); } @Override - protected void process() { + protected void process() throws InterruptedException { try { - Node realizedFlowNode = getNode().getNode(SLC_FLOW); + Node rootRealizedFlowNode = getNode().getNode(SLC_FLOW); // we just manage one level for the time being - NodeIterator nit = realizedFlowNode.getNodes(SLC_FLOW); + NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW); while (nit.hasNext()) { - process(nit.nextNode()); + Node realizedFlowNode = nit.nextNode(); + + // set status on realized flow + realizedFlowNode.setProperty(SLC_STATUS, + ExecutionProcess.RUNNING); + realizedFlowNode.getSession().save(); + try { + execute(realizedFlowNode); + + // set status on realized flow + realizedFlowNode.setProperty(SLC_STATUS, + ExecutionProcess.COMPLETED); + realizedFlowNode.getSession().save(); + } catch (RepositoryException e) { + throw e; + } catch (InterruptedException e) { + // set status on realized flow + realizedFlowNode.setProperty(SLC_STATUS, + ExecutionProcess.KILLED); + realizedFlowNode.getSession().save(); + throw e; + } catch (RuntimeException e) { + // set status on realized flow + realizedFlowNode.setProperty(SLC_STATUS, + ExecutionProcess.ERROR); + realizedFlowNode.getSession().save(); + throw e; + } } } catch (RepositoryException e) { throw new ArgeoException("Cannot process " + getNode(), e); @@ -45,7 +74,8 @@ public class JcrProcessThread extends ProcessThread implements SlcNames { } /** Configure the realized flows */ - protected void process(Node realizedFlowNode) throws RepositoryException { + protected void execute(Node realizedFlowNode) throws RepositoryException, + InterruptedException { if (realizedFlowNode.hasNode(SLC_ADDRESS)) { String flowPath = realizedFlowNode.getNode(SLC_ADDRESS) .getProperty(Property.JCR_PATH).getString(); @@ -100,7 +130,11 @@ public class JcrProcessThread extends ProcessThread implements SlcNames { values, executionSpec); realizedFlow.setFlowDescriptor(efd); + // + // EXECUTE THE FLOW + // execute(realizedFlow, true); + // } } diff --git a/runtime/org.argeo.slc.support.jcr/src/main/resources/org/argeo/slc/jcr/slc.cnd b/runtime/org.argeo.slc.support.jcr/src/main/resources/org/argeo/slc/jcr/slc.cnd index d644de3ca..0e2a5494a 100644 --- a/runtime/org.argeo.slc.support.jcr/src/main/resources/org/argeo/slc/jcr/slc.cnd +++ b/runtime/org.argeo.slc.support.jcr/src/main/resources/org/argeo/slc/jcr/slc.cnd @@ -65,6 +65,7 @@ mixin // - slc:spec (STRING) - slc:started (DATE) - slc:completed (DATE) +//- slc:status (STRING) + slc:address (nt:address) + slc:flow (slc:realizedFlow) * // the realized execution spec attributes