]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/ProcessThread.java
Prepare next development cycle
[gpl/argeo-slc.git] / runtime / ProcessThread.java
1 package org.argeo.slc.runtime;
2
3 import java.security.AccessControlContext;
4 import java.security.AccessController;
5 import java.security.PrivilegedActionException;
6 import java.security.PrivilegedExceptionAction;
7 import java.util.ArrayList;
8 import java.util.Collections;
9 import java.util.HashSet;
10 import java.util.List;
11 import java.util.Set;
12
13 import javax.security.auth.Subject;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17 import org.argeo.slc.SlcException;
18 import org.argeo.slc.execution.ExecutionModulesManager;
19 import org.argeo.slc.execution.ExecutionProcess;
20 import org.argeo.slc.execution.ExecutionStep;
21 import org.argeo.slc.execution.RealizedFlow;
22
23 /**
24 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
25 * sequential {@link ExecutionThread}s.
26 */
27 public class ProcessThread extends Thread {
28 private final static Log log = LogFactory.getLog(ProcessThread.class);
29
30 private final ExecutionModulesManager executionModulesManager;
31 private final ExecutionProcess process;
32 private final ProcessThreadGroup processThreadGroup;
33
34 private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
35
36 // private Boolean hadAnError = false;
37 private Boolean killed = false;
38
39 private final AccessControlContext accessControlContext;
40
41 public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
42 ExecutionProcess process) {
43 super(processesThreadGroup, "SLC Process #" + process.getUuid());
44 this.executionModulesManager = executionModulesManager;
45 this.process = process;
46 processThreadGroup = new ProcessThreadGroup(process);
47 accessControlContext = AccessController.getContext();
48 }
49
50 public final void run() {
51 // authenticate thread
52 // Authentication authentication = getProcessThreadGroup()
53 // .getAuthentication();
54 // if (authentication == null)
55 // throw new SlcException("Can only execute authenticated threads");
56 // SecurityContextHolder.getContext().setAuthentication(authentication);
57
58 log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
59
60 // Start logging
61 new LoggingThread().start();
62
63 process.setStatus(ExecutionProcess.RUNNING);
64 try {
65 Subject subject = Subject.getSubject(accessControlContext);
66 try {
67 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
68
69 @Override
70 public Void run() throws Exception {
71 process();
72 return null;
73 }
74
75 });
76 } catch (PrivilegedActionException privilegedActionException) {
77 Throwable cause = privilegedActionException.getCause();
78 if (cause instanceof InterruptedException)
79 throw (InterruptedException) cause;
80 else
81 throw new SlcException("Cannot process", cause);
82 }
83 // process();
84 } catch (InterruptedException e) {
85 die();
86 return;
87 } catch (Exception e) {
88 String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
89 log.error(msg, e);
90 getProcessThreadGroup()
91 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
92 }
93
94 // waits for all execution threads to complete (in case they were
95 // started asynchronously)
96 for (ExecutionThread executionThread : executionThreads) {
97 if (executionThread.isAlive()) {
98 try {
99 executionThread.join();
100 } catch (InterruptedException e) {
101 die();
102 return;
103 }
104 }
105 }
106
107 computeFinalStatus();
108 }
109
110 /** Make sure this is called BEFORE all the threads are interrupted. */
111 private void computeFinalStatus() {
112 // String oldStatus = process.getStatus();
113 // TODO: error management at flow level?
114 if (killed)
115 process.setStatus(ExecutionProcess.KILLED);
116 else if (processThreadGroup.hadAnError())
117 process.setStatus(ExecutionProcess.ERROR);
118 else
119 process.setStatus(ExecutionProcess.COMPLETED);
120 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
121 // process.getStatus());
122 log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
123 }
124
125 /** Called when being killed */
126 private synchronized void die() {
127 killed = true;
128 computeFinalStatus();
129 for (ExecutionThread executionThread : executionThreads) {
130 try {
131 executionThread.interrupt();
132 } catch (Exception e) {
133 log.error("Cannot interrupt " + executionThread);
134 }
135 }
136 processThreadGroup.interrupt();
137 }
138
139 /**
140 * Implementation specific execution. To be overridden in order to deal with
141 * custom process types. Default expects an {@link SlcExecution}.
142 */
143 protected void process() throws InterruptedException {
144 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
145 flowsToProcess.addAll(process.getRealizedFlows());
146 while (flowsToProcess.size() > 0) {
147 RealizedFlow realizedFlow = flowsToProcess.remove(0);
148 execute(realizedFlow, true);
149 }
150 }
151
152 /** @return the (distinct) thread used for this execution */
153 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
154 if (killed)
155 return;
156
157 ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
158 executionThreads.add(thread);
159 thread.start();
160
161 if (synchronous)
162 thread.join();
163
164 return;
165 }
166
167 // public void notifyError() {
168 // hadAnError = true;
169 // }
170 //
171 // public synchronized void flowCompleted() {
172 // // notifyAll();
173 // }
174
175 public ExecutionProcess getProcess() {
176 return process;
177 }
178
179 public ProcessThreadGroup getProcessThreadGroup() {
180 return processThreadGroup;
181 }
182
183 public ExecutionModulesManager getExecutionModulesManager() {
184 return executionModulesManager;
185 }
186
187 private class LoggingThread extends Thread {
188
189 public LoggingThread() {
190 super("SLC Process Logger #" + process.getUuid());
191 }
192
193 public void run() {
194 boolean run = true;
195 while (run) {
196 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
197 processThreadGroup.getSteps().drainTo(newSteps);
198 if (newSteps.size() > 0) {
199 // System.out.println(steps.size() + " steps");
200 process.addSteps(newSteps);
201 }
202
203 try {
204 Thread.sleep(1000);
205 } catch (InterruptedException e) {
206 break;
207 }
208
209 if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
210 run = false;
211 }
212 }
213
214 }
215 }