]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/AbstractExecutionModulesManager.java
Refactor runtime
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / AbstractExecutionModulesManager.java
1 package org.argeo.slc.core.execution;
2
3 import java.util.ArrayList;
4 import java.util.Iterator;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.TreeMap;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.argeo.slc.SlcException;
12 import org.argeo.slc.execution.ExecutionFlow;
13 import org.argeo.slc.execution.ExecutionFlowDescriptor;
14 import org.argeo.slc.execution.ExecutionModuleDescriptor;
15 import org.argeo.slc.execution.ExecutionModulesManager;
16 import org.argeo.slc.execution.ExecutionSpec;
17 import org.argeo.slc.execution.ExecutionSpecAttribute;
18 import org.argeo.slc.process.RealizedFlow;
19 import org.argeo.slc.process.SlcExecution;
20 import org.argeo.slc.process.SlcExecutionNotifier;
21 import org.argeo.slc.process.SlcExecutionStep;
22 import org.springframework.aop.scope.ScopedObject;
23 import org.springframework.util.Assert;
24
25 public abstract class AbstractExecutionModulesManager implements
26 ExecutionModulesManager {
27 private final static Log log = LogFactory
28 .getLog(AbstractExecutionModulesManager.class);
29
30 private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
31 private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
32
33 public void process(SlcExecution slcExecution) {
34 new ProcessThread(processesThreadGroup, slcExecution).start();
35 }
36
37 protected void dispatchUpdateStatus(SlcExecution slcExecution,
38 String oldStatus, String newStatus) {
39 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
40 .iterator(); it.hasNext();) {
41 it.next().updateStatus(slcExecution, oldStatus, newStatus);
42 }
43 }
44
45 protected synchronized void dispatchAddStep(SlcExecution slcExecution,
46 SlcExecutionStep step) {
47 slcExecution.getSteps().add(step);
48 List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
49 steps.add(step);
50 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
51 .iterator(); it.hasNext();) {
52 it.next().addSteps(slcExecution, steps);
53 }
54 }
55
56 public void setSlcExecutionNotifiers(
57 List<SlcExecutionNotifier> slcExecutionNotifiers) {
58 this.slcExecutionNotifiers = slcExecutionNotifiers;
59 }
60
61 protected static ExecutionModuleDescriptor createDescriptor(
62 String moduleName, String moduleVersion,
63 Map<String, ExecutionFlow> executionFlows) {
64 // TODO: put this in a separate configurable object
65 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
66 md.setName(moduleName);
67 md.setVersion(moduleVersion);
68
69 for (String name : executionFlows.keySet()) {
70 ExecutionFlow executionFlow = executionFlows.get(name);
71
72 Assert.notNull(executionFlow.getName());
73 Assert.state(name.equals(executionFlow.getName()));
74
75 ExecutionSpec executionSpec = executionFlow.getExecutionSpec();
76 Assert.notNull(executionSpec);
77 Assert.notNull(executionSpec.getName());
78
79 Map<String, Object> values = new TreeMap<String, Object>();
80 for (String key : executionSpec.getAttributes().keySet()) {
81 ExecutionSpecAttribute attribute = executionSpec
82 .getAttributes().get(key);
83
84 if (executionFlow.isSetAsParameter(key)) {
85 Object value = executionFlow.getParameter(key);
86 if (attribute instanceof PrimitiveSpecAttribute) {
87 PrimitiveValue primitiveValue = new PrimitiveValue();
88 primitiveValue
89 .setType(((PrimitiveSpecAttribute) attribute)
90 .getType());
91 primitiveValue.setValue(value);
92 values.put(key, primitiveValue);
93 } else if (attribute instanceof RefSpecAttribute) {
94 RefValue refValue = new RefValue();
95 if (value instanceof ScopedObject) {
96 refValue.setLabel("RUNTIME "
97 + value.getClass().getName());
98 } else {
99 refValue.setLabel("STATIC "
100 + value.getClass().getName());
101 }
102 values.put(key, refValue);
103 } else {
104 throw new SlcException("Unkown spec attribute type "
105 + attribute.getClass());
106 }
107 }
108
109 }
110
111 ExecutionFlowDescriptor efd = new ExecutionFlowDescriptor(name,
112 values, executionSpec);
113 if (executionFlow.getPath() != null)
114 efd.setPath(executionFlow.getPath());
115
116 // Add execution spec if necessary
117 if (!md.getExecutionSpecs().contains(executionSpec))
118 md.getExecutionSpecs().add(executionSpec);
119
120 // Add execution flow
121 md.getExecutionFlows().add(efd);
122 }
123
124 return md;
125 }
126
127 /** Thread of the SLC Process, starting the sub executions. */
128 private class ProcessThread extends Thread {
129 private final SlcExecution slcProcess;
130 private final ThreadGroup processThreadGroup;
131 private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
132
133 public ProcessThread(ThreadGroup processesThreadGroup,
134 SlcExecution slcExecution) {
135 super(processesThreadGroup, "SLC Process #"
136 + slcExecution.getUuid());
137 this.slcProcess = slcExecution;
138 processThreadGroup = new ThreadGroup("SLC Process #"
139 + slcExecution.getUuid() + " thread group");
140 }
141
142 public void run() {
143 log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
144
145 // FIXME: hack to let the SlcExecution be registered on server
146 try {
147 Thread.sleep(500);
148 } catch (InterruptedException e1) {
149 // silent
150 }
151
152 slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
153 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
154 SlcExecution.STATUS_RUNNING);
155
156 flowsToProcess.addAll(slcProcess.getRealizedFlows());
157
158 while (flowsToProcess.size() > 0) {
159 RealizedFlow flow = flowsToProcess.remove(0);
160 ExecutionThread thread = new ExecutionThread(this, flow);
161 thread.start();
162
163 synchronized (this) {
164 try {
165 wait();
166 } catch (InterruptedException e) {
167 // silent
168 }
169 }
170 }
171
172 slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
173 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
174 SlcExecution.STATUS_FINISHED);
175 }
176
177 public synchronized void flowCompleted() {
178 notifyAll();
179 }
180
181 public SlcExecution getSlcProcess() {
182 return slcProcess;
183 }
184
185 public ThreadGroup getProcessThreadGroup() {
186 return processThreadGroup;
187 }
188 }
189
190 /** Thread of a single execution */
191 private class ExecutionThread extends Thread {
192 private final RealizedFlow realizedFlow;
193 private final ProcessThread processThread;
194
195 public ExecutionThread(ProcessThread processThread,
196 RealizedFlow realizedFlow) {
197 super(processThread.getProcessThreadGroup(), "Flow "
198 + realizedFlow.getFlowDescriptor().getName());
199 this.realizedFlow = realizedFlow;
200 this.processThread = processThread;
201 }
202
203 public void run() {
204 ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow
205 .getFlowDescriptor();
206 String flowName = executionFlowDescriptor.getName();
207
208 dispatchAddStep(processThread.getSlcProcess(),
209 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
210 "Flow " + flowName));
211
212 try {
213 execute(realizedFlow);
214 } catch (Exception e) {
215 // TODO: re-throw exception ?
216 String msg = "Execution of flow " + flowName + " failed.";
217 log.error(msg, e);
218 dispatchAddStep(processThread.getSlcProcess(),
219 new SlcExecutionStep(msg + " " + e.getMessage()));
220 } finally {
221 processThread.flowCompleted();
222 dispatchAddStep(processThread.getSlcProcess(),
223 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
224 "Flow " + flowName));
225 }
226 }
227 }
228
229 }