1 package org
.argeo
.slc
.runtime
;
3 import java
.lang
.System
.Logger
.Level
;
4 import java
.security
.AccessControlContext
;
5 import java
.security
.AccessController
;
6 import java
.security
.PrivilegedActionException
;
7 import java
.security
.PrivilegedExceptionAction
;
8 import java
.util
.ArrayList
;
9 import java
.util
.Collections
;
10 import java
.util
.HashSet
;
11 import java
.util
.List
;
14 import javax
.security
.auth
.Subject
;
16 import org
.argeo
.slc
.SlcException
;
17 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
18 import org
.argeo
.slc
.execution
.ExecutionProcess
;
19 import org
.argeo
.slc
.execution
.ExecutionStep
;
20 import org
.argeo
.slc
.execution
.RealizedFlow
;
23 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
24 * sequential {@link ExecutionThread}s.
26 public class ProcessThread
extends Thread
{
27 private final static System
.Logger logger
= System
.getLogger(ProcessThread
.class.getName());
29 private final ExecutionModulesManager executionModulesManager
;
30 private final ExecutionProcess process
;
31 private final ProcessThreadGroup processThreadGroup
;
33 private Set
<ExecutionThread
> executionThreads
= Collections
.synchronizedSet(new HashSet
<ExecutionThread
>());
35 // private Boolean hadAnError = false;
36 private Boolean killed
= false;
38 private final AccessControlContext accessControlContext
;
40 public ProcessThread(ThreadGroup processesThreadGroup
, ExecutionModulesManager executionModulesManager
,
41 ExecutionProcess process
) {
42 super(processesThreadGroup
, "SLC Process #" + process
.getUuid());
43 this.executionModulesManager
= executionModulesManager
;
44 this.process
= process
;
45 processThreadGroup
= new ProcessThreadGroup(process
);
46 accessControlContext
= AccessController
.getContext();
49 public final void run() {
50 // authenticate thread
51 // Authentication authentication = getProcessThreadGroup()
52 // .getAuthentication();
53 // if (authentication == null)
54 // throw new SlcException("Can only execute authenticated threads");
55 // SecurityContextHolder.getContext().setAuthentication(authentication);
57 logger
.log(Level
.INFO
, "\n##\n## SLC Process #" + process
.getUuid() + " STARTED\n##\n");
60 new LoggingThread().start();
62 process
.setStatus(ExecutionProcess
.RUNNING
);
64 Subject subject
= Subject
.getSubject(accessControlContext
);
66 Subject
.doAs(subject
, new PrivilegedExceptionAction
<Void
>() {
69 public Void
run() throws Exception
{
75 } catch (PrivilegedActionException privilegedActionException
) {
76 Throwable cause
= privilegedActionException
.getCause();
77 if (cause
instanceof InterruptedException
)
78 throw (InterruptedException
) cause
;
80 throw new SlcException("Cannot process", cause
);
83 } catch (InterruptedException e
) {
86 } catch (Exception e
) {
87 String msg
= "Process " + getProcess().getUuid() + " failed unexpectedly.";
88 logger
.log(Level
.ERROR
, msg
, e
);
89 getProcessThreadGroup()
90 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep
.ERROR
, msg
+ " " + e
.getMessage()));
93 // waits for all execution threads to complete (in case they were
94 // started asynchronously)
95 for (ExecutionThread executionThread
: executionThreads
) {
96 if (executionThread
.isAlive()) {
98 executionThread
.join();
99 } catch (InterruptedException e
) {
106 computeFinalStatus();
109 /** Make sure this is called BEFORE all the threads are interrupted. */
110 private void computeFinalStatus() {
111 // String oldStatus = process.getStatus();
112 // TODO: error management at flow level?
114 process
.setStatus(ExecutionProcess
.KILLED
);
115 else if (processThreadGroup
.hadAnError())
116 process
.setStatus(ExecutionProcess
.ERROR
);
118 process
.setStatus(ExecutionProcess
.COMPLETED
);
119 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
120 // process.getStatus());
121 logger
.log(Level
.INFO
, "\n## SLC Process #" + process
.getUuid() + " " + process
.getStatus() + "\n");
124 /** Called when being killed */
125 private synchronized void die() {
127 computeFinalStatus();
128 for (ExecutionThread executionThread
: executionThreads
) {
130 executionThread
.interrupt();
131 } catch (Exception e
) {
132 logger
.log(Level
.ERROR
, "Cannot interrupt " + executionThread
);
135 processThreadGroup
.interrupt();
139 * Implementation specific execution. To be overridden in order to deal with
140 * custom process types. Default expects an {@link SlcExecution}.
142 protected void process() throws InterruptedException
{
143 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
144 flowsToProcess
.addAll(process
.getRealizedFlows());
145 while (flowsToProcess
.size() > 0) {
146 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
147 execute(realizedFlow
, true);
151 /** @return the (distinct) thread used for this execution */
152 protected final void execute(RealizedFlow realizedFlow
, Boolean synchronous
) throws InterruptedException
{
156 ExecutionThread thread
= new ExecutionThread(processThreadGroup
, executionModulesManager
, realizedFlow
);
157 executionThreads
.add(thread
);
166 // public void notifyError() {
167 // hadAnError = true;
170 // public synchronized void flowCompleted() {
174 public ExecutionProcess
getProcess() {
178 public ProcessThreadGroup
getProcessThreadGroup() {
179 return processThreadGroup
;
182 public ExecutionModulesManager
getExecutionModulesManager() {
183 return executionModulesManager
;
186 private class LoggingThread
extends Thread
{
188 public LoggingThread() {
189 super("SLC Process Logger #" + process
.getUuid());
195 List
<ExecutionStep
> newSteps
= new ArrayList
<ExecutionStep
>();
196 processThreadGroup
.getSteps().drainTo(newSteps
);
197 if (newSteps
.size() > 0) {
198 // System.out.println(steps.size() + " steps");
199 process
.addSteps(newSteps
);
204 } catch (InterruptedException e
) {
208 if (!ProcessThread
.this.isAlive() && processThreadGroup
.getSteps().size() == 0)