]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Use commons 0.3.3
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.core.execution;
18
19 import java.util.ArrayList;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.SlcException;
27 import org.argeo.slc.execution.ExecutionModulesManager;
28 import org.argeo.slc.execution.ExecutionProcess;
29 import org.argeo.slc.process.RealizedFlow;
30 import org.argeo.slc.process.SlcExecution;
31
32 /** Thread of the SLC Process, starting the sub executions. */
33 public class ProcessThread extends Thread {
34 private final static Log log = LogFactory.getLog(ProcessThread.class);
35
36 private final ExecutionModulesManager executionModulesManager;
37 private final ExecutionProcess process;
38 private final ProcessThreadGroup processThreadGroup;
39
40 private Set<ExecutionThread> executionThreads = new HashSet<ExecutionThread>();
41
42 private Boolean hadAnError = false;
43
44 public ProcessThread(ExecutionModulesManager executionModulesManager,
45 ExecutionProcess process) {
46 super(executionModulesManager.getProcessesThreadGroup(),
47 "SLC Process #" + process.getUuid());
48 this.executionModulesManager = executionModulesManager;
49 this.process = process;
50 processThreadGroup = new ProcessThreadGroup(executionModulesManager,
51 this);
52 }
53
54 public void run() {
55 log.info("\n##\n## SLC Process #" + process.getUuid()
56 + " STARTED\n##\n");
57
58 process.setStatus(SlcExecution.RUNNING);
59 executionModulesManager.dispatchUpdateStatus(process,
60 SlcExecution.SCHEDULED, SlcExecution.RUNNING);
61
62 process();
63
64 // waits for all execution threads to complete (in case they were
65 // started asynchronously)
66 for (ExecutionThread executionThread : executionThreads) {
67 if (executionThread.isAlive()) {
68 try {
69 executionThread.join();
70 } catch (InterruptedException e) {
71 log.error("Execution thread " + executionThread
72 + " was interrupted");
73 }
74 }
75 }
76
77 // TODO: error management at flow level?
78 if (hadAnError)
79 process.setStatus(SlcExecution.ERROR);
80 else
81 process.setStatus(SlcExecution.COMPLETED);
82 executionModulesManager.dispatchUpdateStatus(process,
83 SlcExecution.RUNNING, process.getStatus());
84
85 log.info("\n## SLC Process #" + process.getUuid() + " COMPLETED\n");
86 }
87
88 /**
89 * Implementation specific execution. To be overridden in order to deal with
90 * custom process types. Default expects an {@link SlcExecution}.
91 */
92 protected void process() {
93 if (!(process instanceof SlcExecution))
94 throw new SlcException("Unsupported process type "
95 + process.getClass());
96 SlcExecution slcExecution = (SlcExecution) process;
97 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
98 flowsToProcess.addAll(slcExecution.getRealizedFlows());
99
100 while (flowsToProcess.size() > 0) {
101 RealizedFlow realizedFlow = flowsToProcess.remove(0);
102 execute(realizedFlow, true);
103 }
104 }
105
106 /** @return the (distinct) thread used for this execution */
107 protected Thread execute(RealizedFlow realizedFlow, Boolean synchronous) {
108 ExecutionThread thread = new ExecutionThread(this, realizedFlow);
109 executionThreads.add(thread);
110 thread.start();
111
112 if (synchronous) {
113 try {
114 thread.join();
115 } catch (InterruptedException e) {
116 log.error("Flow " + realizedFlow + " was interrupted", e);
117 }
118 }
119 return thread;
120
121 // synchronized (this) {
122 // try {
123 // wait();
124 // } catch (InterruptedException e) {
125 // // silent
126 // }
127 // }
128 }
129
130 public void notifyError() {
131 hadAnError = true;
132 }
133
134 public synchronized void flowCompleted() {
135 // notifyAll();
136 }
137
138 public ExecutionProcess getProcess() {
139 return process;
140 }
141
142 public ProcessThreadGroup getProcessThreadGroup() {
143 return processThreadGroup;
144 }
145
146 public ExecutionModulesManager getExecutionModulesManager() {
147 return executionModulesManager;
148 }
149 }