1 package org
.argeo
.slc
.core
.execution
;
3 import java
.util
.ArrayList
;
4 import java
.util
.Iterator
;
7 import org
.apache
.commons
.logging
.Log
;
8 import org
.apache
.commons
.logging
.LogFactory
;
9 import org
.argeo
.slc
.SlcException
;
10 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
11 import org
.argeo
.slc
.execution
.ExecutionModule
;
12 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
13 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
14 import org
.argeo
.slc
.process
.RealizedFlow
;
15 import org
.argeo
.slc
.process
.SlcExecution
;
16 import org
.argeo
.slc
.process
.SlcExecutionNotifier
;
17 import org
.springframework
.util
.Assert
;
19 public class DefaultModulesManager
implements ExecutionModulesManager
{
20 private final static Log log
= LogFactory
21 .getLog(DefaultModulesManager
.class);
23 private List
<ExecutionModule
> executionModules
= new ArrayList
<ExecutionModule
>();
24 private List
<SlcExecutionNotifier
> slcExecutionNotifiers
= new ArrayList
<SlcExecutionNotifier
>();
26 protected ExecutionModule
getExecutionModule(String moduleName
,
28 for (ExecutionModule moduleT
: executionModules
) {
29 if (moduleT
.getName().equals(moduleName
)) {
30 if (moduleT
.getVersion().equals(version
)) {
38 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
39 String moduleName
, String version
) {
40 ExecutionModule module
= getExecutionModule(moduleName
, version
);
42 Assert
.notNull(module
);
44 return module
.getDescriptor();
47 public List
<ExecutionModule
> listExecutionModules() {
48 return executionModules
;
51 public void setExecutionModules(List
<ExecutionModule
> executionModules
) {
52 this.executionModules
= executionModules
;
55 public void process(SlcExecution slcExecution
) {
56 new ProcessThread(slcExecution
).start();
59 protected void dispatchUpdateStatus(SlcExecution slcExecution
,
60 String oldStatus
, String newStatus
) {
61 for (Iterator
<SlcExecutionNotifier
> it
= slcExecutionNotifiers
62 .iterator(); it
.hasNext();) {
63 it
.next().updateStatus(slcExecution
, oldStatus
, newStatus
);
67 public void setSlcExecutionNotifiers(
68 List
<SlcExecutionNotifier
> slcExecutionNotifiers
) {
69 this.slcExecutionNotifiers
= slcExecutionNotifiers
;
72 private class ProcessThread
extends Thread
{
73 private final SlcExecution slcExecution
;
74 private final List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
76 public ProcessThread(SlcExecution slcExecution
) {
77 this.slcExecution
= slcExecution
;
81 log
.info("\n##\n## Process SLC Execution " + slcExecution
84 // FIXME: hack to let the SlcExecution be registered on server
87 } catch (InterruptedException e1
) {
91 slcExecution
.setStatus(SlcExecution
.STATUS_RUNNING
);
92 dispatchUpdateStatus(slcExecution
, SlcExecution
.STATUS_SCHEDULED
,
93 SlcExecution
.STATUS_RUNNING
);
95 flowsToProcess
.addAll(slcExecution
.getRealizedFlows());
97 while (flowsToProcess
.size() > 0) {
98 RealizedFlow flow
= flowsToProcess
.remove(0);
99 ExecutionModule module
= getExecutionModule(flow
100 .getModuleName(), flow
.getModuleVersion());
101 if (module
!= null) {
102 ExecutionThread thread
= new ExecutionThread(this, flow
103 .getFlowDescriptor(), module
);
106 throw new SlcException("ExecutionModule "
107 + flow
.getModuleName() + ", version "
108 + flow
.getModuleVersion() + " not found.");
111 synchronized (this) {
114 } catch (InterruptedException e
) {
120 slcExecution
.setStatus(SlcExecution
.STATUS_RUNNING
);
121 dispatchUpdateStatus(slcExecution
, SlcExecution
.STATUS_RUNNING
,
122 SlcExecution
.STATUS_FINISHED
);
124 * for (RealizedFlow flow : slcExecution.getRealizedFlows()) {
125 * ExecutionModule module = getExecutionModule(flow.getModuleName(),
126 * flow.getModuleVersion()); if (module != null) { ExecutionThread
127 * thread = new ExecutionThread(flow .getFlowDescriptor(), module);
128 * thread.start(); } else { throw new
129 * SlcException("ExecutionModule " + flow.getModuleName() +
130 * ", version " + flow.getModuleVersion() + " not found."); } }
134 public synchronized void flowCompleted() {
139 private class ExecutionThread
extends Thread
{
140 private final ExecutionFlowDescriptor executionFlowDescriptor
;
141 private final ExecutionModule executionModule
;
142 private final ProcessThread processThread
;
/**
 * Creates a thread bound to one flow of one module.
 *
 * @param processThread
 *            the process thread that is told when this flow completes
 * @param executionFlowDescriptor
 *            descriptor passed to the module's {@code execute} call
 * @param executionModule
 *            the module that executes the flow
 */
public ExecutionThread(ProcessThread processThread,
        ExecutionFlowDescriptor executionFlowDescriptor,
        ExecutionModule executionModule) {
    super("SLC Execution #" /* + executionContext.getUuid() */);
    this.processThread = processThread;
    this.executionModule = executionModule;
    this.executionFlowDescriptor = executionFlowDescriptor;
}
155 executionModule
.execute(executionFlowDescriptor
);
156 } catch (Exception e
) {
157 // TODO: re-throw exception ?
158 log
.error("Execution "/* + executionContext.getUuid() */
161 processThread
.flowCompleted();