]> git.argeo.org Git - gpl/argeo-slc.git/blob - ProcessThread.java
8e7584b2bff8bb7fbf4b790cffa7eec0d59fe79c
[gpl/argeo-slc.git] / ProcessThread.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.core.execution;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.SlcException;
27 import org.argeo.slc.execution.ExecutionModulesManager;
28 import org.argeo.slc.execution.ExecutionProcess;
29 import org.argeo.slc.execution.ExecutionStep;
30 import org.argeo.slc.execution.RealizedFlow;
31 import org.springframework.security.Authentication;
32 import org.springframework.security.context.SecurityContextHolder;
33
34 public class ProcessThread extends Thread {
35 private final static Log log = LogFactory.getLog(ProcessThread.class);
36
37 private final ExecutionModulesManager executionModulesManager;
38 private final ExecutionProcess process;
39 private final ProcessThreadGroup processThreadGroup;
40
41 private Set<ExecutionThread> executionThreads = Collections
42 .synchronizedSet(new HashSet<ExecutionThread>());
43
44 private Boolean hadAnError = false;
45 private Boolean killed = false;
46
47 public ProcessThread(ThreadGroup processesThreadGroup,
48 ExecutionModulesManager executionModulesManager,
49 ExecutionProcess process) {
50 super(processesThreadGroup, "SLC Process #" + process.getUuid());
51 this.executionModulesManager = executionModulesManager;
52 this.process = process;
53 processThreadGroup = new ProcessThreadGroup(executionModulesManager,
54 this);
55 }
56
57 public final void run() {
58 // authenticate thread
59 Authentication authentication = getProcessThreadGroup()
60 .getAuthentication();
61 if (authentication == null)
62 throw new SlcException("Can only execute authenticated threads");
63 SecurityContextHolder.getContext().setAuthentication(authentication);
64
65 // log.info("\n##\n## SLC Process #" + process.getUuid() +
66 // " STARTED by "
67 // + authentication.getName() + "\n##\n");
68 log.info("\n##\n## SLC Process #" + process.getUuid()
69 + " STARTED\n##\n");
70
71 // Start logging
72 new LoggingThread().start();
73
74 String oldStatus = process.getStatus();
75 process.setStatus(ExecutionProcess.RUNNING);
76 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
77 ExecutionProcess.RUNNING);
78
79 try {
80 process();
81 } catch (InterruptedException e) {
82 die();
83 return;
84 }
85
86 // waits for all execution threads to complete (in case they were
87 // started asynchronously)
88 for (ExecutionThread executionThread : executionThreads) {
89 if (executionThread.isAlive()) {
90 try {
91 executionThread.join();
92 } catch (InterruptedException e) {
93 die();
94 return;
95 }
96 }
97 }
98
99 computeFinalStatus();
100 }
101
102 /** Make sure this is called BEFORE all the threads are interrupted. */
103 private void computeFinalStatus() {
104 String oldStatus = process.getStatus();
105 // TODO: error management at flow level?
106 if (killed)
107 process.setStatus(ExecutionProcess.KILLED);
108 else if (hadAnError)
109 process.setStatus(ExecutionProcess.ERROR);
110 else
111 process.setStatus(ExecutionProcess.COMPLETED);
112 executionModulesManager.dispatchUpdateStatus(process, oldStatus,
113 process.getStatus());
114 log.info("\n## SLC Process #" + process.getUuid() + " "
115 + process.getStatus() + "\n");
116 }
117
118 /** Called when being killed */
119 private synchronized void die() {
120 killed = true;
121 computeFinalStatus();
122 for (ExecutionThread executionThread : executionThreads) {
123 try {
124 executionThread.interrupt();
125 } catch (Exception e) {
126 log.error("Cannot interrupt " + executionThread);
127 }
128 }
129 processThreadGroup.interrupt();
130 }
131
132 /**
133 * Implementation specific execution. To be overridden in order to deal with
134 * custom process types. Default expects an {@link SlcExecution}.
135 */
136 protected void process() throws InterruptedException {
137 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
138 flowsToProcess.addAll(process.getRealizedFlows());
139 while (flowsToProcess.size() > 0) {
140 RealizedFlow realizedFlow = flowsToProcess.remove(0);
141 execute(realizedFlow, true);
142 }
143 }
144
145 /** @return the (distinct) thread used for this execution */
146 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous)
147 throws InterruptedException {
148 if (killed)
149 return;
150
151 ExecutionThread thread = new ExecutionThread(this, realizedFlow);
152 executionThreads.add(thread);
153 thread.start();
154
155 if (synchronous)
156 thread.join();
157
158 return;
159 }
160
161 public void notifyError() {
162 hadAnError = true;
163 }
164
165 public synchronized void flowCompleted() {
166 // notifyAll();
167 }
168
169 public ExecutionProcess getProcess() {
170 return process;
171 }
172
173 public ProcessThreadGroup getProcessThreadGroup() {
174 return processThreadGroup;
175 }
176
177 public ExecutionModulesManager getExecutionModulesManager() {
178 return executionModulesManager;
179 }
180
181 private class LoggingThread extends Thread {
182
183 public LoggingThread() {
184 super("SLC Process Logger #" + process.getUuid());
185 }
186
187 public void run() {
188 boolean run = true;
189 while (run) {
190 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
191 processThreadGroup.getSteps().drainTo(newSteps);
192 if (newSteps.size() > 0) {
193 // System.out.println(steps.size() + " steps");
194 process.addSteps(newSteps);
195 }
196
197 try {
198 Thread.sleep(1000);
199 } catch (InterruptedException e) {
200 break;
201 }
202
203 if (!ProcessThread.this.isAlive()
204 && processThreadGroup.getSteps().size() == 0)
205 run = false;
206 }
207 }
208
209 }
210 }