1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
4 import java
.util
.Iterator
;
7 import org
.apache
.commons
.logging
.Log
;
8 import org
.apache
.commons
.logging
.LogFactory
;
9 import org
.argeo
.slc
.SlcException
;
10 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
11 import org
.argeo
.slc
.execution
.ExecutionModule
;
12 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
13 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
14 import org
.argeo
.slc
.process
.RealizedFlow
;
15 import org
.argeo
.slc
.process
.SlcExecution
;
16 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
17 import org
.springframework
.util
.Assert
;
19 public class DefaultModulesManager
implements ExecutionModulesManager
{
20 private final static Log log
= LogFactory
21 .getLog(DefaultModulesManager
.class);
23 private List
<ExecutionModule
> executionModules
= new ArrayList
<ExecutionModule
>();
24 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
// Looks up an execution module among executionModules: a candidate matches when
// both its getName() and its getVersion() are equal (equals) to the arguments.
//
// NOTE(review): the end of the parameter list and the statements that follow the
// version check are not visible in this view. The callers in this file pass a
// module name and a version and compare the result with null, so presumably the
// matching module is returned and null is the "not found" result -- confirm
// against the complete source before relying on this.
26 protected ExecutionModule
getExecutionModule(String moduleName
,
// Linear scan over the configured modules.
28 for (ExecutionModule moduleT
: executionModules
) {
// The name is compared first, the version only for modules whose name matches.
29 if (moduleT
.getName().equals(moduleName
)) {
30 if (moduleT
.getVersion().equals(version
)) {
// Returns the descriptor (getDescriptor()) of the module resolved through
// getExecutionModule(moduleName, version).
// Throws SlcException with the message "Module <name> (<version>) not found".
38 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
39 String moduleName
, String version
) {
40 ExecutionModule module
= getExecutionModule(moduleName
, version
);
// NOTE(review): the condition guarding this throw is not visible in this view;
// presumably it is a null check on 'module' -- confirm against the complete source.
43 throw new SlcException("Module "+moduleName
+" ("+version
+") not found");
45 return module
.getDescriptor();
48 public List
<ExecutionModule
> listExecutionModules() {
49 return executionModules
;
52 public void setExecutionModules(List
<ExecutionModule
> executionModules
) {
53 this.executionModules
= executionModules
;
56 public void process(SlcExecution slcExecution
) {
57 new ProcessThread(slcExecution
).start();
60 protected void dispatchUpdateStatus(SlcExecution slcExecution
,
61 String oldStatus
, String newStatus
) {
62 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
63 .iterator(); it
.hasNext();) {
64 it
.next().updateStatus(slcExecution
, oldStatus
, newStatus
);
68 public void setSlcExecutionNotifiers(
69 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
70 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
// Thread created by process(...) for one SlcExecution. The visible code marks the
// execution as running, notifies the registered notifiers, then takes the realized
// flows one at a time and creates an ExecutionThread for each of them.
// NOTE(review): several lines of this class (method header, try blocks, the body of
// flowCompleted) are not visible in this view; comments below only describe what is
// shown and mark the rest as assumptions.
73 private class ProcessThread
extends Thread
{
// Execution handled by this thread.
74 private final SlcExecution slcExecution
;
// Work queue of flows still to run; filled from slcExecution.getRealizedFlows().
75 private final List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
77 public ProcessThread(SlcExecution slcExecution
) {
78 this.slcExecution
= slcExecution
;
82 log
.info("\n##\n## Process SLC Execution " + slcExecution
85 // FIXME: hack to let the SlcExecution be registered on server
88 } catch (InterruptedException e1
) {
// Status moves from SCHEDULED to RUNNING and the notifiers are told about it.
92 slcExecution
.setStatus(SlcExecution
.STATUS_RUNNING
);
93 dispatchUpdateStatus(slcExecution
, SlcExecution
.STATUS_SCHEDULED
,
94 SlcExecution
.STATUS_RUNNING
);
96 flowsToProcess
.addAll(slcExecution
.getRealizedFlows());
// Flows are consumed from the head of the list, one per loop iteration.
98 while (flowsToProcess
.size() > 0) {
99 RealizedFlow flow
= flowsToProcess
.remove(0);
100 ExecutionModule module
= getExecutionModule(flow
101 .getModuleName(), flow
.getModuleVersion());
102 if (module
!= null) {
// This thread is passed to the ExecutionThread, which calls flowCompleted() on it.
// Presumably the new thread is started on a line not visible here -- confirm.
103 ExecutionThread thread
= new ExecutionThread(this, flow
104 .getFlowDescriptor(), module
);
// An unresolved module aborts processing with an SlcException.
107 throw new SlcException("ExecutionModule "
108 + flow
.getModuleName() + ", version "
109 + flow
.getModuleVersion() + " not found.");
// Presumably this block waits on this thread's monitor until flowCompleted()
// is called, so that flows run sequentially -- the wait call itself is not visible.
112 synchronized (this) {
115 } catch (InterruptedException e
) {
// NOTE(review): the status is set to STATUS_RUNNING here although the transition
// dispatched right after is RUNNING -> FINISHED; this looks like it should be
// STATUS_FINISHED -- confirm and fix in the complete source.
121 slcExecution
.setStatus(SlcExecution
.STATUS_RUNNING
);
122 dispatchUpdateStatus(slcExecution
, SlcExecution
.STATUS_RUNNING
,
123 SlcExecution
.STATUS_FINISHED
);
// The lines below appear to be the remains of a commented-out earlier
// implementation that started one ExecutionThread per flow without queuing.
125 * for (RealizedFlow flow : slcExecution.getRealizedFlows()) {
126 * ExecutionModule module = getExecutionModule(flow.getModuleName(),
127 * flow.getModuleVersion()); if (module != null) { ExecutionThread
128 * thread = new ExecutionThread(flow .getFlowDescriptor(), module);
129 * thread.start(); } else { throw new
130 * SlcException("ExecutionModule " + flow.getModuleName() +
131 * ", version " + flow.getModuleVersion() + " not found."); } }
// Called by ExecutionThread when its flow is done. Synchronized on this thread;
// the body is not visible here -- presumably it wakes up the waiting loop above.
135 public synchronized void flowCompleted() {
// Thread that runs one flow: it calls executionModule.execute(executionFlowDescriptor),
// logs any Exception as an error, and reports back through processThread.flowCompleted().
// NOTE(review): the method header and the opening of the try block are not visible in
// this view.
140 private class ExecutionThread
extends Thread
{
// Descriptor of the flow to execute.
141 private final ExecutionFlowDescriptor executionFlowDescriptor
;
// Module asked to execute the flow.
142 private final ExecutionModule executionModule
;
// Owning ProcessThread, notified when this flow is done.
143 private final ProcessThread processThread
;
145 public ExecutionThread(ProcessThread processThread
,
146 ExecutionFlowDescriptor executionFlowDescriptor
,
147 ExecutionModule executionModule
) {
// The thread name is the constant "SLC Execution #": the uuid suffix is commented
// out, so every ExecutionThread gets the same name.
148 super("SLC Execution #" /* + executionContext.getUuid() */);
149 this.executionFlowDescriptor
= executionFlowDescriptor
;
150 this.executionModule
= executionModule
;
151 this.processThread
= processThread
;
156 executionModule
.execute(executionFlowDescriptor
);
// Any Exception from the module is caught and logged, not re-thrown (see the TODO).
157 } catch (Exception e
) {
158 // TODO: re-throw exception ?
159 log
.error("Execution "/* + executionContext.getUuid() */
// Signals the owning ProcessThread that this flow is done.
// NOTE(review): whether this call sits in a finally block or simply after the catch
// is not visible here -- confirm.
162 processThread
.flowCompleted();