1 package org
.argeo
.slc
.jcr
.execution
;
3 import java
.util
.HashMap
;
7 import javax
.jcr
.NodeIterator
;
8 import javax
.jcr
.Property
;
9 import javax
.jcr
.RepositoryException
;
11 import org
.argeo
.ArgeoException
;
12 import org
.argeo
.slc
.SlcException
;
13 import org
.argeo
.slc
.core
.execution
.DefaultExecutionSpec
;
14 import org
.argeo
.slc
.core
.execution
.PrimitiveSpecAttribute
;
15 import org
.argeo
.slc
.core
.execution
.PrimitiveUtils
;
16 import org
.argeo
.slc
.core
.execution
.ProcessThread
;
17 import org
.argeo
.slc
.core
.execution
.RefSpecAttribute
;
18 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
19 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
20 import org
.argeo
.slc
.execution
.ExecutionProcess
;
21 import org
.argeo
.slc
.execution
.ExecutionSpecAttribute
;
22 import org
.argeo
.slc
.jcr
.SlcJcrUtils
;
23 import org
.argeo
.slc
.jcr
.SlcNames
;
24 import org
.argeo
.slc
.jcr
.SlcTypes
;
25 import org
.argeo
.slc
.process
.RealizedFlow
;
27 /** Where the actual execution takes place */
28 public class JcrProcessThread
extends ProcessThread
implements SlcNames
{
/**
 * Creates the thread by forwarding all three arguments unchanged to the
 * {@code ProcessThread} super constructor.
 *
 * @param processesThreadGroup
 *            thread group passed to the superclass
 * @param executionModulesManager
 *            execution modules manager passed to the superclass
 * @param process
 *            the JCR-backed execution process passed to the superclass
 */
30 public JcrProcessThread(ThreadGroup processesThreadGroup
,
31 ExecutionModulesManager executionModulesManager
,
32 JcrExecutionProcess process
) {
33 super(processesThreadGroup
, executionModulesManager
, process
);
/**
 * Iterates over the {@code SLC_FLOW} children of this process's
 * {@code SLC_FLOW} node and executes each of them through
 * {@link #execute(Node)}, writing the {@code SLC_STATUS} property of the
 * realized flow node and saving the session at each step:
 * {@code RUNNING} before execution, {@code COMPLETED} after it,
 * {@code KILLED} when an {@link InterruptedException} is caught and
 * {@code ERROR} when a {@link RuntimeException} is caught.
 *
 * @throws InterruptedException
 *             declared by the signature
 * @throws ArgeoException
 *             wrapping a {@link RepositoryException} caught by the
 *             outermost handler
 */
37 protected void process() throws InterruptedException
{
// the realized flows are stored under the SLC_FLOW child of the process node
39 Node rootRealizedFlowNode
= getNode().getNode(SLC_FLOW
);
40 // we just manage one level for the time being
41 NodeIterator nit
= rootRealizedFlowNode
.getNodes(SLC_FLOW
);
42 while (nit
.hasNext()) {
43 Node realizedFlowNode
= nit
.nextNode();
45 // mark the realized flow as RUNNING and persist it before executing
46 realizedFlowNode
.setProperty(SLC_STATUS
,
47 ExecutionProcess
.RUNNING
);
48 realizedFlowNode
.getSession().save();
50 execute(realizedFlowNode
);
52 // execution returned normally: mark the realized flow as COMPLETED
53 realizedFlowNode
.setProperty(SLC_STATUS
,
54 ExecutionProcess
.COMPLETED
);
55 realizedFlowNode
.getSession().save();
56 } catch (RepositoryException e
) {
// NOTE(review): no status is written in this handler in the visible code;
// confirm what it does with the exception
58 } catch (InterruptedException e
) {
59 // interrupted: mark the realized flow as KILLED and persist it
60 realizedFlowNode
.setProperty(SLC_STATUS
,
61 ExecutionProcess
.KILLED
);
62 realizedFlowNode
.getSession().save();
// NOTE(review): presumably the exception is rethrown after the save - confirm
64 } catch (RuntimeException e
) {
65 // runtime failure: mark the realized flow as ERROR and persist it
66 realizedFlowNode
.setProperty(SLC_STATUS
,
67 ExecutionProcess
.ERROR
);
68 realizedFlowNode
.getSession().save();
// NOTE(review): presumably the exception is rethrown after the save - confirm
72 } catch (RepositoryException e
) {
// repository failures are wrapped, keeping the cause and naming the process node
73 throw new ArgeoException("Cannot process " + getNode(), e
);
77 /**
 * Configure the realized flows: builds a {@link RealizedFlow} from the
 * given realized flow node and passes it to
 * {@code execute(realizedFlow, true)}.
 * <p>
 * The flow is located through the {@code SLC_ADDRESS} child of the node,
 * whose {@code JCR_PATH} property holds the path of the flow node. The
 * module name and version come from {@link SlcJcrUtils}, the spec
 * attributes from {@code readExecutionSpecAttributes(realizedFlowNode)},
 * and the flow descriptor is created from the flow name, the attribute
 * values and the execution spec.
 *
 * @param realizedFlowNode
 *            the node describing the flow to run
 * @throws RepositoryException
 *             declared by the signature; the JCR calls below are made
 *             without a local handler
 * @throws InterruptedException
 *             declared by the signature
 */
78 protected void execute(Node realizedFlowNode
) throws RepositoryException
,
79 InterruptedException
{
80 if (realizedFlowNode
.hasNode(SLC_ADDRESS
)) {
// the SLC_ADDRESS child keeps the path of the flow in its JCR_PATH property
81 String flowPath
= realizedFlowNode
.getNode(SLC_ADDRESS
)
82 .getProperty(Property
.JCR_PATH
).getString();
83 // TODO: convert to local path if remote
// resolve the flow node in the same session and read its name
85 Node flowNode
= realizedFlowNode
.getSession().getNode(flowPath
);
86 String flowName
= flowNode
.getProperty(SLC_NAME
).getString();
// module name and version are both derived from the flow path
88 String executionModuleName
= SlcJcrUtils
89 .flowExecutionModuleName(flowPath
);
90 String executionModuleVersion
= SlcJcrUtils
91 .flowExecutionModuleVersion(flowPath
);
93 RealizedFlow realizedFlow
= new RealizedFlow();
94 realizedFlow
.setModuleName(executionModuleName
);
95 realizedFlow
.setModuleVersion(executionModuleVersion
);
97 // retrieve execution spec
// the attributes are read from the realized flow node, not from the flow node
98 DefaultExecutionSpec executionSpec
= new DefaultExecutionSpec();
99 Map
<String
, ExecutionSpecAttribute
> attrs
= readExecutionSpecAttributes(realizedFlowNode
);
100 executionSpec
.setAttributes(attrs
);
102 // set execution spec name
// only done when the flow node has an SLC_SPEC property; the bean name is
// the SLC_NAME of the spec node
103 if (flowNode
.hasProperty(SlcNames
.SLC_SPEC
)) {
104 Node executionSpecNode
= flowNode
.getProperty(SLC_SPEC
)
106 executionSpec
.setBeanName(executionSpecNode
.getProperty(
107 SLC_NAME
).getString());
110 // explicitly retrieve values
// copies each attribute's current value into a name -> value map
111 Map
<String
, Object
> values
= new HashMap
<String
, Object
>();
112 for (String attrName
: attrs
.keySet()) {
113 ExecutionSpecAttribute attr
= attrs
.get(attrName
);
114 Object value
= attr
.getValue();
115 values
.put(attrName
, value
);
// the descriptor groups the flow name, the values and the execution spec
118 ExecutionFlowDescriptor efd
= new ExecutionFlowDescriptor(flowName
,
119 values
, executionSpec
);
120 realizedFlow
.setFlowDescriptor(efd
);
// hand the assembled realized flow to the two-argument execute
125 execute(realizedFlow
, true);
/**
 * Reads the child nodes of {@code node} and builds a map of spec
 * attributes keyed by child node name.
 * <p>
 * Children of type {@code SLC_PRIMITIVE_SPEC_ATTRIBUTE} become a
 * {@link PrimitiveSpecAttribute}; when they have an {@code SLC_VALUE}
 * property, it is converted with {@code PrimitiveUtils.convert(type,
 * valueStr)} using the {@code SLC_TYPE} property. Children of type
 * {@code SLC_REF_SPEC_ATTRIBUTE} become a {@link RefSpecAttribute} whose
 * value is the name of one of their own child nodes.
 *
 * @param node
 *            the node whose children are scanned
 * @return the attributes found, keyed by the name of their node
 * @throws SlcException
 *             wrapping a {@link RepositoryException} caught while reading
 */
130 protected Map
<String
, ExecutionSpecAttribute
> readExecutionSpecAttributes(
133 Map
<String
, ExecutionSpecAttribute
> attrs
= new HashMap
<String
, ExecutionSpecAttribute
>();
134 for (NodeIterator nit
= node
.getNodes(); nit
.hasNext();) {
135 Node specAttrNode
= nit
.nextNode();
// primitive attribute: the value is stored as text and converted
// according to the SLC_TYPE property
137 .isNodeType(SlcTypes
.SLC_PRIMITIVE_SPEC_ATTRIBUTE
)) {
138 String type
= specAttrNode
.getProperty(SLC_TYPE
)
// the conversion only happens when an SLC_VALUE property exists
141 if (specAttrNode
.hasProperty(SLC_VALUE
)) {
142 String valueStr
= specAttrNode
.getProperty(SLC_VALUE
)
144 value
= PrimitiveUtils
.convert(type
, valueStr
);
146 PrimitiveSpecAttribute specAttr
= new PrimitiveSpecAttribute(
148 attrs
.put(specAttrNode
.getName(), specAttr
);
149 } else if (specAttrNode
150 .isNodeType(SlcTypes
.SLC_REF_SPEC_ATTRIBUTE
)) {
// reference attribute: SLC_VALUE is read as an integer and the value
// set on the attribute is the name of a child node
// NOTE(review): what happens when SLC_VALUE is absent is not visible here - confirm
151 if (!specAttrNode
.hasProperty(SLC_VALUE
)) {
154 Integer value
= (int) specAttrNode
.getProperty(SLC_VALUE
)
156 RefSpecAttribute specAttr
= new RefSpecAttribute();
157 NodeIterator children
= specAttrNode
.getNodes();
// NOTE(review): presumably the integer selects the child by position - confirm
160 while (children
.hasNext()) {
161 Node child
= children
.nextNode();
163 id
= child
.getName();
166 specAttr
.setValue(id
);
167 attrs
.put(specAttrNode
.getName(), specAttr
);
169 // throw new SlcException("Unsupported spec attribute "
173 } catch (RepositoryException e
) {
// repository failures are reported as an SlcException naming the node
174 throw new SlcException("Cannot read spec attributes from " + node
,
/**
 * Returns the JCR node of the process this thread runs, obtained by
 * casting {@code getProcess()} to {@link JcrExecutionProcess} and calling
 * its {@code getNode()}.
 *
 * @return the node returned by the underlying {@link JcrExecutionProcess}
 */
179 protected Node
getNode() {
180 return ((JcrExecutionProcess
) getProcess()).getNode();