package org.argeo.slc.jcr.execution; import java.util.HashMap; import java.util.Map; import javax.jcr.Node; import javax.jcr.NodeIterator; import javax.jcr.Property; import javax.jcr.RepositoryException; import org.argeo.ArgeoException; import org.argeo.slc.SlcException; import org.argeo.slc.core.execution.DefaultExecutionSpec; import org.argeo.slc.core.execution.PrimitiveSpecAttribute; import org.argeo.slc.core.execution.PrimitiveUtils; import org.argeo.slc.core.execution.ProcessThread; import org.argeo.slc.execution.ExecutionFlowDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; import org.argeo.slc.execution.ExecutionSpecAttribute; import org.argeo.slc.jcr.SlcJcrUtils; import org.argeo.slc.jcr.SlcNames; import org.argeo.slc.jcr.SlcTypes; import org.argeo.slc.process.RealizedFlow; /** Where the actual execution takes place */ public class JcrProcessThread extends ProcessThread implements SlcNames { public JcrProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager, JcrExecutionProcess process) { super(processesThreadGroup, executionModulesManager, process); } @Override protected void process() throws InterruptedException { try { Node rootRealizedFlowNode = getNode().getNode(SLC_FLOW); // we just manage one level for the time being NodeIterator nit = rootRealizedFlowNode.getNodes(SLC_FLOW); while (nit.hasNext()) { Node realizedFlowNode = nit.nextNode(); // set status on realized flow realizedFlowNode.setProperty(SLC_STATUS, ExecutionProcess.RUNNING); realizedFlowNode.getSession().save(); try { execute(realizedFlowNode); // set status on realized flow realizedFlowNode.setProperty(SLC_STATUS, ExecutionProcess.COMPLETED); realizedFlowNode.getSession().save(); } catch (RepositoryException e) { throw e; } catch (InterruptedException e) { // set status on realized flow realizedFlowNode.setProperty(SLC_STATUS, ExecutionProcess.KILLED); realizedFlowNode.getSession().save(); throw e; } catch (RuntimeException e) { // set status on realized flow realizedFlowNode.setProperty(SLC_STATUS, ExecutionProcess.ERROR); realizedFlowNode.getSession().save(); throw e; } } } catch (RepositoryException e) { throw new ArgeoException("Cannot process " + getNode(), e); } } /** Configure the realized flows */ protected void execute(Node realizedFlowNode) throws RepositoryException, InterruptedException { if (realizedFlowNode.hasNode(SLC_ADDRESS)) { String flowPath = realizedFlowNode.getNode(SLC_ADDRESS) .getProperty(Property.JCR_PATH).getString(); // TODO: convert to local path if remote Node flowNode = realizedFlowNode.getSession().getNode(flowPath); String flowName = flowNode.getProperty(SLC_NAME).getString(); String executionModuleName = SlcJcrUtils .flowExecutionModuleName(flowPath); String executionModuleVersion = SlcJcrUtils .flowExecutionModuleVersion(flowPath); RealizedFlow realizedFlow = new RealizedFlow(); realizedFlow.setModuleName(executionModuleName); realizedFlow.setModuleVersion(executionModuleVersion); // retrieve execution spec DefaultExecutionSpec executionSpec = null; if (flowNode.hasProperty(SlcNames.SLC_SPEC)) { Node executionSpecNode = flowNode.getProperty(SLC_SPEC) .getNode(); executionSpec = new DefaultExecutionSpec(); executionSpec.setBeanName(executionSpecNode.getProperty( SLC_NAME).getString()); executionSpec .setAttributes(readExecutionSpecAttributes(executionSpecNode)); } // TODO: will with original attr Map attrs = readExecutionSpecAttributes(realizedFlowNode); Map values = new HashMap(); for (String attrName : attrs.keySet()) { // if (flowNode.hasNode(attrName)) { // // we assume this is a primitive // // since ref are not yet implemented // Node valueNode = flowNode.getNode(attrName); // String type = valueNode.getProperty(SLC_TYPE).getString(); // String valueStr = valueNode.getProperty(SLC_VALUE) // .getString(); // Object value = PrimitiveUtils.convert(type, valueStr); // values.put(attrName, value); // } else { ExecutionSpecAttribute attr = attrs.get(attrName); Object value = attr.getValue(); values.put(attrName, value); // } } // if(executionSpec!=null) // executionSpec.setAttributes(attrs); ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(flowName, values, executionSpec); realizedFlow.setFlowDescriptor(efd); // // EXECUTE THE FLOW // execute(realizedFlow, true); // } } protected Map readExecutionSpecAttributes( Node node) { try { Map attrs = new HashMap(); for (NodeIterator nit = node.getNodes(); nit.hasNext();) { Node specAttrNode = nit.nextNode(); if (specAttrNode .isNodeType(SlcTypes.SLC_PRIMITIVE_SPEC_ATTRIBUTE)) { String type = specAttrNode.getProperty(SLC_TYPE) .getString(); Object value = null; if (specAttrNode.hasProperty(SLC_VALUE)) { String valueStr = specAttrNode.getProperty(SLC_VALUE) .getString(); value = PrimitiveUtils.convert(type, valueStr); } PrimitiveSpecAttribute specAttr = new PrimitiveSpecAttribute( type, value); attrs.put(specAttrNode.getName(), specAttr); } } return attrs; } catch (RepositoryException e) { throw new SlcException("Cannot read spec attributes from " + node, e); } } protected Node getNode() { return ((JcrExecutionProcess) getProcess()).getNode(); } }