1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
4 import java
.util
.Iterator
;
7 import org
.apache
.commons
.logging
.Log
;
8 import org
.apache
.commons
.logging
.LogFactory
;
9 import org
.argeo
.slc
.SlcException
;
10 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
11 import org
.argeo
.slc
.execution
.ExecutionModule
;
12 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
13 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
14 import org
.argeo
.slc
.process
.RealizedFlow
;
15 import org
.argeo
.slc
.process
.SlcExecution
;
16 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
17 import org
.argeo
.slc
.process
.SlcExecutionStep
;
/**
 * {@link ExecutionModulesManager} implementation that keeps its execution
 * modules and its execution notifiers in plain in-memory lists and runs each
 * submitted SLC execution in a dedicated thread.
 */
19 public class DefaultModulesManager
implements ExecutionModulesManager
{
/** Commons-logging logger of this class. */
20 private final static Log log
= LogFactory
21 .getLog(DefaultModulesManager
.class);
/** Modules searched by name and version; starts empty, replaceable via the setter. */
23 private List
<ExecutionModule
> executionModules
= new ArrayList
<ExecutionModule
>();
/** Notifiers informed about status changes and added steps; starts empty. */
24 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
/** Thread group named "Processes" that every process thread is created in. */
25 private ThreadGroup processesThreadGroup
= new ThreadGroup("Processes");
/**
 * Scans {@code executionModules} for a module whose name equals
 * {@code moduleName} and whose version equals the requested version.
 * NOTE(review): the rest of the signature and the return statements are not
 * visible in this extract. A caller in this file null-checks the result, so
 * a miss presumably yields {@code null} - confirm.
 */
27 protected ExecutionModule
getExecutionModule(String moduleName
,
29 for (ExecutionModule moduleT
: executionModules
) {
30 if (moduleT
.getName().equals(moduleName
)) {
31 if (moduleT
.getVersion().equals(version
)) {
/**
 * Resolves the module for the given name and version through
 * {@code getExecutionModule} and returns its {@code getDescriptor()} value.
 * NOTE(review): an {@code SlcException} whose message starts with
 * "Module " followed by the name and version is thrown on a path whose
 * guarding condition is not visible in this extract - presumably when no
 * module matches; confirm.
 */
39 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
40 String moduleName
, String version
) {
41 ExecutionModule module
= getExecutionModule(moduleName
, version
);
44 throw new SlcException("Module " + moduleName
+ " (" + version
47 return module
.getDescriptor();
50 public List
<ExecutionModule
> listExecutionModules() {
51 return executionModules
;
54 public void setExecutionModules(List
<ExecutionModule
> executionModules
) {
55 this.executionModules
= executionModules
;
58 public void process(SlcExecution slcExecution
) {
59 new ProcessThread(processesThreadGroup
, slcExecution
).start();
62 protected void dispatchUpdateStatus(SlcExecution slcExecution
,
63 String oldStatus
, String newStatus
) {
64 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
65 .iterator(); it
.hasNext();) {
66 it
.next().updateStatus(slcExecution
, oldStatus
, newStatus
);
70 protected synchronized void dispatchAddStep(SlcExecution slcExecution
,
71 SlcExecutionStep step
) {
72 slcExecution
.getSteps().add(step
);
73 List
<SlcExecutionStep
> steps
= new ArrayList
<SlcExecutionStep
>();
75 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
76 .iterator(); it
.hasNext();) {
77 it
.next().addSteps(slcExecution
, steps
);
81 public void setSlcExecutionNotifiers(
82 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
83 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
86 /** Thread of the SLC Process, starting the sub executions. */
87 private class ProcessThread
extends Thread
{
/** The SLC execution this thread processes. */
88 private final SlcExecution slcProcess
;
/** Thread group handed to the execution threads created for this process. */
89 private final ThreadGroup processThreadGroup
;
/** Queue of flows still to be started; filled from the realized flows of the execution. */
90 private final List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
92 public ProcessThread(ThreadGroup processesThreadGroup
,
93 SlcExecution slcExecution
) {
94 super(processesThreadGroup
, "SLC Process #"
95 + slcExecution
.getUuid());
96 this.slcProcess
= slcExecution
;
97 processThreadGroup
= new ThreadGroup("SLC Process #"
98 + slcExecution
.getUuid() + " thread group");
// NOTE(review): the method signature and several statements of this body are
// not visible in this extract; the comments below describe only what is shown.
// Announce the start of the process in the log.
102 log
.info("\n##\n## Process SLC Execution " + slcProcess
+ "\n##\n");
104 // FIXME: hack to let the SlcExecution be registered on server
107 } catch (InterruptedException e1
) {
// Mark the execution as running and report the SCHEDULED -> RUNNING
// transition to the notifiers.
111 slcProcess
.setStatus(SlcExecution
.STATUS_RUNNING
);
112 dispatchUpdateStatus(slcProcess
, SlcExecution
.STATUS_SCHEDULED
,
113 SlcExecution
.STATUS_RUNNING
);
// Queue every realized flow of the execution.
115 flowsToProcess
.addAll(slcProcess
.getRealizedFlows());
// Take flows off the head of the queue one at a time and resolve the module
// each one targets by name and version.
117 while (flowsToProcess
.size() > 0) {
118 RealizedFlow flow
= flowsToProcess
.remove(0);
119 ExecutionModule module
= getExecutionModule(flow
120 .getModuleName(), flow
.getModuleVersion());
// A resolved module gets an ExecutionThread bound to this process thread and
// to the flow's descriptor.
121 if (module
!= null) {
122 ExecutionThread thread
= new ExecutionThread(this, flow
123 .getFlowDescriptor(), module
);
// NOTE(review): the branch structure around this throw is elided here;
// presumably it is the else-branch for an unresolved module - confirm.
126 throw new SlcException("ExecutionModule "
127 + flow
.getModuleName() + ", version "
128 + flow
.getModuleVersion() + " not found.");
// NOTE(review): the body of this synchronized block is not visible.
// flowCompleted() synchronizes on the same instance, so this presumably
// waits for the running flow to finish - confirm.
131 synchronized (this) {
134 } catch (InterruptedException e
) {
// Mark the execution as finished and report the RUNNING -> FINISHED
// transition to the notifiers.
140 slcProcess
.setStatus(SlcExecution
.STATUS_FINISHED
);
141 dispatchUpdateStatus(slcProcess
, SlcExecution
.STATUS_RUNNING
,
142 SlcExecution
.STATUS_FINISHED
);
// Synchronized on this ProcessThread instance and called by an
// ExecutionThread once its flow has run.
// NOTE(review): the body is not visible in this extract.
145 public synchronized void flowCompleted() {
// NOTE(review): the body is not visible in this extract; presumably returns
// the slcProcess field - confirm.
149 public SlcExecution
getSlcProcess() {
153 public ThreadGroup
getProcessThreadGroup() {
154 return processThreadGroup
;
158 /** Thread of a single execution */
159 private class ExecutionThread
extends Thread
{
/** Descriptor of the flow this thread executes. */
160 private final ExecutionFlowDescriptor executionFlowDescriptor
;
/** Module asked to execute the flow descriptor. */
161 private final ExecutionModule executionModule
;
/** Process thread that created this thread and is told when the flow completes. */
162 private final ProcessThread processThread
;
/**
 * Creates the thread inside the thread group of the owning process, named
 * "Flow " followed by the name of the flow descriptor.
 *
 * @param processThread the process thread this execution belongs to
 * @param executionFlowDescriptor the descriptor of the flow to execute
 * @param executionModule the module that will execute the flow
 */
public ExecutionThread(ProcessThread processThread,
        ExecutionFlowDescriptor executionFlowDescriptor,
        ExecutionModule executionModule) {
    super(processThread.getProcessThreadGroup(),
            "Flow " + executionFlowDescriptor.getName());
    this.processThread = processThread;
    this.executionModule = executionModule;
    this.executionFlowDescriptor = executionFlowDescriptor;
}
// NOTE(review): the method signature and the try/finally structure of this
// body are not visible in this extract; the comments below describe only
// what is shown.
// Record a phase-start step named after the flow.
175 dispatchAddStep(processThread
.getSlcProcess(),
176 new SlcExecutionStep(SlcExecutionStep
.TYPE_PHASE_START
,
177 "Flow " + executionFlowDescriptor
.getName()));
// Hand the flow descriptor to the module for execution.
180 executionModule
.execute(executionFlowDescriptor
);
181 } catch (Exception e
) {
182 // TODO: re-throw exception ?
183 String msg
= "Execution of flow "
184 + executionFlowDescriptor
.getName() + " failed.";
// Report the failure as a step whose text is the message above followed by
// the exception's own message.
186 dispatchAddStep(processThread
.getSlcProcess(),
187 new SlcExecutionStep(msg
+ " " + e
.getMessage()));
// Tell the owning process thread that this flow is over, then record the
// phase-end step named after the flow.
189 processThread
.flowCompleted();
190 dispatchAddStep(processThread
.getSlcProcess(),
191 new SlcExecutionStep(SlcExecutionStep
.TYPE_PHASE_END
,
192 "Flow " + executionFlowDescriptor
.getName()));