2 * Copyright (C) 2007-2012 Argeo GmbH
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org
.argeo
.slc
.core
.execution
;
18 import java
.util
.ArrayList
;
19 import java
.util
.Collections
;
20 import java
.util
.HashSet
;
21 import java
.util
.List
;
24 import org
.apache
.commons
.logging
.Log
;
25 import org
.apache
.commons
.logging
.LogFactory
;
26 import org
.argeo
.slc
.SlcException
;
27 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
28 import org
.argeo
.slc
.execution
.ExecutionProcess
;
29 import org
.argeo
.slc
.execution
.ExecutionStep
;
30 import org
.argeo
.slc
.execution
.RealizedFlow
;
31 import org
.springframework
.security
.Authentication
;
32 import org
.springframework
.security
.context
.SecurityContextHolder
;
34 public class ProcessThread
extends Thread
{
35 private final static Log log
= LogFactory
.getLog(ProcessThread
.class);
37 private final ExecutionModulesManager executionModulesManager
;
38 private final ExecutionProcess process
;
39 private final ProcessThreadGroup processThreadGroup
;
41 private Set
<ExecutionThread
> executionThreads
= Collections
42 .synchronizedSet(new HashSet
<ExecutionThread
>());
44 private Boolean hadAnError
= false;
45 private Boolean killed
= false;
47 public ProcessThread(ThreadGroup processesThreadGroup
,
48 ExecutionModulesManager executionModulesManager
,
49 ExecutionProcess process
) {
50 super(processesThreadGroup
, "SLC Process #" + process
.getUuid());
51 this.executionModulesManager
= executionModulesManager
;
52 this.process
= process
;
53 processThreadGroup
= new ProcessThreadGroup(executionModulesManager
,
57 public final void run() {
58 // authenticate thread
59 Authentication authentication
= getProcessThreadGroup()
61 if (authentication
== null)
62 throw new SlcException("Can only execute authenticated threads");
63 SecurityContextHolder
.getContext().setAuthentication(authentication
);
65 // log.info("\n##\n## SLC Process #" + process.getUuid() +
67 // + authentication.getName() + "\n##\n");
68 log
.info("\n##\n## SLC Process #" + process
.getUuid()
72 new LoggingThread().start();
74 String oldStatus
= process
.getStatus();
75 process
.setStatus(ExecutionProcess
.RUNNING
);
76 executionModulesManager
.dispatchUpdateStatus(process
, oldStatus
,
77 ExecutionProcess
.RUNNING
);
81 } catch (InterruptedException e
) {
86 // waits for all execution threads to complete (in case they were
87 // started asynchronously)
88 for (ExecutionThread executionThread
: executionThreads
) {
89 if (executionThread
.isAlive()) {
91 executionThread
.join();
92 } catch (InterruptedException e
) {
102 /** Make sure this is called BEFORE all the threads are interrupted. */
103 private void computeFinalStatus() {
104 String oldStatus
= process
.getStatus();
105 // TODO: error management at flow level?
107 process
.setStatus(ExecutionProcess
.KILLED
);
109 process
.setStatus(ExecutionProcess
.ERROR
);
111 process
.setStatus(ExecutionProcess
.COMPLETED
);
112 executionModulesManager
.dispatchUpdateStatus(process
, oldStatus
,
113 process
.getStatus());
114 log
.info("\n## SLC Process #" + process
.getUuid() + " "
115 + process
.getStatus() + "\n");
118 /** Called when being killed */
119 private synchronized void die() {
121 computeFinalStatus();
122 for (ExecutionThread executionThread
: executionThreads
) {
124 executionThread
.interrupt();
125 } catch (Exception e
) {
126 log
.error("Cannot interrupt " + executionThread
);
129 processThreadGroup
.interrupt();
133 * Implementation specific execution. To be overridden in order to deal with
134 * custom process types. Default expects an {@link SlcExecution}.
136 protected void process() throws InterruptedException
{
137 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
138 flowsToProcess
.addAll(process
.getRealizedFlows());
139 while (flowsToProcess
.size() > 0) {
140 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
141 execute(realizedFlow
, true);
145 /** @return the (distinct) thread used for this execution */
146 protected final void execute(RealizedFlow realizedFlow
, Boolean synchronous
)
147 throws InterruptedException
{
151 ExecutionThread thread
= new ExecutionThread(this, realizedFlow
);
152 executionThreads
.add(thread
);
161 public void notifyError() {
165 public synchronized void flowCompleted() {
169 public ExecutionProcess
getProcess() {
173 public ProcessThreadGroup
getProcessThreadGroup() {
174 return processThreadGroup
;
177 public ExecutionModulesManager
getExecutionModulesManager() {
178 return executionModulesManager
;
181 private class LoggingThread
extends Thread
{
183 public LoggingThread() {
184 super("SLC Process Logger #" + process
.getUuid());
190 List
<ExecutionStep
> newSteps
= new ArrayList
<ExecutionStep
>();
191 processThreadGroup
.getSteps().drainTo(newSteps
);
192 if (newSteps
.size() > 0) {
193 // System.out.println(steps.size() + " steps");
194 process
.addSteps(newSteps
);
199 } catch (InterruptedException e
) {
203 if (!ProcessThread
.this.isAlive()
204 && processThreadGroup
.getSteps().size() == 0)