1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
4 import java
.util
.Iterator
;
7 import java
.util
.TreeMap
;
9 import org
.apache
.commons
.logging
.Log
;
10 import org
.apache
.commons
.logging
.LogFactory
;
11 import org
.argeo
.slc
.SlcException
;
12 import org
.argeo
.slc
.execution
.ExecutionFlow
;
13 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
14 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
15 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
16 import org
.argeo
.slc
.execution
.ExecutionSpec
;
17 import org
.argeo
.slc
.execution
.ExecutionSpecAttribute
;
18 import org
.argeo
.slc
.process
.RealizedFlow
;
19 import org
.argeo
.slc
.process
.SlcExecution
;
20 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
21 import org
.argeo
.slc
.process
.SlcExecutionStep
;
22 import org
.springframework
.aop
.scope
.ScopedObject
;
23 import org
.springframework
.util
.Assert
;
25 public abstract class AbstractExecutionModulesManager
implements
26 ExecutionModulesManager
{
27 private final static Log log
= LogFactory
28 .getLog(AbstractExecutionModulesManager
.class);
30 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
31 private ThreadGroup processesThreadGroup
= new ThreadGroup("Processes");
33 public void process(SlcExecution slcExecution
) {
34 new ProcessThread(processesThreadGroup
, slcExecution
).start();
37 protected void dispatchUpdateStatus(SlcExecution slcExecution
,
38 String oldStatus
, String newStatus
) {
39 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
40 .iterator(); it
.hasNext();) {
41 it
.next().updateStatus(slcExecution
, oldStatus
, newStatus
);
45 protected synchronized void dispatchAddStep(SlcExecution slcExecution
,
46 SlcExecutionStep step
) {
47 slcExecution
.getSteps().add(step
);
48 List
<SlcExecutionStep
> steps
= new ArrayList
<SlcExecutionStep
>();
50 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
51 .iterator(); it
.hasNext();) {
52 it
.next().addSteps(slcExecution
, steps
);
56 public void setSlcExecutionNotifiers(
57 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
58 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
/**
 * Builds an ExecutionModuleDescriptor describing a module and its flows.
 *
 * The descriptor receives the given name and version. Then, for each entry
 * of executionFlows:
 * - the flow name must be non-null and equal to its map key, and the flow
 *   must have a non-null execution spec with a non-null name (Spring Assert);
 * - for every spec attribute that the flow has set as a parameter, a value
 *   object is stored under the attribute key in a TreeMap (so keys are
 *   sorted): a PrimitiveValue carrying the parameter value for a
 *   PrimitiveSpecAttribute, or a RefValue labelled "RUNTIME "/"STATIC "
 *   followed by the value's class name for a RefSpecAttribute;
 * - an ExecutionFlowDescriptor is created from the name, these values and
 *   the spec, given the flow's path when it is not null, and added to the
 *   module descriptor; the spec itself is added once to the descriptor's
 *   spec list.
 *
 * NOTE(review): this listing has lost some original lines (the embedded line
 * numbers skip, e.g. 88, 90, 98, 103, 106-110, 122-126), so a few "else"
 * keywords, closing braces and the return statement are not visible here.
 * Presumably the method returns md - confirm against the full file.
 *
 * @param moduleName     name stored on the descriptor
 * @param moduleVersion  version stored on the descriptor
 * @param executionFlows flows of the module, keyed by flow name
 */
61 protected static ExecutionModuleDescriptor
createDescriptor(
62 String moduleName
, String moduleVersion
,
63 Map
<String
, ExecutionFlow
> executionFlows
) {
64 // TODO: put this in a separate configurable object
65 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
66 md
.setName(moduleName
);
67 md
.setVersion(moduleVersion
);
// One flow descriptor is produced per map entry.
69 for (String name
: executionFlows
.keySet()) {
70 ExecutionFlow executionFlow
= executionFlows
.get(name
);
// The map key and the flow's own name must agree.
72 Assert
.notNull(executionFlow
.getName());
73 Assert
.state(name
.equals(executionFlow
.getName()));
75 ExecutionSpec executionSpec
= executionFlow
.getExecutionSpec();
76 Assert
.notNull(executionSpec
);
77 Assert
.notNull(executionSpec
.getName());
// Values of the attributes that this flow sets, sorted by attribute key.
79 Map
<String
, Object
> values
= new TreeMap
<String
, Object
>();
80 for (String key
: executionSpec
.getAttributes().keySet()) {
81 ExecutionSpecAttribute attribute
= executionSpec
82 .getAttributes().get(key
);
// Attributes the flow does not set as a parameter are skipped.
84 if (executionFlow
.isSetAsParameter(key
)) {
85 Object value
= executionFlow
.getParameter(key
);
86 if (attribute
instanceof PrimitiveSpecAttribute
) {
87 PrimitiveValue primitiveValue
= new PrimitiveValue();
// NOTE(review): the receiver of this setType call and the end of its
// argument expression are not visible in this listing.
89 .setType(((PrimitiveSpecAttribute
) attribute
)
91 primitiveValue
.setValue(value
);
92 values
.put(key
, primitiveValue
);
93 } else if (attribute
instanceof RefSpecAttribute
) {
94 RefValue refValue
= new RefValue();
// Scoped (Spring ScopedObject) references are labelled RUNTIME.
95 if (value
instanceof ScopedObject
) {
96 refValue
.setLabel("RUNTIME "
97 + value
.getClass().getName());
// NOTE(review): presumably the else branch of the ScopedObject test; the
// "else" line itself is not visible here.
99 refValue
.setLabel("STATIC "
100 + value
.getClass().getName());
102 values
.put(key
, refValue
);
// NOTE(review): presumably reached only when the attribute is neither a
// primitive nor a reference attribute; the enclosing "else" is not visible.
104 throw new SlcException("Unkown spec attribute type "
105 + attribute
.getClass());
111 ExecutionFlowDescriptor efd
= new ExecutionFlowDescriptor(name
,
112 values
, executionSpec
);
113 if (executionFlow
.getPath() != null)
114 efd
.setPath(executionFlow
.getPath());
116 // Add execution spec if necessary
117 if (!md
.getExecutionSpecs().contains(executionSpec
))
118 md
.getExecutionSpecs().add(executionSpec
);
120 // Add execution flow
121 md
.getExecutionFlows().add(efd
);
/*
 * NOTE(review): this listing has lost some original lines of this class (the
 * embedded line numbers skip 140-142, 146-147, 149-151, 161-162, 164-165,
 * 167-171, 175-176, 178-180, 182-184). The header of the method that holds
 * the processing logic, the bodies of the try blocks, and the bodies of
 * flowCompleted() and getSlcProcess() are therefore not visible here.
 */
127 /**
 * Thread of the SLC Process, starting the sub executions.
 *
 * The visible logic marks the execution as running, then takes the realized
 * flows one at a time and creates an ExecutionThread for each, and finally
 * marks the execution as finished. Both status changes are sent to the
 * notifiers through dispatchUpdateStatus.
 */
128 private class ProcessThread
extends Thread
{
// The execution being processed.
129 private final SlcExecution slcProcess
;
// Thread group created for this process; exposed to ExecutionThread.
130 private final ThreadGroup processThreadGroup
;
// Flows still to be run; filled from slcProcess.getRealizedFlows().
131 private final List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
// Creates the thread in the given group, named "SLC Process #<uuid>", and
// a separate thread group named "SLC Process #<uuid> thread group".
133 public ProcessThread(ThreadGroup processesThreadGroup
,
134 SlcExecution slcExecution
) {
135 super(processesThreadGroup
, "SLC Process #"
136 + slcExecution
.getUuid());
137 this.slcProcess
= slcExecution
;
138 processThreadGroup
= new ThreadGroup("SLC Process #"
139 + slcExecution
.getUuid() + " thread group");
// NOTE(review): the constructor's closing brace and the header of the
// following method are not visible; presumably this is run() - confirm.
143 log
.info("\n##\n## Process SLC Execution " + slcProcess
+ "\n##\n");
145 // FIXME: hack to let the SlcExecution be registered on server
// NOTE(review): the try block guarded by this catch is not visible here.
148 } catch (InterruptedException e1
) {
// Status goes from SCHEDULED to RUNNING and notifiers are informed.
152 slcProcess
.setStatus(SlcExecution
.STATUS_RUNNING
);
153 dispatchUpdateStatus(slcProcess
, SlcExecution
.STATUS_SCHEDULED
,
154 SlcExecution
.STATUS_RUNNING
);
156 flowsToProcess
.addAll(slcProcess
.getRealizedFlows());
// Flows are consumed from the head of the list until none remain.
158 while (flowsToProcess
.size() > 0) {
159 RealizedFlow flow
= flowsToProcess
.remove(0);
160 ExecutionThread thread
= new ExecutionThread(this, flow
);
// NOTE(review): the content of this synchronized block is not visible;
// presumably it waits until flowCompleted() is called - confirm.
163 synchronized (this) {
166 } catch (InterruptedException e
) {
// Status goes from RUNNING to FINISHED and notifiers are informed.
172 slcProcess
.setStatus(SlcExecution
.STATUS_FINISHED
);
173 dispatchUpdateStatus(slcProcess
, SlcExecution
.STATUS_RUNNING
,
174 SlcExecution
.STATUS_FINISHED
);
// Called by ExecutionThread once its flow is done; body not visible here.
177 public synchronized void flowCompleted() {
// Accessor used by ExecutionThread; body not visible here.
181 public SlcExecution
getSlcProcess() {
// Returns the thread group created in the constructor.
185 public ThreadGroup
getProcessThreadGroup() {
186 return processThreadGroup
;
/*
 * NOTE(review): this listing has lost some original lines of this class (the
 * embedded line numbers skip 201-203, 207, 211-212, 217, 220), and the end
 * of the class is not visible. The header of the method holding the
 * execution logic, the opening of the try block and whatever separates the
 * catch block from the statements after it cannot be seen here.
 */
190 /**
 * Thread of a single execution.
 *
 * The visible logic adds a TYPE_PHASE_START step "Flow <name>" to the
 * process, calls execute(realizedFlow), adds a step carrying the failure
 * message when an Exception is caught, then calls
 * processThread.flowCompleted() and adds a TYPE_PHASE_END step.
 */
191 private class ExecutionThread
extends Thread
{
// The flow this thread runs.
192 private final RealizedFlow realizedFlow
;
// The process thread that created this thread.
193 private final ProcessThread processThread
;
// Creates the thread in the process's own thread group, named
// "Flow <flow descriptor name>".
195 public ExecutionThread(ProcessThread processThread
,
196 RealizedFlow realizedFlow
) {
197 super(processThread
.getProcessThreadGroup(), "Flow "
198 + realizedFlow
.getFlowDescriptor().getName());
199 this.realizedFlow
= realizedFlow
;
200 this.processThread
= processThread
;
// NOTE(review): the constructor's closing brace and the header of the
// following method are not visible; presumably this is run() - confirm.
204 ExecutionFlowDescriptor executionFlowDescriptor
= realizedFlow
205 .getFlowDescriptor();
206 String flowName
= executionFlowDescriptor
.getName();
// Record the start of the flow on the owning process.
208 dispatchAddStep(processThread
.getSlcProcess(),
209 new SlcExecutionStep(SlcExecutionStep
.TYPE_PHASE_START
,
210 "Flow " + flowName
));
// execute(...) is not defined in this class; NOTE(review): presumably it
// comes from the enclosing manager - confirm.
213 execute(realizedFlow
);
214 } catch (Exception e
) {
215 // TODO: re-throw exception ?
216 String msg
= "Execution of flow " + flowName
+ " failed.";
// The failure is reported as a step whose text is msg plus the exception
// message.
218 dispatchAddStep(processThread
.getSlcProcess(),
219 new SlcExecutionStep(msg
+ " " + e
.getMessage()));
// NOTE(review): whether the next statements sit in a finally block is not
// visible here. They signal completion to the process thread and record
// the end of the flow.
221 processThread
.flowCompleted();
222 dispatchAddStep(processThread
.getSlcProcess(),
223 new SlcExecutionStep(SlcExecutionStep
.TYPE_PHASE_END
,
224 "Flow " + flowName
));