1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
6 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
7 import org
.argeo
.slc
.process
.SlcExecution
;
8 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
10 public abstract class AbstractExecutionModulesManager
implements
11 ExecutionModulesManager
{
12 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
13 private ThreadGroup processesThreadGroup
= new ThreadGroup("Processes");
15 public void process(SlcExecution slcExecution
) {
16 new ProcessThread(this, slcExecution
).start();
19 protected void dispatchUpdateStatus(SlcExecution slcExecution,
20 String oldStatus, String newStatus) {
21 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
22 .iterator(); it.hasNext();) {
23 it.next().updateStatus(slcExecution, oldStatus, newStatus);
27 protected synchronized void dispatchAddStep(SlcExecution slcExecution,
28 SlcExecutionStep step) {
29 slcExecution.getSteps().add(step);
30 List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
32 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
33 .iterator(); it.hasNext();) {
34 it.next().addSteps(slcExecution, steps);
38 public void setSlcExecutionNotifiers(
39 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
40 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
43 protected static void addFlowsToDescriptor(ExecutionModuleDescriptor md,
44 Map<String, ExecutionFlow> executionFlows) {
45 // TODO: put this in a separate configurable object
46 for (String name : executionFlows.keySet()) {
47 ExecutionFlow executionFlow = executionFlows.get(name);
49 Assert.notNull(executionFlow.getName());
50 Assert.state(name.equals(executionFlow.getName()));
52 ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
53 Assert.notNull(executionSpec);
54 Assert.notNull(executionSpec.getName());
56 Map<String, Object> values = new TreeMap<String, Object>();
57 for (String key : executionSpec.getAttributes().keySet()) {
58 ExecutionSpecAttribute attribute = executionSpec
59 .getAttributes().get(key);
61 if (executionFlow.isSetAsParameter(key)) {
62 Object value = executionFlow.getParameter(key);
63 if (attribute instanceof PrimitiveSpecAttribute) {
64 PrimitiveValue primitiveValue = new PrimitiveValue();
66 .setType(((PrimitiveSpecAttribute) attribute)
68 primitiveValue.setValue(value);
69 values.put(key, primitiveValue);
70 } else if (attribute instanceof RefSpecAttribute) {
71 RefValue refValue = new RefValue();
72 if (value instanceof ScopedObject) {
73 refValue.setLabel("RUNTIME "
74 + value.getClass().getName());
76 refValue.setLabel("STATIC "
77 + value.getClass().getName());
79 values.put(key, refValue);
81 throw new SlcException("Unkown spec attribute type "
82 + attribute.getClass());
88 ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
89 values, executionSpec);
90 if (executionFlow.getPath() != null)
91 efd.setPath(executionFlow.getPath());
93 // Add execution spec if necessary
94 if (!md.getExecutionSpecs().contains(executionSpec))
95 md.getExecutionSpecs().add(executionSpec);
98 md.getExecutionFlows().add(efd);
103 * Thread of the SLC Process, starting the sub executions. private class
104 * ProcessThread extends Thread { private final SlcExecution slcProcess;
105 * private final ThreadGroup processThreadGroup; private final
106 * List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
108 * public ProcessThread(ThreadGroup processesThreadGroup, SlcExecution
109 * slcExecution) { super(processesThreadGroup, "SLC Process #" +
110 * slcExecution.getUuid()); this.slcProcess = slcExecution;
111 * processThreadGroup = new ThreadGroup("SLC Process #" +
112 * slcExecution.getUuid() + " thread group"); }
114 * public void run() { log.info("\n##\n## Process SLC Execution " +
115 * slcProcess + "\n##\n");
117 * slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
118 * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
119 * SlcExecution.STATUS_RUNNING);
121 * flowsToProcess.addAll(slcProcess.getRealizedFlows());
123 * while (flowsToProcess.size() > 0) { RealizedFlow flow =
124 * flowsToProcess.remove(0); ExecutionThread thread = new
125 * ExecutionThread(this, flow); thread.start();
127 * synchronized (this) { try { wait(); } catch (InterruptedException e) { //
130 * slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
131 * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
132 * SlcExecution.STATUS_FINISHED); }
134 * public synchronized void flowCompleted() { notifyAll(); }
136 * public SlcExecution getSlcProcess() { return slcProcess; }
138 * public ThreadGroup getProcessThreadGroup() { return processThreadGroup; }
143 * Thread of a single execution private class ExecutionThread extends Thread
144 * { private final RealizedFlow realizedFlow; private final ProcessThread
147 * public ExecutionThread(ProcessThread processThread, RealizedFlow
148 * realizedFlow) { super(processThread.getProcessThreadGroup(), "Flow " +
149 * realizedFlow.getFlowDescriptor().getName()); this.realizedFlow =
150 * realizedFlow; this.processThread = processThread; }
152 * public void run() { ExecutionFlowDescriptor executionFlowDescriptor =
153 * realizedFlow .getFlowDescriptor(); String flowName =
154 * executionFlowDescriptor.getName();
156 * dispatchAddStep(processThread.getSlcProcess(), new
157 * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName));
159 * try { execute(realizedFlow); } catch (Exception e) { // TODO: re-throw
160 * exception ? String msg = "Execution of flow " + flowName + " failed.";
161 * log.error(msg, e); dispatchAddStep(processThread.getSlcProcess(), new
162 * SlcExecutionStep(msg + " " + e.getMessage())); } finally {
163 * processThread.flowCompleted();
164 * dispatchAddStep(processThread.getSlcProcess(), new
165 * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, "Flow " + flowName)); }
169 public List
<SlcExecutionNotifier
> getSlcExecutionNotifiers() {
170 return slcExecutionNotifiers
;
173 public ThreadGroup
getProcessesThreadGroup() {
174 return processesThreadGroup
;