git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/core/execution/DefaultModulesManager.java
Introduce revision build numbers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.simple / src / main / java / org / argeo / slc / core / execution / DefaultModulesManager.java
1 package org.argeo.slc.core.execution;
2
3 import java.util.ArrayList;
4 import java.util.Iterator;
5 import java.util.List;
6
7 import org.apache.commons.logging.Log;
8 import org.apache.commons.logging.LogFactory;
9 import org.argeo.slc.SlcException;
10 import org.argeo.slc.execution.ExecutionFlowDescriptor;
11 import org.argeo.slc.execution.ExecutionModule;
12 import org.argeo.slc.execution.ExecutionModuleDescriptor;
13 import org.argeo.slc.execution.ExecutionModulesManager;
14 import org.argeo.slc.process.RealizedFlow;
15 import org.argeo.slc.process.SlcExecution;
16 import org.argeo.slc.process.SlcExecutionNotifier;
17 import org.argeo.slc.process.SlcExecutionStep;
18
19 public class DefaultModulesManager implements ExecutionModulesManager {
20 private final static Log log = LogFactory
21 .getLog(DefaultModulesManager.class);
22
23 private List<ExecutionModule> executionModules = new ArrayList<ExecutionModule>();
24 private List<SlcExecutionNotifier> slcExecutionNotifiers = new ArrayList<SlcExecutionNotifier>();
25 private ThreadGroup processesThreadGroup = new ThreadGroup("Processes");
26
27 protected ExecutionModule getExecutionModule(String moduleName,
28 String version) {
29 for (ExecutionModule moduleT : executionModules) {
30 if (moduleT.getName().equals(moduleName)) {
31 if (moduleT.getVersion().equals(version)) {
32 return moduleT;
33 }
34 }
35 }
36 return null;
37 }
38
39 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
40 String moduleName, String version) {
41 ExecutionModule module = getExecutionModule(moduleName, version);
42
43 if (module == null)
44 throw new SlcException("Module " + moduleName + " (" + version
45 + ") not found");
46
47 return module.getDescriptor();
48 }
49
50 public List<ExecutionModule> listExecutionModules() {
51 return executionModules;
52 }
53
54 public void setExecutionModules(List<ExecutionModule> executionModules) {
55 this.executionModules = executionModules;
56 }
57
58 public void process(SlcExecution slcExecution) {
59 new ProcessThread(processesThreadGroup, slcExecution).start();
60 }
61
62 protected void dispatchUpdateStatus(SlcExecution slcExecution,
63 String oldStatus, String newStatus) {
64 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
65 .iterator(); it.hasNext();) {
66 it.next().updateStatus(slcExecution, oldStatus, newStatus);
67 }
68 }
69
70 protected synchronized void dispatchAddStep(SlcExecution slcExecution,
71 SlcExecutionStep step) {
72 slcExecution.getSteps().add(step);
73 List<SlcExecutionStep> steps = new ArrayList<SlcExecutionStep>();
74 steps.add(step);
75 for (Iterator<SlcExecutionNotifier> it = slcExecutionNotifiers
76 .iterator(); it.hasNext();) {
77 it.next().addSteps(slcExecution, steps);
78 }
79 }
80
81 public void setSlcExecutionNotifiers(
82 List<SlcExecutionNotifier> slcExecutionNotifiers) {
83 this.slcExecutionNotifiers = slcExecutionNotifiers;
84 }
85
86 /** Thread of the SLC Process, starting the sub executions. */
87 private class ProcessThread extends Thread {
88 private final SlcExecution slcProcess;
89 private final ThreadGroup processThreadGroup;
90 private final List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
91
92 public ProcessThread(ThreadGroup processesThreadGroup,
93 SlcExecution slcExecution) {
94 super(processesThreadGroup, "SLC Process #"
95 + slcExecution.getUuid());
96 this.slcProcess = slcExecution;
97 processThreadGroup = new ThreadGroup("SLC Process #"
98 + slcExecution.getUuid() + " thread group");
99 }
100
101 public void run() {
102 log.info("\n##\n## Process SLC Execution " + slcProcess + "\n##\n");
103
104 // FIXME: hack to let the SlcExecution be registered on server
105 try {
106 Thread.sleep(500);
107 } catch (InterruptedException e1) {
108 // silent
109 }
110
111 slcProcess.setStatus(SlcExecution.STATUS_RUNNING);
112 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_SCHEDULED,
113 SlcExecution.STATUS_RUNNING);
114
115 flowsToProcess.addAll(slcProcess.getRealizedFlows());
116
117 while (flowsToProcess.size() > 0) {
118 RealizedFlow flow = flowsToProcess.remove(0);
119 ExecutionModule module = getExecutionModule(flow
120 .getModuleName(), flow.getModuleVersion());
121 if (module != null) {
122 ExecutionThread thread = new ExecutionThread(this, flow
123 .getFlowDescriptor(), module);
124 thread.start();
125 } else {
126 throw new SlcException("ExecutionModule "
127 + flow.getModuleName() + ", version "
128 + flow.getModuleVersion() + " not found.");
129 }
130
131 synchronized (this) {
132 try {
133 wait();
134 } catch (InterruptedException e) {
135 // silent
136 }
137 }
138 }
139
140 slcProcess.setStatus(SlcExecution.STATUS_FINISHED);
141 dispatchUpdateStatus(slcProcess, SlcExecution.STATUS_RUNNING,
142 SlcExecution.STATUS_FINISHED);
143 }
144
145 public synchronized void flowCompleted() {
146 notifyAll();
147 }
148
149 public SlcExecution getSlcProcess() {
150 return slcProcess;
151 }
152
153 public ThreadGroup getProcessThreadGroup() {
154 return processThreadGroup;
155 }
156 }
157
158 /** Thread of a single execution */
159 private class ExecutionThread extends Thread {
160 private final ExecutionFlowDescriptor executionFlowDescriptor;
161 private final ExecutionModule executionModule;
162 private final ProcessThread processThread;
163
164 public ExecutionThread(ProcessThread processThread,
165 ExecutionFlowDescriptor executionFlowDescriptor,
166 ExecutionModule executionModule) {
167 super(processThread.getProcessThreadGroup(), "Flow "
168 + executionFlowDescriptor.getName());
169 this.executionFlowDescriptor = executionFlowDescriptor;
170 this.executionModule = executionModule;
171 this.processThread = processThread;
172 }
173
174 public void run() {
175 dispatchAddStep(processThread.getSlcProcess(),
176 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_START,
177 "Flow " + executionFlowDescriptor.getName()));
178
179 try {
180 executionModule.execute(executionFlowDescriptor);
181 } catch (Exception e) {
182 // TODO: re-throw exception ?
183 String msg = "Execution of flow "
184 + executionFlowDescriptor.getName() + " failed.";
185 log.error(msg, e);
186 dispatchAddStep(processThread.getSlcProcess(),
187 new SlcExecutionStep(msg + " " + e.getMessage()));
188 } finally {
189 processThread.flowCompleted();
190 dispatchAddStep(processThread.getSlcProcess(),
191 new SlcExecutionStep(SlcExecutionStep.TYPE_PHASE_END,
192 "Flow " + executionFlowDescriptor.getName()));
193 }
194 }
195 }
196 }