]> git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.core/src/org/argeo/slc/core/execution/ProcessThread.java
Remove runner servlet
[gpl/argeo-slc.git] / org.argeo.slc.core / src / org / argeo / slc / core / execution / ProcessThread.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.core.execution;
17
18 import java.security.AccessControlContext;
19 import java.security.AccessController;
20 import java.security.PrivilegedActionException;
21 import java.security.PrivilegedExceptionAction;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27
28 import javax.security.auth.Subject;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.argeo.slc.SlcException;
33 import org.argeo.slc.execution.ExecutionModulesManager;
34 import org.argeo.slc.execution.ExecutionProcess;
35 import org.argeo.slc.execution.ExecutionStep;
36 import org.argeo.slc.execution.RealizedFlow;
37
38 /**
39 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
40 * sequential {@link ExecutionThread}s.
41 */
42 public class ProcessThread extends Thread {
43 private final static Log log = LogFactory.getLog(ProcessThread.class);
44
45 private final ExecutionModulesManager executionModulesManager;
46 private final ExecutionProcess process;
47 private final ProcessThreadGroup processThreadGroup;
48
49 private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
50
51 // private Boolean hadAnError = false;
52 private Boolean killed = false;
53
54 private final AccessControlContext accessControlContext;
55
56 public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
57 ExecutionProcess process) {
58 super(processesThreadGroup, "SLC Process #" + process.getUuid());
59 this.executionModulesManager = executionModulesManager;
60 this.process = process;
61 processThreadGroup = new ProcessThreadGroup(process);
62 accessControlContext = AccessController.getContext();
63 }
64
65 public final void run() {
66 // authenticate thread
67 // Authentication authentication = getProcessThreadGroup()
68 // .getAuthentication();
69 // if (authentication == null)
70 // throw new SlcException("Can only execute authenticated threads");
71 // SecurityContextHolder.getContext().setAuthentication(authentication);
72
73 log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
74
75 // Start logging
76 new LoggingThread().start();
77
78 process.setStatus(ExecutionProcess.RUNNING);
79 try {
80 Subject subject = Subject.getSubject(accessControlContext);
81 try {
82 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
83
84 @Override
85 public Void run() throws Exception {
86 process();
87 return null;
88 }
89
90 });
91 } catch (PrivilegedActionException privilegedActionException) {
92 Throwable cause = privilegedActionException.getCause();
93 if (cause instanceof InterruptedException)
94 throw (InterruptedException) cause;
95 else
96 throw new SlcException("Cannot process", cause);
97 }
98 // process();
99 } catch (InterruptedException e) {
100 die();
101 return;
102 } catch (Exception e) {
103 String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
104 log.error(msg, e);
105 getProcessThreadGroup()
106 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
107 }
108
109 // waits for all execution threads to complete (in case they were
110 // started asynchronously)
111 for (ExecutionThread executionThread : executionThreads) {
112 if (executionThread.isAlive()) {
113 try {
114 executionThread.join();
115 } catch (InterruptedException e) {
116 die();
117 return;
118 }
119 }
120 }
121
122 computeFinalStatus();
123 }
124
125 /** Make sure this is called BEFORE all the threads are interrupted. */
126 private void computeFinalStatus() {
127 // String oldStatus = process.getStatus();
128 // TODO: error management at flow level?
129 if (killed)
130 process.setStatus(ExecutionProcess.KILLED);
131 else if (processThreadGroup.hadAnError())
132 process.setStatus(ExecutionProcess.ERROR);
133 else
134 process.setStatus(ExecutionProcess.COMPLETED);
135 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
136 // process.getStatus());
137 log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
138 }
139
140 /** Called when being killed */
141 private synchronized void die() {
142 killed = true;
143 computeFinalStatus();
144 for (ExecutionThread executionThread : executionThreads) {
145 try {
146 executionThread.interrupt();
147 } catch (Exception e) {
148 log.error("Cannot interrupt " + executionThread);
149 }
150 }
151 processThreadGroup.interrupt();
152 }
153
154 /**
155 * Implementation specific execution. To be overridden in order to deal with
156 * custom process types. Default expects an {@link SlcExecution}.
157 */
158 protected void process() throws InterruptedException {
159 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
160 flowsToProcess.addAll(process.getRealizedFlows());
161 while (flowsToProcess.size() > 0) {
162 RealizedFlow realizedFlow = flowsToProcess.remove(0);
163 execute(realizedFlow, true);
164 }
165 }
166
167 /** @return the (distinct) thread used for this execution */
168 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
169 if (killed)
170 return;
171
172 ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
173 executionThreads.add(thread);
174 thread.start();
175
176 if (synchronous)
177 thread.join();
178
179 return;
180 }
181
182 // public void notifyError() {
183 // hadAnError = true;
184 // }
185 //
186 // public synchronized void flowCompleted() {
187 // // notifyAll();
188 // }
189
190 public ExecutionProcess getProcess() {
191 return process;
192 }
193
194 public ProcessThreadGroup getProcessThreadGroup() {
195 return processThreadGroup;
196 }
197
198 public ExecutionModulesManager getExecutionModulesManager() {
199 return executionModulesManager;
200 }
201
202 private class LoggingThread extends Thread {
203
204 public LoggingThread() {
205 super("SLC Process Logger #" + process.getUuid());
206 }
207
208 public void run() {
209 boolean run = true;
210 while (run) {
211 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
212 processThreadGroup.getSteps().drainTo(newSteps);
213 if (newSteps.size() > 0) {
214 // System.out.println(steps.size() + " steps");
215 process.addSteps(newSteps);
216 }
217
218 try {
219 Thread.sleep(1000);
220 } catch (InterruptedException e) {
221 break;
222 }
223
224 if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
225 run = false;
226 }
227 }
228
229 }
230 }