git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java
Use ObjectList everywhere
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / AbstractExecutionModulesManager.java
1 package org.argeo.slc.core.execution;
2
3 import java.util.ArrayList;
4 import java.util.Iterator;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.TreeMap;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.argeo.slc.SlcException;
12 import org.argeo.slc.execution.ExecutionFlow;
13 import org.argeo.slc.execution.ExecutionFlowDescriptor;
14 import org.argeo.slc.execution.ExecutionModuleDescriptor;
15 import org.argeo.slc.execution.ExecutionModulesManager;
16 import org.argeo.slc.execution.ExecutionSpec;
17 import org.argeo.slc.execution.ExecutionSpecAttribute;
18 import org.argeo.slc.process.RealizedFlow;
19 import org.argeo.slc.process.SlcExecution;
20 import org.argeo.slc.process.SlcExecutionNotifier;
21 import org.argeo.slc.process.SlcExecutionStep;
22 import org.springframework.aop.scope.ScopedObject;
23 import org.springframework.util.Assert;
24
25 public abstract class AbstractExecutionModulesManager implements
26 ExecutionModulesManager {
27 private final static Log log = LogFactory
28 .getLog(AbstractExecutionModulesManager.class);
29
30 private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
31 private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
32
33 public void process(SlcExecution slcExecution) {
34 new ProcessThread(processesThreadGroup, slcExecution).start();
35 }
36
37 protected void dispatchUpdateStatus(SlcExecution slcExecution,
38 String oldStatus, String newStatus) {
39 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
40 .iterator(); it.hasNext();) {
41 it.next().updateStatus(slcExecution, oldStatus, newStatus);
42 }
43 }
44
45 protected synchronized void dispatchAddStep(SlcExecution slcExecution,
46 SlcExecutionStep step) {
47 slcExecution.getSteps().add(step);
48 List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
49 steps.add(step);
50 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
51 .iterator(); it.hasNext();) {
52 it.next().addSteps(slcExecution, steps);
53 }
54 }
55
56 public void setSlcExecutionNotifiers(
57 List<SlcExecutionNotifier> slcExecutionNotifiers) {
58 this.slcExecutionNotifiers = slcExecutionNotifiers;
59 }
60
61 protected static ExecutionModuleDescriptor createDescriptor(
62 String moduleName, String moduleVersion,
63 Map<String, ExecutionFlow> executionFlows) {
64 // TODO: put this in a separate configurable object
65 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
66 md.setName(moduleName);
67 md.setVersion(moduleVersion);
68
69 for (String name : executionFlows.keySet()) {
70 ExecutionFlow executionFlow = executionFlows.get(name);
71
72 Assert.notNull(executionFlow.getName());
73 Assert.state(name.equals(executionFlow.getName()));
74
75 ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
76 Assert.notNull(executionSpec);
77 Assert.notNull(executionSpec.getName());
78
79 Map<String, Object> values = new TreeMap<String, Object>();
80 for (String key : executionSpec.getAttributes().keySet()) {
81 ExecutionSpecAttribute attribute = executionSpec
82 .getAttributes().get(key);
83
84 if (executionFlow.isSetAsParameter(key)) {
85 Object value = executionFlow.getParameter(key);
86 if (attribute instanceof PrimitiveSpecAttribute) {
87 PrimitiveValue primitiveValue = new PrimitiveValue();
88 primitiveValue
89 .setType(((PrimitiveSpecAttribute) attribute)
90 .getType());
91 primitiveValue.setValue(value);
92 values.put(key, primitiveValue);
93 } else if (attribute instanceof RefSpecAttribute) {
94 RefValue refValue = new RefValue();
95 if (value instanceof ScopedObject) {
96 refValue.setLabel("RUNTIME "
97 + value.getClass().getName());
98 } else {
99 refValue.setLabel("STATIC "
100 + value.getClass().getName());
101 }
102 values.put(key, refValue);
103 } else {
104 throw new SlcException("Unkown spec attribute type "
105 + attribute.getClass());
106 }
107 }
108
109 }
110
111 ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
112 values, executionSpec);
113 if (executionFlow.getPath() != null)
114 efd.setPath(executionFlow.getPath());
115
116 // Add execution spec if necessary
117 if (!md.getExecutionSpecs().contains(executionSpec))
118 md.getExecutionSpecs().add(executionSpec);
119
120 // Add execution flow
121 md.getExecutionFlows().add(efd);
122 }
123
124 return md;
125 }
126
127 /** Thread of the SLC Process, starting the sub executions. */
128 private class ProcessThread extends Thread {
129 private final SlcExecution slcProcess;
130 private final ThreadGroup processThreadGroup;
131 private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
132
133 public ProcessThread(ThreadGroup processesThreadGroup,
134 SlcExecution slcExecution) {
135 super(processesThreadGroup, "SLC Process #"
136 + slcExecution.getUuid());
137 this.slcProcess = slcExecution;
138 processThreadGroup = new ThreadGroup("SLC Process #"
139 + slcExecution.getUuid() + " thread group");
140 }
141
142 public void run() {
143 log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
144
145 slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
146 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
147 SlcExecution.STATUS_RUNNING);
148
149 flowsToProcess.addAll(slcProcess.getRealizedFlows());
150
151 while (flowsToProcess.size() > 0) {
152 RealizedFlow flow = flowsToProcess.remove(0);
153 ExecutionThread thread = new ExecutionThread(this, flow);
154 thread.start();
155
156 synchronized (this) {
157 try {
158 wait();
159 } catch (InterruptedException e) {
160 // silent
161 }
162 }
163 }
164
165 slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
166 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
167 SlcExecution.STATUS_FINISHED);
168 }
169
170 public synchronized void flowCompleted() {
171 notifyAll();
172 }
173
174 public SlcExecution getSlcProcess() {
175 return slcProcess;
176 }
177
178 public ThreadGroup getProcessThreadGroup() {
179 return processThreadGroup;
180 }
181 }
182
183 /** Thread of a single execution */
184 private class ExecutionThread extends Thread {
185 private final RealizedFlow realizedFlow;
186 private final ProcessThread processThread;
187
188 public ExecutionThread(ProcessThread processThread,
189 RealizedFlow realizedFlow) {
190 super(processThread.getProcessThreadGroup(), "Flow "
191 + realizedFlow.getFlowDescriptor().getName());
192 this.realizedFlow = realizedFlow;
193 this.processThread = processThread;
194 }
195
196 public void run() {
197 ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow
198 .getFlowDescriptor();
199 String flowName = executionFlowDescriptor.getName();
200
201 dispatchAddStep(processThread.getSlcProcess(),
202 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
203 "Flow " + flowName));
204
205 try {
206 execute(realizedFlow);
207 } catch (Exception e) {
208 // TODO: re-throw exception ?
209 String msg = "Execution of flow " + flowName + " failed.";
210 log.error(msg, e);
211 dispatchAddStep(processThread.getSlcProcess(),
212 new SlcExecutionStep(msg + " " + e.getMessage()));
213 } finally {
214 processThread.flowCompleted();
215 dispatchAddStep(processThread.getSlcProcess(),
216 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
217 "Flow " + flowName));
218 }
219 }
220 }
221
222 }