]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Improve versioning driver
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.core.execution;
18
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Set;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.argeo.slc.SlcException;
28 import org.argeo.slc.execution.ExecutionModulesManager;
29 import org.argeo.slc.execution.ExecutionProcess;
30 import org.argeo.slc.execution.ExecutionStep;
31 import org.argeo.slc.process.RealizedFlow;
32 import org.argeo.slc.process.SlcExecution;
33
34 /** Thread of the SLC Process, starting the sub executions. */
35 @SuppressWarnings("deprecation")
36 public class ProcessThread extends Thread {
37 private final static Log log = LogFactory.getLog(ProcessThread.class);
38
39 private final ExecutionModulesManager executionModulesManager;
40 private final ExecutionProcess process;
41 private final ProcessThreadGroup processThreadGroup;
42
43 private Set<ExecutionThread> executionThreads = Collections
44 .synchronizedSet(new HashSet<ExecutionThread>());
45
46 private Boolean hadAnError = false;
47 private Boolean killed = false;
48
49 public ProcessThread(ThreadGroup processesThreadGroup,
50 ExecutionModulesManager executionModulesManager,
51 ExecutionProcess process) {
52 super(processesThreadGroup, "SLC Process #" + process.getUuid());
53 this.executionModulesManager = executionModulesManager;
54 this.process = process;
55 processThreadGroup = new ProcessThreadGroup(executionModulesManager,
56 this);
57 }
58
59 public final void run() {
60 log.info("\n##\n## SLC Process #" + process.getUuid()
61 + " STARTED\n##\n");
62
63 // Start logging
64 new LoggingThread().start();
65
66 String oldStatus = process.getStatus();
67 process.setStatus(ExecutionProcess.RUNNING);
68 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
69 ExecutionProcess.RUNNING);
70
71 try {
72 process();
73 } catch (InterruptedException e) {
74 die();
75 return;
76 }
77
78 // waits for all execution threads to complete (in case they were
79 // started asynchronously)
80 for (ExecutionThread executionThread : executionThreads) {
81 if (executionThread.isAlive()) {
82 try {
83 executionThread.join();
84 } catch (InterruptedException e) {
85 die();
86 return;
87 }
88 }
89 }
90
91 computeFinalStatus();
92 }
93
94 /** Make sure this is called BEFORE all the threads are interrupted. */
95 private void computeFinalStatus() {
96 String oldStatus = process.getStatus();
97 // TODO: error management at flow level?
98 if (killed)
99 process.setStatus(ExecutionProcess.KILLED);
100 else if (hadAnError)
101 process.setStatus(ExecutionProcess.ERROR);
102 else
103 process.setStatus(ExecutionProcess.COMPLETED);
104 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
105 process.getStatus());
106 log.info("\n## SLC Process #" + process.getUuid() + " "
107 + process.getStatus() + "\n");
108 }
109
110 /** Called when being killed */
111 private synchronized void die() {
112 killed = true;
113 computeFinalStatus();
114 for (ExecutionThread executionThread : executionThreads) {
115 try {
116 executionThread.interrupt();
117 } catch (Exception e) {
118 log.error("Cannot interrupt " + executionThread);
119 }
120 }
121 processThreadGroup.interrupt();
122 }
123
124 /**
125 * Implementation specific execution. To be overridden in order to deal with
126 * custom process types. Default expects an {@link SlcExecution}.
127 */
128 protected void process() throws InterruptedException {
129 if (!(process instanceof SlcExecution))
130 throw new SlcException("Unsupported process type "
131 + process.getClass());
132 SlcExecution slcExecution = (SlcExecution) process;
133 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
134 flowsToProcess.addAll(slcExecution.getRealizedFlows());
135
136 while (flowsToProcess.size() > 0) {
137 RealizedFlow realizedFlow = flowsToProcess.remove(0);
138 execute(realizedFlow, true);
139 }
140 }
141
142 /** @return the (distinct) thread used for this execution */
143 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
144 throws InterruptedException {
145 if (killed)
146 return;
147
148 ExecutionThread thread = new ExecutionThread(this, realizedFlow);
149 executionThreads.add(thread);
150 thread.start();
151
152 if (synchronous)
153 thread.join();
154
155 return;
156 }
157
158 public void notifyError() {
159 hadAnError = true;
160 }
161
162 public synchronized void flowCompleted() {
163 // notifyAll();
164 }
165
166 public ExecutionProcess getProcess() {
167 return process;
168 }
169
170 public ProcessThreadGroup getProcessThreadGroup() {
171 return processThreadGroup;
172 }
173
174 public ExecutionModulesManager getExecutionModulesManager() {
175 return executionModulesManager;
176 }
177
178 private class LoggingThread extends Thread {
179
180 public LoggingThread() {
181 super("SLC Process Logger #" + process.getUuid());
182 }
183
184 public void run() {
185 boolean run = true;
186 while (run) {
187 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
188 processThreadGroup.getSteps().drainTo(newSteps);
189 if (newSteps.size() > 0) {
190 // System.out.println(steps.size() + " steps");
191 process.addSteps(newSteps);
192 }
193
194 try {
195 Thread.sleep(1000);
196 } catch (InterruptedException e) {
197 break;
198 }
199
200 if (!ProcessThread.this.isAlive()
201 && processThreadGroup.getSteps().size() == 0)
202 run = false;
203 }
204 }
205
206 }
207 }