import org.argeo.slc.core.execution.ProcessThread;
import org.argeo.slc.execution.ExecutionFlowDescriptor;
import org.argeo.slc.execution.ExecutionModulesManager;
+import org.argeo.slc.execution.ExecutionProcess;
import org.argeo.slc.execution.ExecutionSpecAttribute;
import org.argeo.slc.jcr.SlcJcrUtils;
import org.argeo.slc.jcr.SlcNames;
/** Where the actual execution takes place */
public class JcrProcessThread extends ProcessThread implements SlcNames {
- public JcrProcessThread(ExecutionModulesManager executionModulesManager,
+ public JcrProcessThread(ThreadGroup processesThreadGroup,
+ ExecutionModulesManager executionModulesManager,
JcrExecutionProcess process) {
- super(executionModulesManager, process);
+ super(processesThreadGroup, executionModulesManager, process);
}
@Override
- protected void process() {
+ protected void process() throws InterruptedException {
try {
- Node realizedFlowNode = getNode().getNode(SLC_FLOW);
+ Node rootRealizedFlowNode = getNode().getNode(SLC_FLOW);
// we just manage one level for the time being
- NodeIterator nit = realizedFlowNode.getNodes(SLC_FLOW);
+ NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW);
while (nit.hasNext()) {
- process(nit.nextNode());
+ Node realizedFlowNode = nit.nextNode();
+
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.RUNNING);
+ realizedFlowNode.getSession().save();
+ try {
+ execute(realizedFlowNode);
+
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.COMPLETED);
+ realizedFlowNode.getSession().save();
+ } catch (RepositoryException e) {
+ throw e;
+ } catch (InterruptedException e) {
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.KILLED);
+ realizedFlowNode.getSession().save();
+ throw e;
+ } catch (RuntimeException e) {
+ // set status on realized flow
+ realizedFlowNode.setProperty(SLC_STATUS,
+ ExecutionProcess.ERROR);
+ realizedFlowNode.getSession().save();
+ throw e;
+ }
}
} catch (RepositoryException e) {
throw new ArgeoException("Cannot process " + getNode(), e);
}
/** Configure the realized flows */
- protected void process(Node realizedFlowNode) throws RepositoryException {
+ protected void execute(Node realizedFlowNode) throws RepositoryException,
+ InterruptedException {
if (realizedFlowNode.hasNode(SLC_ADDRESS)) {
String flowPath = realizedFlowNode.getNode(SLC_ADDRESS)
.getProperty(Property.JCR_PATH).getString();
Map<String, ExecutionSpecAttribute> attrs = readExecutionSpecAttributes(realizedFlowNode);
Map<String, Object> values = new HashMap<String, Object>();
for (String attrName : attrs.keySet()) {
- if (flowNode.hasNode(attrName)) {
- // we assume this is a primitive
- // since ref are not yet implemented
- Node valueNode = flowNode.getNode(attrName);
- String type = valueNode.getProperty(SLC_TYPE).getString();
- String valueStr = valueNode.getProperty(SLC_VALUE)
- .getString();
- Object value = PrimitiveUtils.convert(type, valueStr);
- values.put(attrName, value);
- } else {
+// if (flowNode.hasNode(attrName)) {
+// // we assume this is a primitive
+// // since ref are not yet implemented
+// Node valueNode = flowNode.getNode(attrName);
+// String type = valueNode.getProperty(SLC_TYPE).getString();
+// String valueStr = valueNode.getProperty(SLC_VALUE)
+// .getString();
+// Object value = PrimitiveUtils.convert(type, valueStr);
+// values.put(attrName, value);
+// } else {
ExecutionSpecAttribute attr = attrs.get(attrName);
Object value = attr.getValue();
values.put(attrName, value);
- }
+// }
}
// if(executionSpec!=null)
values, executionSpec);
realizedFlow.setFlowDescriptor(efd);
+ //
+ // EXECUTE THE FLOW
+ //
execute(realizedFlow, true);
+ //
}
}