1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
6 import org
.argeo
.slc
.core
.execution
.internal
.ProcessThread
;
7 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
8 import org
.argeo
.slc
.process
.SlcExecution
;
9 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
11 public abstract class AbstractExecutionModulesManager
implements
12 ExecutionModulesManager
{
13 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
14 private ThreadGroup processesThreadGroup
= new ThreadGroup("Processes");
16 public void process(SlcExecution slcExecution
) {
17 new ProcessThread(this, slcExecution
).start();
20 protected void dispatchUpdateStatus(SlcExecution slcExecution,
21 String oldStatus, String newStatus) {
22 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
23 .iterator(); it.hasNext();) {
24 it.next().updateStatus(slcExecution, oldStatus, newStatus);
28 protected synchronized void dispatchAddStep(SlcExecution slcExecution,
29 SlcExecutionStep step) {
30 slcExecution.getSteps().add(step);
31 List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
33 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
34 .iterator(); it.hasNext();) {
35 it.next().addSteps(slcExecution, steps);
39 public void setSlcExecutionNotifiers(
40 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
41 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
44 protected static void addFlowsToDescriptor(ExecutionModuleDescriptor md,
45 Map<String, ExecutionFlow> executionFlows) {
46 // TODO: put this in a separate configurable object
47 for (String name : executionFlows.keySet()) {
48 ExecutionFlow executionFlow = executionFlows.get(name);
50 Assert.notNull(executionFlow.getName());
51 Assert.state(name.equals(executionFlow.getName()));
53 ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
54 Assert.notNull(executionSpec);
55 Assert.notNull(executionSpec.getName());
57 Map<String, Object> values = new TreeMap<String, Object>();
58 for (String key : executionSpec.getAttributes().keySet()) {
59 ExecutionSpecAttribute attribute = executionSpec
60 .getAttributes().get(key);
62 if (executionFlow.isSetAsParameter(key)) {
63 Object value = executionFlow.getParameter(key);
64 if (attribute instanceof PrimitiveSpecAttribute) {
65 PrimitiveValue primitiveValue = new PrimitiveValue();
67 .setType(((PrimitiveSpecAttribute) attribute)
69 primitiveValue.setValue(value);
70 values.put(key, primitiveValue);
71 } else if (attribute instanceof RefSpecAttribute) {
72 RefValue refValue = new RefValue();
73 if (value instanceof ScopedObject) {
74 refValue.setLabel("RUNTIME "
75 + value.getClass().getName());
77 refValue.setLabel("STATIC "
78 + value.getClass().getName());
80 values.put(key, refValue);
82 throw new SlcException("Unkown spec attribute type "
83 + attribute.getClass());
89 ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
90 values, executionSpec);
91 if (executionFlow.getPath() != null)
92 efd.setPath(executionFlow.getPath());
94 // Add execution spec if necessary
95 if (!md.getExecutionSpecs().contains(executionSpec))
96 md.getExecutionSpecs().add(executionSpec);
99 md.getExecutionFlows().add(efd);
104 * Thread of the SLC Process, starting the sub executions. private class
105 * ProcessThread extends Thread { private final SlcExecution slcProcess;
106 * private final ThreadGroup processThreadGroup; private final
107 * List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
109 * public ProcessThread(ThreadGroup processesThreadGroup, SlcExecution
110 * slcExecution) { super(processesThreadGroup, "SLC Process #" +
111 * slcExecution.getUuid()); this.slcProcess = slcExecution;
112 * processThreadGroup = new ThreadGroup("SLC Process #" +
113 * slcExecution.getUuid() + " thread group"); }
115 * public void run() { log.info("\n##\n## Process SLC Execution " +
116 * slcProcess + "\n##\n");
118 * slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
119 * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
120 * SlcExecution.STATUS_RUNNING);
122 * flowsToProcess.addAll(slcProcess.getRealizedFlows());
124 * while (flowsToProcess.size() > 0) { RealizedFlow flow =
125 * flowsToProcess.remove(0); ExecutionThread thread = new
126 * ExecutionThread(this, flow); thread.start();
128 * synchronized (this) { try { wait(); } catch (InterruptedException e) { //
131 * slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
132 * dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
133 * SlcExecution.STATUS_FINISHED); }
135 * public synchronized void flowCompleted() { notifyAll(); }
137 * public SlcExecution getSlcProcess() { return slcProcess; }
139 * public ThreadGroup getProcessThreadGroup() { return processThreadGroup; }
144 * Thread of a single execution private class ExecutionThread extends Thread
145 * { private final RealizedFlow realizedFlow; private final ProcessThread
148 * public ExecutionThread(ProcessThread processThread, RealizedFlow
149 * realizedFlow) { super(processThread.getProcessThreadGroup(), "Flow " +
150 * realizedFlow.getFlowDescriptor().getName()); this.realizedFlow =
151 * realizedFlow; this.processThread = processThread; }
153 * public void run() { ExecutionFlowDescriptor executionFlowDescriptor =
154 * realizedFlow .getFlowDescriptor(); String flowName =
155 * executionFlowDescriptor.getName();
157 * dispatchAddStep(processThread.getSlcProcess(), new
158 * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START, "Flow " + flowName));
160 * try { execute(realizedFlow); } catch (Exception e) { // TODO: re-throw
161 * exception ? String msg = "Execution of flow " + flowName + " failed.";
162 * log.error(msg, e); dispatchAddStep(processThread.getSlcProcess(), new
163 * SlcExecutionStep(msg + " " + e.getMessage())); } finally {
164 * processThread.flowCompleted();
165 * dispatchAddStep(processThread.getSlcProcess(), new
166 * SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END, "Flow " + flowName)); }
170 public List
<SlcExecutionNotifier
> getSlcExecutionNotifiers() {
171 return slcExecutionNotifiers
;
174 public ThreadGroup
getProcessesThreadGroup() {
175 return processesThreadGroup
;