]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/ProcessThread.java
Move default agent to execution package
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / ProcessThread.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.core.execution;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.SlcException;
27 import org.argeo.slc.execution.ExecutionModulesManager;
28 import org.argeo.slc.execution.ExecutionProcess;
29 import org.argeo.slc.execution.ExecutionStep;
30 import org.argeo.slc.execution.RealizedFlow;
31 import org.argeo.slc.process.SlcExecution;
32 import org.springframework.security.Authentication;
33 import org.springframework.security.context.SecurityContextHolder;
34
35 /** Thread of the SLC Process, starting the sub executions. */
36 @SuppressWarnings("deprecation")
37 public class ProcessThread extends Thread {
38 private final static Log log = LogFactory.getLog(ProcessThread.class);
39
40 private final ExecutionModulesManager executionModulesManager;
41 private final ExecutionProcess process;
42 private final ProcessThreadGroup processThreadGroup;
43
44 private Set<ExecutionThread> executionThreads = Collections
45 .synchronizedSet(new HashSet<ExecutionThread>());
46
47 private Boolean hadAnError = false;
48 private Boolean killed = false;
49
50 public ProcessThread(ThreadGroup processesThreadGroup,
51 ExecutionModulesManager executionModulesManager,
52 ExecutionProcess process) {
53 super(processesThreadGroup, "SLC Process #" + process.getUuid());
54 this.executionModulesManager = executionModulesManager;
55 this.process = process;
56 processThreadGroup = new ProcessThreadGroup(executionModulesManager,
57 this);
58 }
59
60 public final void run() {
61 // authenticate thread
62 Authentication authentication = getProcessThreadGroup()
63 .getAuthentication();
64 if (authentication == null)
65 throw new SlcException("Can only execute authenticated threads");
66 SecurityContextHolder.getContext().setAuthentication(authentication);
67
68 // log.info("\n##\n## SLC Process #" + process.getUuid() +
69 // " STARTED by "
70 // + authentication.getName() + "\n##\n");
71 log.info("\n##\n## SLC Process #" + process.getUuid()
72 + " STARTED\n##\n");
73
74 // Start logging
75 new LoggingThread().start();
76
77 String oldStatus = process.getStatus();
78 process.setStatus(ExecutionProcess.RUNNING);
79 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
80 ExecutionProcess.RUNNING);
81
82 try {
83 process();
84 } catch (InterruptedException e) {
85 die();
86 return;
87 }
88
89 // waits for all execution threads to complete (in case they were
90 // started asynchronously)
91 for (ExecutionThread executionThread : executionThreads) {
92 if (executionThread.isAlive()) {
93 try {
94 executionThread.join();
95 } catch (InterruptedException e) {
96 die();
97 return;
98 }
99 }
100 }
101
102 computeFinalStatus();
103 }
104
105 /** Make sure this is called BEFORE all the threads are interrupted. */
106 private void computeFinalStatus() {
107 String oldStatus = process.getStatus();
108 // TODO: error management at flow level?
109 if (killed)
110 process.setStatus(ExecutionProcess.KILLED);
111 else if (hadAnError)
112 process.setStatus(ExecutionProcess.ERROR);
113 else
114 process.setStatus(ExecutionProcess.COMPLETED);
115 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
116 process.getStatus());
117 log.info("\n## SLC Process #" + process.getUuid() + " "
118 + process.getStatus() + "\n");
119 }
120
121 /** Called when being killed */
122 private synchronized void die() {
123 killed = true;
124 computeFinalStatus();
125 for (ExecutionThread executionThread : executionThreads) {
126 try {
127 executionThread.interrupt();
128 } catch (Exception e) {
129 log.error("Cannot interrupt " + executionThread);
130 }
131 }
132 processThreadGroup.interrupt();
133 }
134
135 /**
136 * Implementation specific execution. To be overridden in order to deal with
137 * custom process types. Default expects an {@link SlcExecution}.
138 */
139 protected void process() throws InterruptedException {
140 if (!(process instanceof SlcExecution))
141 throw new SlcException("Unsupported process type "
142 + process.getClass());
143 SlcExecution slcExecution = (SlcExecution) process;
144 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
145 flowsToProcess.addAll(slcExecution.getRealizedFlows());
146
147 while (flowsToProcess.size() > 0) {
148 RealizedFlow realizedFlow = flowsToProcess.remove(0);
149 execute(realizedFlow, true);
150 }
151 }
152
153 /** @return the (distinct) thread used for this execution */
154 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
155 throws InterruptedException {
156 if (killed)
157 return;
158
159 ExecutionThread thread = new ExecutionThread(this, realizedFlow);
160 executionThreads.add(thread);
161 thread.start();
162
163 if (synchronous)
164 thread.join();
165
166 return;
167 }
168
169 public void notifyError() {
170 hadAnError = true;
171 }
172
173 public synchronized void flowCompleted() {
174 // notifyAll();
175 }
176
177 public ExecutionProcess getProcess() {
178 return process;
179 }
180
181 public ProcessThreadGroup getProcessThreadGroup() {
182 return processThreadGroup;
183 }
184
185 public ExecutionModulesManager getExecutionModulesManager() {
186 return executionModulesManager;
187 }
188
189 private class LoggingThread extends Thread {
190
191 public LoggingThread() {
192 super("SLC Process Logger #" + process.getUuid());
193 }
194
195 public void run() {
196 boolean run = true;
197 while (run) {
198 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
199 processThreadGroup.getSteps().drainTo(newSteps);
200 if (newSteps.size() > 0) {
201 // System.out.println(steps.size() + " steps");
202 process.addSteps(newSteps);
203 }
204
205 try {
206 Thread.sleep(1000);
207 } catch (InterruptedException e) {
208 break;
209 }
210
211 if (!ProcessThread.this.isAlive()
212 && processThreadGroup.getSteps().size() == 0)
213 run = false;
214 }
215 }
216
217 }
218 }