1 package org
.argeo
.slc
.runtime
;
3 import java
.security
.AccessControlContext
;
4 import java
.security
.AccessController
;
5 import java
.security
.PrivilegedActionException
;
6 import java
.security
.PrivilegedExceptionAction
;
7 import java
.util
.ArrayList
;
8 import java
.util
.Collections
;
9 import java
.util
.HashSet
;
10 import java
.util
.List
;
13 import javax
.security
.auth
.Subject
;
15 import org
.apache
.commons
.logging
.Log
;
16 import org
.apache
.commons
.logging
.LogFactory
;
17 import org
.argeo
.slc
.SlcException
;
18 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
19 import org
.argeo
.slc
.execution
.ExecutionProcess
;
20 import org
.argeo
.slc
.execution
.ExecutionStep
;
21 import org
.argeo
.slc
.execution
.RealizedFlow
;
24 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
25 * sequential {@link ExecutionThread}s.
27 public class ProcessThread
extends Thread
{
28 private final static Log log
= LogFactory
.getLog(ProcessThread
.class);
30 private final ExecutionModulesManager executionModulesManager
;
31 private final ExecutionProcess process
;
32 private final ProcessThreadGroup processThreadGroup
;
34 private Set
<ExecutionThread
> executionThreads
= Collections
.synchronizedSet(new HashSet
<ExecutionThread
>());
36 // private Boolean hadAnError = false;
37 private Boolean killed
= false;
39 private final AccessControlContext accessControlContext
;
41 public ProcessThread(ThreadGroup processesThreadGroup
, ExecutionModulesManager executionModulesManager
,
42 ExecutionProcess process
) {
43 super(processesThreadGroup
, "SLC Process #" + process
.getUuid());
44 this.executionModulesManager
= executionModulesManager
;
45 this.process
= process
;
46 processThreadGroup
= new ProcessThreadGroup(process
);
47 accessControlContext
= AccessController
.getContext();
50 public final void run() {
51 // authenticate thread
52 // Authentication authentication = getProcessThreadGroup()
53 // .getAuthentication();
54 // if (authentication == null)
55 // throw new SlcException("Can only execute authenticated threads");
56 // SecurityContextHolder.getContext().setAuthentication(authentication);
58 log
.info("\n##\n## SLC Process #" + process
.getUuid() + " STARTED\n##\n");
61 new LoggingThread().start();
63 process
.setStatus(ExecutionProcess
.RUNNING
);
65 Subject subject
= Subject
.getSubject(accessControlContext
);
67 Subject
.doAs(subject
, new PrivilegedExceptionAction
<Void
>() {
70 public Void
run() throws Exception
{
76 } catch (PrivilegedActionException privilegedActionException
) {
77 Throwable cause
= privilegedActionException
.getCause();
78 if (cause
instanceof InterruptedException
)
79 throw (InterruptedException
) cause
;
81 throw new SlcException("Cannot process", cause
);
84 } catch (InterruptedException e
) {
87 } catch (Exception e
) {
88 String msg
= "Process " + getProcess().getUuid() + " failed unexpectedly.";
90 getProcessThreadGroup()
91 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep
.ERROR
, msg
+ " " + e
.getMessage()));
94 // waits for all execution threads to complete (in case they were
95 // started asynchronously)
96 for (ExecutionThread executionThread
: executionThreads
) {
97 if (executionThread
.isAlive()) {
99 executionThread
.join();
100 } catch (InterruptedException e
) {
107 computeFinalStatus();
110 /** Make sure this is called BEFORE all the threads are interrupted. */
111 private void computeFinalStatus() {
112 // String oldStatus = process.getStatus();
113 // TODO: error management at flow level?
115 process
.setStatus(ExecutionProcess
.KILLED
);
116 else if (processThreadGroup
.hadAnError())
117 process
.setStatus(ExecutionProcess
.ERROR
);
119 process
.setStatus(ExecutionProcess
.COMPLETED
);
120 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
121 // process.getStatus());
122 log
.info("\n## SLC Process #" + process
.getUuid() + " " + process
.getStatus() + "\n");
125 /** Called when being killed */
126 private synchronized void die() {
128 computeFinalStatus();
129 for (ExecutionThread executionThread
: executionThreads
) {
131 executionThread
.interrupt();
132 } catch (Exception e
) {
133 log
.error("Cannot interrupt " + executionThread
);
136 processThreadGroup
.interrupt();
140 * Implementation specific execution. To be overridden in order to deal with
141 * custom process types. Default expects an {@link SlcExecution}.
143 protected void process() throws InterruptedException
{
144 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
145 flowsToProcess
.addAll(process
.getRealizedFlows());
146 while (flowsToProcess
.size() > 0) {
147 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
148 execute(realizedFlow
, true);
152 /** @return the (distinct) thread used for this execution */
153 protected final void execute(RealizedFlow realizedFlow
, Boolean synchronous
) throws InterruptedException
{
157 ExecutionThread thread
= new ExecutionThread(processThreadGroup
, executionModulesManager
, realizedFlow
);
158 executionThreads
.add(thread
);
167 // public void notifyError() {
168 // hadAnError = true;
171 // public synchronized void flowCompleted() {
175 public ExecutionProcess
getProcess() {
179 public ProcessThreadGroup
getProcessThreadGroup() {
180 return processThreadGroup
;
183 public ExecutionModulesManager
getExecutionModulesManager() {
184 return executionModulesManager
;
187 private class LoggingThread
extends Thread
{
189 public LoggingThread() {
190 super("SLC Process Logger #" + process
.getUuid());
196 List
<ExecutionStep
> newSteps
= new ArrayList
<ExecutionStep
>();
197 processThreadGroup
.getSteps().drainTo(newSteps
);
198 if (newSteps
.size() > 0) {
199 // System.out.println(steps.size() + " steps");
200 process
.addSteps(newSteps
);
205 } catch (InterruptedException e
) {
209 if (!ProcessThread
.this.isAlive() && processThreadGroup
.getSteps().size() == 0)