]> git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.core/src/org/argeo/slc/core/execution/ProcessThread.java
Disable trace logging
[gpl/argeo-slc.git] / org.argeo.slc.core / src / org / argeo / slc / core / execution / ProcessThread.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.core.execution;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.SlcException;
27 import org.argeo.slc.execution.ExecutionModulesManager;
28 import org.argeo.slc.execution.ExecutionProcess;
29 import org.argeo.slc.execution.ExecutionStep;
30 import org.argeo.slc.execution.RealizedFlow;
31 import org.springframework.security.core.Authentication;
32 import org.springframework.security.core.context.SecurityContextHolder;
33
34 /**
35 * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
36 * sequential {@link ExecutionThread}s.
37 */
38 public class ProcessThread extends Thread {
39 private final static Log log = LogFactory.getLog(ProcessThread.class);
40
41 private final ExecutionModulesManager executionModulesManager;
42 private final ExecutionProcess process;
43 private final ProcessThreadGroup processThreadGroup;
44
45 private Set<ExecutionThread> executionThreads = Collections
46 .synchronizedSet(new HashSet<ExecutionThread>());
47
48 // private Boolean hadAnError = false;
49 private Boolean killed = false;
50
51 public ProcessThread(ThreadGroup processesThreadGroup,
52 ExecutionModulesManager executionModulesManager,
53 ExecutionProcess process) {
54 super(processesThreadGroup, "SLC Process #" + process.getUuid());
55 this.executionModulesManager = executionModulesManager;
56 this.process = process;
57 processThreadGroup = new ProcessThreadGroup(process);
58 }
59
60 public final void run() {
61 // authenticate thread
62 Authentication authentication = getProcessThreadGroup()
63 .getAuthentication();
64 if (authentication == null)
65 throw new SlcException("Can only execute authenticated threads");
66 SecurityContextHolder.getContext().setAuthentication(authentication);
67
68 log.info("\n##\n## SLC Process #" + process.getUuid()
69 + " STARTED\n##\n");
70
71 // Start logging
72 new LoggingThread().start();
73
74 process.setStatus(ExecutionProcess.RUNNING);
75 try {
76 process();
77 } catch (InterruptedException e) {
78 die();
79 return;
80 } catch (Exception e) {
81 String msg = "Process " + getProcess().getUuid()
82 + " failed unexpectedly.";
83 log.error(msg, e);
84 getProcessThreadGroup().dispatchAddStep(
85 new ExecutionStep("Process", ExecutionStep.ERROR, msg + " "
86 + e.getMessage()));
87 }
88
89 // waits for all execution threads to complete (in case they were
90 // started asynchronously)
91 for (ExecutionThread executionThread : executionThreads) {
92 if (executionThread.isAlive()) {
93 try {
94 executionThread.join();
95 } catch (InterruptedException e) {
96 die();
97 return;
98 }
99 }
100 }
101
102 computeFinalStatus();
103 }
104
105 /** Make sure this is called BEFORE all the threads are interrupted. */
106 private void computeFinalStatus() {
107 // String oldStatus = process.getStatus();
108 // TODO: error management at flow level?
109 if (killed)
110 process.setStatus(ExecutionProcess.KILLED);
111 else if (processThreadGroup.hadAnError())
112 process.setStatus(ExecutionProcess.ERROR);
113 else
114 process.setStatus(ExecutionProcess.COMPLETED);
115 // executionModulesManager.dispatchUpdateStatus(process, oldStatus,
116 // process.getStatus());
117 log.info("\n## SLC Process #" + process.getUuid() + " "
118 + process.getStatus() + "\n");
119 }
120
121 /** Called when being killed */
122 private synchronized void die() {
123 killed = true;
124 computeFinalStatus();
125 for (ExecutionThread executionThread : executionThreads) {
126 try {
127 executionThread.interrupt();
128 } catch (Exception e) {
129 log.error("Cannot interrupt " + executionThread);
130 }
131 }
132 processThreadGroup.interrupt();
133 }
134
135 /**
136 * Implementation specific execution. To be overridden in order to deal with
137 * custom process types. Default expects an {@link SlcExecution}.
138 */
139 protected void process() throws InterruptedException {
140 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
141 flowsToProcess.addAll(process.getRealizedFlows());
142 while (flowsToProcess.size() > 0) {
143 RealizedFlow realizedFlow = flowsToProcess.remove(0);
144 execute(realizedFlow, true);
145 }
146 }
147
148 /** @return the (distinct) thread used for this execution */
149 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
150 throws InterruptedException {
151 if (killed)
152 return;
153
154 ExecutionThread thread = new ExecutionThread(processThreadGroup,
155 executionModulesManager, realizedFlow);
156 executionThreads.add(thread);
157 thread.start();
158
159 if (synchronous)
160 thread.join();
161
162 return;
163 }
164
165 // public void notifyError() {
166 // hadAnError = true;
167 // }
168 //
169 // public synchronized void flowCompleted() {
170 // // notifyAll();
171 // }
172
173 public ExecutionProcess getProcess() {
174 return process;
175 }
176
177 public ProcessThreadGroup getProcessThreadGroup() {
178 return processThreadGroup;
179 }
180
181 public ExecutionModulesManager getExecutionModulesManager() {
182 return executionModulesManager;
183 }
184
185 private class LoggingThread extends Thread {
186
187 public LoggingThread() {
188 super("SLC Process Logger #" + process.getUuid());
189 }
190
191 public void run() {
192 boolean run = true;
193 while (run) {
194 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
195 processThreadGroup.getSteps().drainTo(newSteps);
196 if (newSteps.size() > 0) {
197 // System.out.println(steps.size() + " steps");
198 process.addSteps(newSteps);
199 }
200
201 try {
202 Thread.sleep(1000);
203 } catch (InterruptedException e) {
204 break;
205 }
206
207 if (!ProcessThread.this.isAlive()
208 && processThreadGroup.getSteps().size() == 0)
209 run = false;
210 }
211 }
212
213 }
214 }