2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.core
.execution
;
19 import java
.util
.ArrayList
;
20 import java
.util
.Collections
;
21 import java
.util
.HashSet
;
22 import java
.util
.List
;
24 import java
.util
.concurrent
.ArrayBlockingQueue
;
25 import java
.util
.concurrent
.BlockingQueue
;
27 import org
.apache
.commons
.logging
.Log
;
28 import org
.apache
.commons
.logging
.LogFactory
;
29 import org
.argeo
.slc
.SlcException
;
30 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
31 import org
.argeo
.slc
.execution
.ExecutionProcess
;
32 import org
.argeo
.slc
.execution
.ExecutionStep
;
33 import org
.argeo
.slc
.process
.RealizedFlow
;
34 import org
.argeo
.slc
.process
.SlcExecution
;
36 /** Thread of the SLC Process, starting the sub executions. */
37 public class ProcessThread
extends Thread
{
38 private final static Log log
= LogFactory
.getLog(ProcessThread
.class);
40 private final ExecutionModulesManager executionModulesManager
;
41 private final ExecutionProcess process
;
42 private final ProcessThreadGroup processThreadGroup
;
44 private Set
<ExecutionThread
> executionThreads
= Collections
45 .synchronizedSet(new HashSet
<ExecutionThread
>());
47 private Boolean hadAnError
= false;
48 private Boolean killed
= false;
50 private final static Integer STEPS_BUFFER_CAPACITY
= 10000;
51 private BlockingQueue
<ExecutionStep
> steps
= new ArrayBlockingQueue
<ExecutionStep
>(
52 STEPS_BUFFER_CAPACITY
);
/**
 * Creates the process thread inside the given thread group, naming it
 * "SLC Process #" followed by the UUID of the process, then stores the
 * modules manager and the process and starts building the
 * {@link ProcessThreadGroup} of this process.
 * <p>
 * NOTE(review): the end of the ProcessThreadGroup creation and the closing
 * of the constructor are not visible in this excerpt.
 *
 * @param processesThreadGroup thread group handed to the Thread constructor
 * @param executionModulesManager manager stored for later status dispatching
 * @param process the process this thread is running
 */
54 public ProcessThread(ThreadGroup processesThreadGroup
,
55 ExecutionModulesManager executionModulesManager
,
56 ExecutionProcess process
) {
57 super(processesThreadGroup
, "SLC Process #" + process
.getUuid());
58 this.executionModulesManager
= executionModulesManager
;
59 this.process
= process
;
60 processThreadGroup
= new ProcessThreadGroup(executionModulesManager
,
/**
 * Thread body of the process: logs a start banner with the process UUID,
 * starts a {@code LoggingThread}, switches the process status to RUNNING
 * and dispatches that change, then joins every execution thread that is
 * still alive.
 * <p>
 * NOTE(review): the guarded section before the first
 * {@code catch (InterruptedException)} and the bodies of both catch clauses
 * are not visible in this excerpt; presumably {@code process()} is invoked
 * there -- confirm against the complete file.
 */
64 public final void run() {
65 log
.info("\n##\n## SLC Process #" + process
.getUuid()
69 new LoggingThread().start();
71 String oldStatus
= process
.getStatus();
72 process
.setStatus(ExecutionProcess
.RUNNING
);
73 executionModulesManager
.dispatchUpdateStatus(process
, oldStatus
,
74 ExecutionProcess
.RUNNING
);
78 } catch (InterruptedException e
) {
83 // waits for all execution threads to complete (in case they were
84 // started asynchronously)
85 for (ExecutionThread executionThread
: executionThreads
) {
86 if (executionThread
.isAlive()) {
88 executionThread
.join();
89 } catch (InterruptedException e
) {
99 /**
 * Sets the final status of the process (KILLED, ERROR or COMPLETED),
 * dispatches the change from the previous status through the execution
 * modules manager and logs the resulting status together with the process
 * UUID. Make sure this is called BEFORE all the threads are interrupted.
 * <p>
 * NOTE(review): the conditions selecting one of the three statuses are not
 * visible in this excerpt; presumably they test the killed and hadAnError
 * flags -- confirm.
 */
100 private void computeFinalStatus() {
101 String oldStatus
= process
.getStatus();
102 // TODO: error management at flow level?
104 process
.setStatus(ExecutionProcess
.KILLED
);
106 process
.setStatus(ExecutionProcess
.ERROR
);
108 process
.setStatus(ExecutionProcess
.COMPLETED
);
109 executionModulesManager
.dispatchUpdateStatus(process
, oldStatus
,
110 process
.getStatus());
111 log
.info("\n## SLC Process #" + process
.getUuid() + " "
112 + process
.getStatus() + "\n");
115 /**
 * Called when being killed. Computes the final status first (it has to be
 * known before any thread is interrupted), then interrupts every registered
 * execution thread, logging an error when one of them cannot be
 * interrupted, and interrupts the process thread group.
 * <p>
 * NOTE(review): in the visible code the caught exception is not handed to
 * the logger, only the thread is reported.
 */
116 private synchronized void die() {
118 computeFinalStatus();
119 for (ExecutionThread executionThread
: executionThreads
) {
121 executionThread
.interrupt();
122 } catch (Exception e
) {
123 log
.error("Cannot interrupt " + executionThread
);
126 processThreadGroup
.interrupt();
130 * Implementation specific execution. To be overridden in order to deal with
131 * custom process types. Default expects an {@link SlcExecution}.
133 @SuppressWarnings("deprecation")
134 protected void process() throws InterruptedException
{
135 if (!(process
instanceof SlcExecution
))
136 throw new SlcException("Unsupported process type "
137 + process
.getClass());
138 SlcExecution slcExecution
= (SlcExecution
) process
;
139 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
140 flowsToProcess
.addAll(slcExecution
.getRealizedFlows());
142 while (flowsToProcess
.size() > 0) {
143 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
144 execute(realizedFlow
, true);
148 /**
 * Executes a realized flow in a (distinct) {@link ExecutionThread} created
 * for it and registered in the set of execution threads of this process.
 * The method is declared void: the thread is not returned to the caller.
 * <p>
 * NOTE(review): the statements before the thread creation and after its
 * registration are not visible in this excerpt.
 *
 * @param realizedFlow the flow handed to the new execution thread
 * @param synchronous its use is not visible in this excerpt; presumably
 *            whether to wait for the thread to complete -- confirm
 * @throws InterruptedException declared by the signature; the statement
 *             raising it is not visible in this excerpt
 */
149 protected final void execute(RealizedFlow realizedFlow
, Boolean synchronous
)
150 throws InterruptedException
{
154 ExecutionThread thread
= new ExecutionThread(this, realizedFlow
);
155 executionThreads
.add(thread
);
164 public void notifyError() {
168 public synchronized void flowCompleted() {
172 public ExecutionProcess
getProcess() {
176 public ProcessThreadGroup
getProcessThreadGroup() {
177 return processThreadGroup
;
180 public ExecutionModulesManager
getExecutionModulesManager() {
181 return executionModulesManager
;
184 private class LoggingThread
extends Thread
{
186 public LoggingThread() {
187 super("SLC Process Logger #" + process
.getUuid());
193 List
<ExecutionStep
> newSteps
= new ArrayList
<ExecutionStep
>();
194 processThreadGroup
.getSteps().drainTo(newSteps
);
195 if (newSteps
.size() > 0) {
196 // System.out.println(steps.size() + " steps");
197 process
.addSteps(newSteps
);
202 } catch (InterruptedException e
) {
206 if (!ProcessThread
.this.isAlive()
207 && processThreadGroup
.getSteps().size() == 0)