/*
 * Copyright (C) 2007-2012 Argeo GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org
.argeo
.slc
.core
.execution
;
18 import java
.security
.AccessControlContext
;
19 import java
.security
.AccessController
;
20 import java
.security
.PrivilegedActionException
;
21 import java
.security
.PrivilegedExceptionAction
;
22 import java
.util
.ArrayList
;
23 import java
.util
.Collections
;
24 import java
.util
.HashSet
;
25 import java
.util
.List
;
28 import javax
.security
.auth
.Subject
;
30 import org
.apache
.commons
.logging
.Log
;
31 import org
.apache
.commons
.logging
.LogFactory
;
32 import org
.argeo
.slc
.SlcException
;
33 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
34 import org
.argeo
.slc
.execution
.ExecutionProcess
;
35 import org
.argeo
.slc
.execution
.ExecutionStep
;
36 import org
.argeo
.slc
.execution
.RealizedFlow
;
/**
 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
 * sequential {@link ExecutionThread}s.
 */
42 public class ProcessThread
extends Thread
{
43 private final static Log log
= LogFactory
.getLog(ProcessThread
.class);
45 private final ExecutionModulesManager executionModulesManager
;
46 private final ExecutionProcess process
;
47 private final ProcessThreadGroup processThreadGroup
;
49 private Set
<ExecutionThread
> executionThreads
= Collections
.synchronizedSet(new HashSet
<ExecutionThread
>());
51 // private Boolean hadAnError = false;
52 private Boolean killed
= false;
54 private final AccessControlContext accessControlContext
;
/**
 * Creates the thread in charge of a single process. The thread is named
 * after the UUID of the process.
 *
 * @param processesThreadGroup
 *            the thread group this thread is created in
 * @param executionModulesManager
 *            the manager passed on to the execution threads
 * @param process
 *            the process to coordinate
 */
public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
        ExecutionProcess process) {
    super(processesThreadGroup, "SLC Process #" + process.getUuid());
    this.process = process;
    this.executionModulesManager = executionModulesManager;
    // Explicit 'this.' keeps the field apart from the similarly named
    // 'processesThreadGroup' parameter.
    this.processThreadGroup = new ProcessThreadGroup(process);
    // Remember the context of the creating thread so that it can be reused
    // once this thread runs.
    this.accessControlContext = AccessController.getContext();
}
/**
 * Thread entry point. Logs a start banner, starts a {@link LoggingThread},
 * marks the process as RUNNING, then runs a privileged action as the
 * {@link Subject} obtained from the access control context stored in this
 * object. Afterwards it joins every execution thread that is still alive
 * and computes the final status of the process.
 */
65 public final void run() {
66 // authenticate thread
67 // Authentication authentication = getProcessThreadGroup()
68 // .getAuthentication();
69 // if (authentication == null)
70 // throw new SlcException("Can only execute authenticated threads");
71 // SecurityContextHolder.getContext().setAuthentication(authentication);
73 log
.info("\n##\n## SLC Process #" + process
.getUuid() + " STARTED\n##\n");
// The logger thread is started before the status changes to RUNNING.
76 new LoggingThread().start();
78 process
.setStatus(ExecutionProcess
.RUNNING
);
// Subject associated with the stored access control context.
80 Subject subject
= Subject
.getSubject(accessControlContext
);
82 Subject
.doAs(subject
, new PrivilegedExceptionAction
<Void
>() {
// NOTE(review): the body of this privileged action is not visible here;
// presumably it delegates to process() -- confirm.
85 public Void
run() throws Exception
{
// Unwrap the exception raised inside the privileged action: an
// InterruptedException cause is rethrown unchanged, any other cause is
// wrapped in an SlcException.
91 } catch (PrivilegedActionException privilegedActionException
) {
92 Throwable cause
= privilegedActionException
.getCause();
93 if (cause
instanceof InterruptedException
)
94 throw (InterruptedException
) cause
;
96 throw new SlcException("Cannot process", cause
);
// NOTE(review): the body of this interruption handler is not visible here.
99 } catch (InterruptedException e
) {
// Any other failure is reported as an ERROR step dispatched to the process
// thread group; the step message ends with the exception message.
102 } catch (Exception e
) {
103 String msg
= "Process " + getProcess().getUuid() + " failed unexpectedly.";
105 getProcessThreadGroup()
106 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep
.ERROR
, msg
+ " " + e
.getMessage()));
109 // waits for all execution threads to complete (in case they were
110 // started asynchronously)
// NOTE(review): executionThreads is a Collections.synchronizedSet and this
// iteration is not wrapped in synchronized (executionThreads) -- confirm
// that no thread can be registered while the loop runs.
111 for (ExecutionThread executionThread
: executionThreads
) {
112 if (executionThread
.isAlive()) {
114 executionThread
.join();
// NOTE(review): the body of this interruption handler is not visible here.
115 } catch (InterruptedException e
) {
122 computeFinalStatus();
// Sets the final status of the process to KILLED, ERROR or COMPLETED and
// logs it.
125 /** Make sure this is called BEFORE all the threads are interrupted. */
126 private void computeFinalStatus() {
127 // String oldStatus = process.getStatus();
128 // TODO: error management at flow level?
// NOTE(review): the condition guarding the KILLED branch is not visible
// here; presumably it tests the 'killed' field -- confirm.
130 process
.setStatus(ExecutionProcess
.KILLED
);
// ERROR is chosen when the process thread group reports an error.
131 else if (processThreadGroup
.hadAnError())
132 process
.setStatus(ExecutionProcess
.ERROR
);
// NOTE(review): presumably the final 'else' branch of the chain above -- confirm.
134 process
.setStatus(ExecutionProcess
.COMPLETED
);
135 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
136 // process.getStatus());
// Logs the process UUID together with the status that was just set.
137 log
.info("\n## SLC Process #" + process
.getUuid() + " " + process
.getStatus() + "\n");
// Computes the final status, then interrupts every registered execution
// thread and finally the whole process thread group.
140 /** Called when being killed */
141 private synchronized void die() {
// NOTE(review): a statement preceding this call is not visible here;
// presumably it sets the 'killed' field -- confirm.
// The status is computed first, before any thread is interrupted.
143 computeFinalStatus();
// NOTE(review): executionThreads is a Collections.synchronizedSet and this
// iteration is not wrapped in synchronized (executionThreads) -- confirm
// that no thread can be registered while the loop runs.
144 for (ExecutionThread executionThread
: executionThreads
) {
146 executionThread
.interrupt();
// A failed interruption is only logged; the exception itself is not passed
// to the logger.
147 } catch (Exception e
) {
148 log
.error("Cannot interrupt " + executionThread
);
// Lastly the thread group of the process is interrupted as a whole.
151 processThreadGroup
.interrupt();
155 * Implementation specific execution. To be overridden in order to deal with
156 * custom process types. Default expects an {@link SlcExecution}.
158 protected void process() throws InterruptedException
{
159 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
160 flowsToProcess
.addAll(process
.getRealizedFlows());
161 while (flowsToProcess
.size() > 0) {
162 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
163 execute(realizedFlow
, true);
167 /** Creates an {@link ExecutionThread} for the given realized flow and registers it in the set of execution threads. Returns nothing. */
168 protected final void execute(RealizedFlow realizedFlow
, Boolean synchronous
) throws InterruptedException
{
// NOTE(review): the statements preceding this one are not visible here.
// The new thread belongs to the process thread group and receives the
// execution modules manager of this process thread.
172 ExecutionThread thread
= new ExecutionThread(processThreadGroup
, executionModulesManager
, realizedFlow
);
173 executionThreads
.add(thread
);
// NOTE(review): the rest of the method is not visible here; presumably the
// thread is started and, when 'synchronous' is true, joined -- confirm.
182 // public void notifyError() {
183 // hadAnError = true;
186 // public synchronized void flowCompleted() {
// NOTE(review): the body of this accessor is not visible here; presumably it
// returns the 'process' field -- confirm.
190 public ExecutionProcess
getProcess() {
194 public ProcessThreadGroup
getProcessThreadGroup() {
195 return processThreadGroup
;
198 public ExecutionModulesManager
getExecutionModulesManager() {
199 return executionModulesManager
;
202 private class LoggingThread
extends Thread
{
204 public LoggingThread() {
205 super("SLC Process Logger #" + process
.getUuid());
211 List
<ExecutionStep
> newSteps
= new ArrayList
<ExecutionStep
>();
212 processThreadGroup
.getSteps().drainTo(newSteps
);
213 if (newSteps
.size() > 0) {
214 // System.out.println(steps.size() + " steps");
215 process
.addSteps(newSteps
);
220 } catch (InterruptedException e
) {
224 if (!ProcessThread
.this.isAlive() && processThreadGroup
.getSteps().size() == 0)