git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.runtime/src/org/argeo/slc/runtime/ProcessThread.java
Introduce mbox support
[gpl/argeo-slc.git] / org.argeo.slc.runtime / src / org / argeo / slc / runtime / ProcessThread.java
1 package org.argeo.slc.runtime;
2
3 import java.lang.System.Logger.Level;
4 import java.security.AccessControlContext;
5 import java.security.AccessController;
6 import java.security.PrivilegedActionException;
7 import java.security.PrivilegedExceptionAction;
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashSet;
11 import java.util.List;
12 import java.util.Set;
13
14 import javax.security.auth.Subject;
15
16 import org.argeo.slc.SlcException;
17 import org.argeo.slc.execution.ExecutionModulesManager;
18 import org.argeo.slc.execution.ExecutionProcess;
19 import org.argeo.slc.execution.ExecutionStep;
20 import org.argeo.slc.execution.RealizedFlow;
21
22 /**
23 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
24 * sequential {@link ExecutionThread}s.
25 */
26 public class ProcessThread extends Thread {
27 private final static System.Logger logger = System.getLogger(ProcessThread.class.getName());
28
29 private final ExecutionModulesManager executionModulesManager;
30 private final ExecutionProcess process;
31 private final ProcessThreadGroup processThreadGroup;
32
33 private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
34
35 // private Boolean hadAnError = false;
36 private Boolean killed = false;
37
38 private final AccessControlContext accessControlContext;
39
40 public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
41 ExecutionProcess process) {
42 super(processesThreadGroup, "SLC Process #" + process.getUuid());
43 this.executionModulesManager = executionModulesManager;
44 this.process = process;
45 processThreadGroup = new ProcessThreadGroup(process);
46 accessControlContext = AccessController.getContext();
47 }
48
49 public final void run() {
50 // authenticate thread
51 // Authentication authentication = getProcessThreadGroup()
52 // .getAuthentication();
53 // if (authentication == null)
54 // throw new SlcException("Can only execute authenticated threads");
55 // SecurityContextHolder.getContext().setAuthentication(authentication);
56
57 logger.log(Level.INFO, "\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
58
59 // Start logging
60 new LoggingThread().start();
61
62 process.setStatus(ExecutionProcess.RUNNING);
63 try {
64 Subject subject = Subject.getSubject(accessControlContext);
65 try {
66 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
67
68 @Override
69 public Void run() throws Exception {
70 process();
71 return null;
72 }
73
74 });
75 } catch (PrivilegedActionException privilegedActionException) {
76 Throwable cause = privilegedActionException.getCause();
77 if (cause instanceof InterruptedException)
78 throw (InterruptedException) cause;
79 else
80 throw new SlcException("Cannot process", cause);
81 }
82 // process();
83 } catch (InterruptedException e) {
84 die();
85 return;
86 } catch (Exception e) {
87 String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
88 logger.log(Level.ERROR, msg, e);
89 getProcessThreadGroup()
90 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
91 }
92
93 // waits for all execution threads to complete (in case they were
94 // started asynchronously)
95 for (ExecutionThread executionThread : executionThreads) {
96 if (executionThread.isAlive()) {
97 try {
98 executionThread.join();
99 } catch (InterruptedException e) {
100 die();
101 return;
102 }
103 }
104 }
105
106 computeFinalStatus();
107 }
108
109 /** Make sure this is called BEFORE all the threads are interrupted. */
110 private void computeFinalStatus() {
111 // String oldStatus = process.getStatus();
112 // TODO: error management at flow level?
113 if (killed)
114 process.setStatus(ExecutionProcess.KILLED);
115 else if (processThreadGroup.hadAnError())
116 process.setStatus(ExecutionProcess.ERROR);
117 else
118 process.setStatus(ExecutionProcess.COMPLETED);
119 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
120 // process.getStatus());
121 logger.log(Level.INFO, "\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
122 }
123
124 /** Called when being killed */
125 private synchronized void die() {
126 killed = true;
127 computeFinalStatus();
128 for (ExecutionThread executionThread : executionThreads) {
129 try {
130 executionThread.interrupt();
131 } catch (Exception e) {
132 logger.log(Level.ERROR, "Cannot interrupt " + executionThread);
133 }
134 }
135 processThreadGroup.interrupt();
136 }
137
138 /**
139 * Implementation specific execution. To be overridden in order to deal with
140 * custom process types. Default expects an {@link SlcExecution}.
141 */
142 protected void process() throws InterruptedException {
143 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
144 flowsToProcess.addAll(process.getRealizedFlows());
145 while (flowsToProcess.size() > 0) {
146 RealizedFlow realizedFlow = flowsToProcess.remove(0);
147 execute(realizedFlow, true);
148 }
149 }
150
151 /** @return the (distinct) thread used for this execution */
152 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
153 if (killed)
154 return;
155
156 ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
157 executionThreads.add(thread);
158 thread.start();
159
160 if (synchronous)
161 thread.join();
162
163 return;
164 }
165
166 // public void notifyError() {
167 // hadAnError = true;
168 // }
169 //
170 // public synchronized void flowCompleted() {
171 // // notifyAll();
172 // }
173
174 public ExecutionProcess getProcess() {
175 return process;
176 }
177
178 public ProcessThreadGroup getProcessThreadGroup() {
179 return processThreadGroup;
180 }
181
182 public ExecutionModulesManager getExecutionModulesManager() {
183 return executionModulesManager;
184 }
185
186 private class LoggingThread extends Thread {
187
188 public LoggingThread() {
189 super("SLC Process Logger #" + process.getUuid());
190 }
191
192 public void run() {
193 boolean run = true;
194 while (run) {
195 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
196 processThreadGroup.getSteps().drainTo(newSteps);
197 if (newSteps.size() > 0) {
198 // System.out.println(steps.size() + " steps");
199 process.addSteps(newSteps);
200 }
201
202 try {
203 Thread.sleep(1000);
204 } catch (InterruptedException e) {
205 break;
206 }
207
208 if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
209 run = false;
210 }
211 }
212
213 }
214 }