public final static Image ERROR = img("icons/error.gif");
public final static Image LAUNCH = img("icons/launch.gif");
public final static Image RELAUNCH = img("icons/relaunch.gif");
+ public final static Image KILL = img("icons/kill.png");
public final static Image REMOVE_ONE = img("icons/remove_one.gif");
public final static Image REMOVE_ALL = img("icons/removeAll.png");
public final static Image EXECUTION_SPECS = img("icons/executionSpecs.gif");
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.Property;
+import javax.jcr.RepositoryException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public ExecutionProcess process(Node processNode) {
JcrExecutionProcess process = new JcrExecutionProcess(processNode);
try {
- // we currently only deal with single agents
- Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW);
- NodeIterator nit = realizedFlowNode.getNodes();
- if (nit.hasNext()) {
- // TODO find a better way to determine which agent to use
- // currently we check the agent of the first registered flow
- Node firstRealizedFlow = nit.nextNode();
- // we assume there is an nt:address
- String firstFlowPath = firstRealizedFlow
- .getNode(SlcNames.SLC_ADDRESS)
- .getProperty(Property.JCR_PATH).getString();
- Node flowNode = processNode.getSession().getNode(firstFlowPath);
- String agentFactoryPath = SlcJcrUtils
- .flowAgentFactoryPath(firstFlowPath);
- if (!agentFactories.containsKey(agentFactoryPath))
- throw new SlcException("No agent factory registered under "
- + agentFactoryPath);
- SlcAgentFactory agentFactory = agentFactories
- .get(agentFactoryPath);
- Node agentNode = ((Node) flowNode
- .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1));
- String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID)
- .getString();
-
- // process
- SlcAgent slcAgent = agentFactory.getAgent(agentUuid);
- slcAgent.process(process);
- }
+ SlcAgent slcAgent = findAgent(processNode);
+ if (slcAgent == null)
+ throw new SlcException("Cannot find agent for " + processNode);
+ slcAgent.process(process);
return process;
} catch (Exception e) {
if (!process.getStatus().equals(ExecutionProcess.ERROR))
}
}
+ public void kill(Node processNode) {
+ JcrExecutionProcess process = new JcrExecutionProcess(processNode);
+ try {
+ SlcAgent slcAgent = findAgent(processNode);
+ if (slcAgent == null)
+ throw new SlcException("Cannot find agent for " + processNode);
+ slcAgent.kill(process);
+ } catch (Exception e) {
+ if (!process.getStatus().equals(ExecutionProcess.ERROR))
+ process.setStatus(ExecutionProcess.ERROR);
+ throw new SlcException("Cannot execute " + processNode, e);
+ }
+ }
+
+ protected SlcAgent findAgent(Node processNode) throws RepositoryException {
+ // we currently only deal with single agents
+ Node realizedFlowNode = processNode.getNode(SlcNames.SLC_FLOW);
+ NodeIterator nit = realizedFlowNode.getNodes();
+ if (nit.hasNext()) {
+ // TODO find a better way to determine which agent to use
+ // currently we check the agent of the first registered flow
+ Node firstRealizedFlow = nit.nextNode();
+ // we assume there is an nt:address
+ String firstFlowPath = firstRealizedFlow
+ .getNode(SlcNames.SLC_ADDRESS)
+ .getProperty(Property.JCR_PATH).getString();
+ Node flowNode = processNode.getSession().getNode(firstFlowPath);
+ String agentFactoryPath = SlcJcrUtils
+ .flowAgentFactoryPath(firstFlowPath);
+ if (!agentFactories.containsKey(agentFactoryPath))
+ throw new SlcException("No agent factory registered under "
+ + agentFactoryPath);
+ SlcAgentFactory agentFactory = agentFactories.get(agentFactoryPath);
+ Node agentNode = ((Node) flowNode
+ .getAncestor(SlcJcrUtils.AGENT_FACTORY_DEPTH + 1));
+ String agentUuid = agentNode.getProperty(SlcNames.SLC_UUID)
+ .getString();
+
+ // process
+ return agentFactory.getAgent(agentUuid);
+ }
+ return null;
+ }
+
public synchronized void register(SlcAgentFactory agentFactory,
Map<String, String> properties) {
String path = properties.get(SlcJcrConstants.PROPERTY_PATH);
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.UUID;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.Property;
import javax.jcr.RepositoryException;
-import javax.jcr.Session;
import javax.jcr.nodetype.NodeType;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;
import org.eclipse.swt.widgets.Composite;
import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.widgets.Table;
-import org.eclipse.ui.IWorkbenchPage;
-import org.eclipse.ui.PlatformUI;
import org.eclipse.ui.forms.AbstractFormPart;
import org.eclipse.ui.forms.IManagedForm;
import org.eclipse.ui.forms.editor.FormPage;
Event.PROPERTY_CHANGED, processNode.getPath(), true, null,
null, false);
+ // make sure all controls are in line with status
+ statusChanged();
+
// add initial flows
addInitialFlows();
run.addSelectionListener(new SelectionListener() {
public void widgetSelected(SelectionEvent e) {
if (isFinished(getProcessStatus())) {
- relaunch();
+ ((ProcessEditor) getEditor()).relaunch();
+ } else if (isRunning(getProcessStatus())) {
+ ((ProcessEditor) getEditor()).kill();
} else {
((ProcessEditor) getEditor()).process();
}
statusComposite.setLayout(new FillLayout());
statusLabel = tk.createLabel(statusComposite, getProcessStatus());
- // make sure all controls are in line with status
- statusChanged();
}
protected void createBuilder(Composite parent) {
/*
* CONTROLLERS
*/
- /** Opens a new editor with a copy of this process */
- protected void relaunch() {
- try {
- Node duplicatedNode = duplicateProcess();
- IWorkbenchPage activePage = PlatformUI.getWorkbench()
- .getActiveWorkbenchWindow().getActivePage();
- activePage.openEditor(
- new ProcessEditorInput(duplicatedNode.getPath()),
- ProcessEditor.ID);
- getEditor().close(false);
- } catch (Exception e1) {
- throw new SlcException("Cannot relaunch " + processNode, e1);
- }
- }
-
- /** Duplicates the process */
- protected Node duplicateProcess() {
- try {
- Session session = processNode.getSession();
- String uuid = UUID.randomUUID().toString();
- String destPath = SlcJcrUtils.createExecutionProcessPath(uuid);
- Node newNode = JcrUtils.mkdirs(session, destPath,
- SlcTypes.SLC_PROCESS);
- JcrUtils.copy(processNode, newNode);
- // session.getWorkspace().copy(processNode.getPath(), destPath);
- // Node newNode = session.getNode(destPath);
- // make sure that we kept the mixins
- // newNode.addMixin(NodeType.MIX_CREATED);
- // newNode.addMixin(NodeType.MIX_LAST_MODIFIED);
- newNode.setProperty(SLC_UUID, uuid);
- newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED);
- session.save();
- return newNode;
- } catch (RepositoryException e) {
- throw new SlcException("Cannot duplicate process", e);
- }
- }
-
/** Reflects a status change */
protected void statusChanged() {
String status = getProcessStatus();
statusLabel.setText(status);
Boolean isEditable = isEditable(status);
- run.setEnabled(isEditable);
+ run.setEnabled(status.equals(ExecutionProcess.RUNNING) || isEditable);
remove.setEnabled(isEditable);
clear.setEnabled(isEditable);
// flowsViewer.getTree().setEnabled(isEditable);
- if (status.equals(ExecutionProcess.COMPLETED)
- || status.equals(ExecutionProcess.ERROR)) {
+ if (status.equals(ExecutionProcess.RUNNING)) {
+ run.setEnabled(true);
+ run.setImage(SlcImages.KILL);
+ run.setToolTipText("Kill");
+ } else if (isFinished(status)) {
run.setEnabled(true);
run.setImage(SlcImages.RELAUNCH);
run.setToolTipText("Relaunch");
}
+
+ if (flowsViewer != null)
+ flowsViewer.refresh();
}
/** Adds initial flows from the editor input if any */
}
/** Optimization so that we don't call the node each time */
- protected Boolean isEditable(String status) {
+ protected static Boolean isEditable(String status) {
return status.equals(ExecutionProcess.NEW)
|| status.equals(ExecutionProcess.INITIALIZED);
}
- protected Boolean isFinished(String status) {
+ protected static Boolean isFinished(String status) {
return status.equals(ExecutionProcess.COMPLETED)
- || status.equals(ExecutionProcess.ERROR);
+ || status.equals(ExecutionProcess.ERROR)
+ || status.equals(ExecutionProcess.KILLED);
+ }
+
+ protected static Boolean isRunning(String status) {
+ return status.equals(ExecutionProcess.RUNNING);
}
/*
Node node = (Node) element;
try {
if (node.isNodeType(SlcTypes.SLC_REALIZED_FLOW)) {
+ if (node.hasProperty(SLC_STATUS)) {
+ String status = node.getProperty(SLC_STATUS)
+ .getString();
+ // TODO: factorize with process view ?
+ if (status.equals(ExecutionProcess.RUNNING))
+ return SlcImages.PROCESS_RUNNING;
+ else if (status.equals(ExecutionProcess.ERROR)
+ || status.equals(ExecutionProcess.KILLED))
+ return SlcImages.PROCESS_ERROR;
+ else if (status.equals(ExecutionProcess.COMPLETED))
+ return SlcImages.PROCESS_COMPLETED;
+ }
return SlcImages.FLOW;
}
} catch (RepositoryException e) {
import java.util.UUID;
import javax.jcr.Node;
+import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.ui.IEditorInput;
import org.eclipse.ui.IEditorSite;
+import org.eclipse.ui.IWorkbenchPage;
import org.eclipse.ui.PartInitException;
+import org.eclipse.ui.PlatformUI;
import org.eclipse.ui.forms.editor.FormEditor;
+/** Editor for an execution process. */
public class ProcessEditor extends FormEditor implements
ExecutionProcessNotifier, SlcTypes, SlcNames {
public final static String ID = ClientUiPlugin.ID + ".processEditor";
}
/** Actually runs the process. */
- public void process() {
+ void process() {
// the modifications have to be saved before execution
try {
processNode.setProperty(SLC_STATUS, ExecutionProcess.SCHEDULED);
}
doSave(null);
try {
+ // show log
+ // setActivePage(logPage.getId());
+
ExecutionProcess process = processController.process(processNode);
Map<String, String> properties = new HashMap<String, String>();
properties.put(ExecutionModulesManager.SLC_PROCESS_ID,
}
}
+ void kill() {
+ processController.kill(processNode);
+ }
+
+ /** Opens a new editor with a copy of this process */
+ void relaunch() {
+ try {
+ Node duplicatedNode = duplicateProcess();
+ IWorkbenchPage activePage = PlatformUI.getWorkbench()
+ .getActiveWorkbenchWindow().getActivePage();
+ activePage.openEditor(
+ new ProcessEditorInput(duplicatedNode.getPath()),
+ ProcessEditor.ID);
+ close(false);
+ } catch (Exception e1) {
+ throw new SlcException("Cannot relaunch " + processNode, e1);
+ }
+ }
+
+ /** Duplicates the process */
+ protected Node duplicateProcess() {
+ try {
+ Session session = processNode.getSession();
+ String uuid = UUID.randomUUID().toString();
+ String destPath = SlcJcrUtils.createExecutionProcessPath(uuid);
+ Node newNode = JcrUtils.mkdirs(session, destPath,
+ SlcTypes.SLC_PROCESS);
+
+ // copy node
+ JcrUtils.copy(processNode, newNode);
+
+ newNode.setProperty(SLC_UUID, uuid);
+ newNode.setProperty(SLC_STATUS, ExecutionProcess.INITIALIZED);
+
+ // reset realized flow status
+ Node rootRealizedFlowNode = newNode.getNode(SLC_FLOW);
+ // we just manage one level for the time being
+ NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
+ while (nit.hasNext()) {
+ nit.nextNode().setProperty(SLC_STATUS,
+ ExecutionProcess.INITIALIZED);
+ }
+
+ session.save();
+ return newNode;
+ } catch (RepositoryException e) {
+ throw new SlcException("Cannot duplicate process", e);
+ }
+ }
+
@Override
protected void addPages() {
try {
return SlcImages.PROCESS_COMPLETED;
else if (status.equals(ExecutionProcess.RUNNING))
return SlcImages.PROCESS_RUNNING;
+ else if (status.equals(ExecutionProcess.KILLED))
+ return SlcImages.PROCESS_ERROR;
else
throw new SlcException("Unkown status " + status);
} catch (RepositoryException e) {
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
private List<FilteredNotifier> filteredNotifiers = Collections
.synchronizedList(new ArrayList<FilteredNotifier>());
- private ThreadGroup processesThreadGroup = new ThreadGroup("SLC Processes");
-
protected abstract ExecutionFlow findExecutionFlow(String moduleName,
String moduleVersion, String flowName);
filteredNotifier.getNotifier().updateStatus(process, oldStatus,
newStatus);
}
+
}
public void dispatchAddSteps(ExecutionProcess process,
filteredNotifiers.add(new FilteredNotifier(notifier, properties));
}
+ public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+ Map<String, String> properties) {
+ filteredNotifiers.remove(notifier);
+ }
+
public void setSlcExecutionNotifiers(
List<SlcExecutionNotifier> slcExecutionNotifiers) {
this.slcExecutionNotifiers = slcExecutionNotifiers;
return slcExecutionNotifiers;
}
- public ThreadGroup getProcessesThreadGroup() {
- return processesThreadGroup;
- }
-
protected class FilteredNotifier {
private final ExecutionProcessNotifier notifier;
private final String processId;
Map<String, String> properties) {
super();
this.notifier = notifier;
+ if (properties == null)
+ properties = new HashMap<String, String>();
if (properties.containsKey(SLC_PROCESS_ID))
processId = properties.get(SLC_PROCESS_ID);
else
@Override
public boolean equals(Object obj) {
- return notifier.equals(obj);
+ if (obj instanceof FilteredNotifier) {
+ FilteredNotifier fn = (FilteredNotifier) obj;
+ return notifier.equals(fn.notifier);
+ } else if (obj instanceof ExecutionProcessNotifier) {
+ ExecutionProcessNotifier epn = (ExecutionProcessNotifier) obj;
+ return notifier.equals(epn);
+ } else
+ return false;
}
public ExecutionProcessNotifier getNotifier() {
import org.springframework.beans.factory.InitializingBean;
import org.springframework.validation.MapBindingResult;
+/** Default implementation of an execution flow. */
public class DefaultExecutionFlow implements ExecutionFlow, InitializingBean,
BeanNameAware {
private final static Log log = LogFactory
ExecutionSpecAttribute attr = executionSpec.getAttributes()
.get(key);
- if (attr.getIsParameter() && !isSetAsParameter(key)) {
- errors.rejectValue(key, "Parameter not set");
+ if (attr.getIsImmutable() && !isSetAsParameter(key)) {
+ errors.rejectValue(key, "Immutable but not set");
break;
}
- if (attr.getIsFrozen() && !isSetAsParameter(key)) {
- errors.rejectValue(key, "Frozen but not set as parameter");
+ if (attr.getIsConstant() && !isSetAsParameter(key)) {
+ errors.rejectValue(key, "Constant but not set as parameter");
break;
}
public void run() {
try {
for (Runnable executable : executables) {
+ if (Thread.interrupted()) {
+ log.error("Flow '" + getName() + "' killed before '"
+ + executable + "'");
+ Thread.currentThread().interrupt();
+ return;
+ // throw new ThreadDeath();
+ }
this.doExecuteRunnable(executable);
}
} catch (RuntimeException e) {
+ if (Thread.interrupted()) {
+ log.error("Flow '" + getName()
+ + "' killed while receiving an unrelated exception", e);
+ Thread.currentThread().interrupt();
+ return;
+ // throw new ThreadDeath();
+ }
if (failOnError)
throw e;
else {
public void run() {
if (getContextClassLoader() != null) {
if (log.isTraceEnabled())
- log.debug("Context class loader set to "
+ log.trace("Context class loader set to "
+ getContextClassLoader());
}
package org.argeo.slc.core.execution;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
private final ExecutionProcess process;
private final ProcessThreadGroup processThreadGroup;
- private Set<ExecutionThread> executionThreads = new HashSet<ExecutionThread>();
+ private Set<ExecutionThread> executionThreads = Collections
+ .synchronizedSet(new HashSet<ExecutionThread>());
private Boolean hadAnError = false;
+ private Boolean killed = false;
- public ProcessThread(ExecutionModulesManager executionModulesManager,
+ public ProcessThread(ThreadGroup processesThreadGroup,
+ ExecutionModulesManager executionModulesManager,
ExecutionProcess process) {
- super(executionModulesManager.getProcessesThreadGroup(),
- "SLC Process #" + process.getUuid());
+ super(processesThreadGroup, "SLC Process #" + process.getUuid());
this.executionModulesManager = executionModulesManager;
this.process = process;
processThreadGroup = new ProcessThreadGroup(executionModulesManager,
this);
}
- public void run() {
+ public final void run() {
log.info("\n##\n## SLC Process #" + process.getUuid()
+ " STARTED\n##\n");
- process.setStatus(SlcExecution.RUNNING);
- executionModulesManager.dispatchUpdateStatus(process,
- SlcExecution.SCHEDULED, SlcExecution.RUNNING);
+ String oldStatus = process.getStatus();
+ process.setStatus(ExecutionProcess.RUNNING);
+ executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+ ExecutionProcess.RUNNING);
- process();
+ try {
+ process();
+ } catch (InterruptedException e) {
+ die();
+ return;
+ }
// waits for all execution threads to complete (in case they were
// started asynchronously)
try {
executionThread.join();
} catch (InterruptedException e) {
- log.error("Execution thread " + executionThread
- + " was interrupted");
+ die();
+ return;
}
}
}
+ computeFinalStatus();
+ }
+
+ /** Make sure this is called BEFORE all the threads are interrupted. */
+ private void computeFinalStatus() {
+ String oldStatus = process.getStatus();
// TODO: error management at flow level?
- if (hadAnError)
- process.setStatus(SlcExecution.ERROR);
+ if (killed)
+ process.setStatus(ExecutionProcess.KILLED);
+ else if (hadAnError)
+ process.setStatus(ExecutionProcess.ERROR);
else
- process.setStatus(SlcExecution.COMPLETED);
- executionModulesManager.dispatchUpdateStatus(process,
- SlcExecution.RUNNING, process.getStatus());
+ process.setStatus(ExecutionProcess.COMPLETED);
+ executionModulesManager.dispatchUpdateStatus(process, oldStatus,
+ process.getStatus());
+ log.info("\n## SLC Process #" + process.getUuid() + " "
+ + process.getStatus() + "\n");
+ }
- log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n");
+ /** Called when being killed */
+ private synchronized void die() {
+ killed = true;
+ computeFinalStatus();
+ for (ExecutionThread executionThread : executionThreads) {
+ try {
+ executionThread.interrupt();
+ } catch (Exception e) {
+ log.error("Cannot interrupt " + executionThread);
+ }
+ }
+ processThreadGroup.interrupt();
}
/**
* Implementation specific execution. To be overridden in order to deal with
* custom process types. Default expects an {@link SlcExecution}.
*/
- protected void process() {
+ @SuppressWarnings("deprecation")
+ protected void process() throws InterruptedException {
if (!(process instanceof SlcExecution))
throw new SlcException("Unsupported process type "
+ process.getClass());
}
/** @return the (distinct) thread used for this execution */
- protected Thread execute(RealizedFlow realizedFlow, Boolean synchronous) {
+ protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
+ throws InterruptedException {
+ if (killed)
+ return;
+
ExecutionThread thread = new ExecutionThread(this, realizedFlow);
executionThreads.add(thread);
thread.start();
- if (synchronous) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.error("Flow " + realizedFlow + " was interrupted", e);
- }
- }
- return thread;
-
- // synchronized (this) {
- // try {
- // wait();
- // } catch (InterruptedException e) {
- // // silent
- // }
- // }
+ if (synchronous)
+ thread.join();
+
+ return;
}
public void notifyError() {
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.execution.ExecutionProcessNotifier;
+import org.argeo.slc.execution.ExecutionStep;
import org.argeo.slc.process.SlcExecution;
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentDescriptor;
/** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent {
+public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
private SlcAgentDescriptor agentDescriptor;
private ExecutionModulesManager modulesManager;
+ private ThreadGroup processesThreadGroup;
+ private Map<String, ProcessThread> runningProcesses = Collections
+ .synchronizedMap(new HashMap<String, ProcessThread>());
+
/*
* LIFECYCLE
*/
} catch (UnknownHostException e) {
throw new SlcException("Unable to create agent descriptor.", e);
}
+ processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
+ + agentDescriptor.getUuid());
+ modulesManager.registerProcessNotifier(this,
+ new HashMap<String, String>());
}
public void dispose() {
-
+ modulesManager.unregisterProcessNotifier(this,
+ new HashMap<String, String>());
}
/**
}
public void process(ExecutionProcess process) {
- ProcessThread processThread = createProcessThread(modulesManager,
- process);
+ ProcessThread processThread = createProcessThread(processesThreadGroup,
+ modulesManager, process);
processThread.start();
+ runningProcesses.put(process.getUuid(), processThread);
+ // FIXME find a way to remove them from this register
+ }
+
+ public void kill(ExecutionProcess process) {
+ String processUuid = process.getUuid();
+ if (runningProcesses.containsKey(processUuid)) {
+ runningProcesses.get(processUuid).interrupt();
+ }
}
/** Creates the thread which will coordinate the execution for this agent. */
protected ProcessThread createProcessThread(
+ ThreadGroup processesThreadGroup,
ExecutionModulesManager modulesManager, ExecutionProcess process) {
if (!(process instanceof SlcExecution))
throw new SlcException("Unsupported process type "
+ process.getClass());
- ProcessThread processThread = new ProcessThread(modulesManager,
- (SlcExecution) process);
+ ProcessThread processThread = new ProcessThread(processesThreadGroup,
+ modulesManager, (SlcExecution) process);
return processThread;
}
return true;
}
+ /*
+ * PROCESS NOTIFIER
+ */
+ public void updateStatus(ExecutionProcess process, String oldStatus,
+ String newStatus) {
+ if (newStatus.equals(ExecutionProcess.COMPLETED)
+ || newStatus.equals(ExecutionProcess.ERROR)
+ || newStatus.equals(ExecutionProcess.KILLED)) {
+ runningProcesses.remove(process.getUuid());
+ }
+ }
+
+ public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+ }
+
/*
* BEAN
*/
import org.argeo.slc.runtime.SlcAgent;
import org.argeo.slc.runtime.SlcAgentFactory;
+/** @deprecated old prototype, should be removed */
public class SimpleAgentFactory implements SlcAgentFactory {
private List<SlcAgent> agents;
import org.argeo.slc.deploy.ModulesManager;
import org.argeo.slc.process.RealizedFlow;
-import org.argeo.slc.process.SlcExecutionStep;
/** Provides access to the execution modules */
public interface ExecutionModulesManager extends ModulesManager {
*/
public List<ExecutionModuleDescriptor> listExecutionModules();
- /** The thread group to which all process threads will belong. */
- public ThreadGroup getProcessesThreadGroup();
-
/** Synchronously finds and executes an {@link ExecutionFlow}. */
public void execute(RealizedFlow realizedFlow);
*/
public void registerProcessNotifier(ExecutionProcessNotifier notifier,
Map<String, String> properties);
+
+ /** Unregisters a notifier */
+ public void unregisterProcessNotifier(ExecutionProcessNotifier notifier,
+ Map<String, String> properties);
}
* during and after the execution actually took place, providing an entry point
* for the definition of executions, their monitoring (e.g. logging) and
* tracking. A process can be distributed or parallelized.
+ * <br/>
+ * NEW => INITIALIZED => SCHEDULED => RUNNING<br/>
+ * RUNNING => {COMPLETED | ERROR | KILLED}<br/>
+ * {COMPLETED | ERROR | KILLED} => PURGED<br/>
+ * UNKOWN : this is a bug if this status occurs<br/>
*/
public interface ExecutionProcess {
/** The process is not yet usable. */
public final static String COMPLETED = "COMPLETED";
/** The process failed because of an unexpected error. */
public final static String ERROR = "ERROR";
+ /** The process was killed explicitly or through a crash. */
+ public final static String KILLED = "KILLED";
/** The status cannot be retrieved (probably because of unexpected errors). */
public final static String UNKOWN = "UNKOWN";
\r
import org.argeo.slc.execution.ExecutionProcess;\r
\r
+/** @deprecated use other implementations of {@link ExecutionProcess} */\r
public class SlcExecution implements ExecutionProcess, Serializable {\r
private static final long serialVersionUID = -7607457971382118466L;\r
\r
/** Agent unique identifier */
public String getAgentUuid();
+ /** Execute / take part to this process */
+ public void process(ExecutionProcess process);
+
+ /** Kills this process */
+ public void kill(ExecutionProcess process);
+
public ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version);
/** @deprecated Use {@link #process(ExecutionProcess)} instead. */
public void runSlcExecution(SlcExecution slcExecution);
- public void process(ExecutionProcess process);
-
/** @return true if still alive. */
public boolean ping();
}
}
}
+ public void kill(ExecutionProcess process) {
+ throw new UnsupportedOperationException();
+ }
+
protected Object sendReceive(AgentMC messageCreator) {
long begin = System.currentTimeMillis();
Object res;
*/
@Override
protected ProcessThread createProcessThread(
+ ThreadGroup processesThreadGroup,
ExecutionModulesManager modulesManager, ExecutionProcess process) {
- return new JcrProcessThread(modulesManager,
+ return new JcrProcessThread(processesThreadGroup, modulesManager,
(JcrExecutionProcess) process);
}
import org.argeo.slc.core.execution.ProcessThread;
import org.argeo.slc.execution.ExecutionFlowDescriptor;
import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
import org.argeo.slc.execution.ExecutionSpecAttribute;
import org.argeo.slc.jcr.SlcJcrUtils;
import org.argeo.slc.jcr.SlcNames;
/** Where the actual execution takes place */
public class JcrProcessThread extends ProcessThread implements SlcNames {
- public JcrProcessThread(ExecutionModulesManager executionModulesManager,
+ public JcrProcessThread(ThreadGroup processesThreadGroup,
+ ExecutionModulesManager executionModulesManager,
JcrExecutionProcess process) {
- super(executionModulesManager, process);
+ super(processesThreadGroup, executionModulesManager, process);
}
@Override
- protected void process() {
+ protected void process() throws InterruptedException {
try {
- Node realizedFlowNode = getNode().getNode(SLC_FLOW);
+ Node rootRealizedFlowNode = getNode().getNode(SLC_FLOW);
// we just manage one level for the time being
- NodeIterator nit = realizedFlowNode.getNodes(SLC_FLOW);
+ NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
while (nit.hasNext()) {
- process(nit.nextNode());
+ Node realizedFlowNode = nit.nextNode();
+
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.RUNNING);
+ realizedFlowNode.getSession().save();
+ try {
+ execute(realizedFlowNode);
+
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.COMPLETED);
+ realizedFlowNode.getSession().save();
+ } catch (RepositoryException e) {
+ throw e;
+ } catch (InterruptedException e) {
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.KILLED);
+ realizedFlowNode.getSession().save();
+ throw e;
+ } catch (RuntimeException e) {
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.ERROR);
+ realizedFlowNode.getSession().save();
+ throw e;
+ }
}
} catch (RepositoryException e) {
throw new ArgeoException("Cannot process " + getNode(), e);
}
/** Configure the realized flows */
- protected void process(Node realizedFlowNode) throws RepositoryException {
+ protected void execute(Node realizedFlowNode) throws RepositoryException,
+ InterruptedException {
if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
.getProperty(Property.JCR_PATH).getString();
values, executionSpec);
realizedFlow.setFlowDescriptor(efd);
+ //
+ // EXECUTE THE FLOW
+ //
execute(realizedFlow, true);
+ //
}
}
// - slc:spec (STRING)
- slc:started (DATE)
- slc:completed (DATE)
+//- slc:status (STRING)
+ slc:address (nt:address)
+ slc:flow (slc:realizedFlow) *
// the realized execution spec attributes