2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.core
.execution
;
19 import java
.util
.ArrayList
;
20 import java
.util
.HashSet
;
21 import java
.util
.List
;
24 import org
.apache
.commons
.logging
.Log
;
25 import org
.apache
.commons
.logging
.LogFactory
;
26 import org
.argeo
.slc
.SlcException
;
27 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
28 import org
.argeo
.slc
.execution
.ExecutionProcess
;
29 import org
.argeo
.slc
.process
.RealizedFlow
;
30 import org
.argeo
.slc
.process
.SlcExecution
;
32 /** Thread of the SLC Process, starting the sub executions. */
33 public class ProcessThread
extends Thread
{
34 private final static Log log
= LogFactory
.getLog(ProcessThread
.class);
36 private final ExecutionModulesManager executionModulesManager
;
37 private final ExecutionProcess process
;
38 private final ProcessThreadGroup processThreadGroup
;
40 private Set
<ExecutionThread
> executionThreads
= new HashSet
<ExecutionThread
>();
42 private Boolean hadAnError
= false;
44 public ProcessThread(ExecutionModulesManager executionModulesManager
,
45 ExecutionProcess process
) {
46 super(executionModulesManager
.getProcessesThreadGroup(),
47 "SLC Process #" + process
.getUuid());
48 this.executionModulesManager
= executionModulesManager
;
49 this.process
= process
;
50 processThreadGroup
= new ProcessThreadGroup(executionModulesManager
,
55 log
.info("\n##\n## SLC Process #" + process
.getUuid()
58 process
.setStatus(SlcExecution
.RUNNING
);
59 executionModulesManager
.dispatchUpdateStatus(process
,
60 SlcExecution
.SCHEDULED
, SlcExecution
.RUNNING
);
64 // waits for all execution threads to complete (in case they were
65 // started asynchronously)
66 for (ExecutionThread executionThread
: executionThreads
) {
67 if (executionThread
.isAlive()) {
69 executionThread
.join();
70 } catch (InterruptedException e
) {
71 log
.error("Execution thread " + executionThread
72 + " was interrupted");
77 // TODO: error management at flow level?
79 process
.setStatus(SlcExecution
.ERROR
);
81 process
.setStatus(SlcExecution
.COMPLETED
);
82 executionModulesManager
.dispatchUpdateStatus(process
,
83 SlcExecution
.RUNNING
, process
.getStatus());
85 log
.info("\n## SLC Process #" + process
.getUuid() + " COMPLETED\n");
89 * Implementation specific execution. To be overridden in order to deal with
90 * custom process types. Default expects an {@link SlcExecution}.
92 protected void process() {
93 if (!(process
instanceof SlcExecution
))
94 throw new SlcException("Unsupported process type "
95 + process
.getClass());
96 SlcExecution slcExecution
= (SlcExecution
) process
;
97 List
<RealizedFlow
> flowsToProcess
= new ArrayList
<RealizedFlow
>();
98 flowsToProcess
.addAll(slcExecution
.getRealizedFlows());
100 while (flowsToProcess
.size() > 0) {
101 RealizedFlow realizedFlow
= flowsToProcess
.remove(0);
102 execute(realizedFlow
, true);
106 protected void execute(RealizedFlow realizedFlow
, Boolean synchronous
) {
107 ExecutionThread thread
= new ExecutionThread(this, realizedFlow
);
108 executionThreads
.add(thread
);
114 } catch (InterruptedException e
) {
115 log
.error("Flow " + realizedFlow
+ " was interrupted", e
);
119 // synchronized (this) {
122 // } catch (InterruptedException e) {
128 public void notifyError() {
132 public synchronized void flowCompleted() {
136 public ExecutionProcess
getProcess() {
140 public ProcessThreadGroup
getProcessThreadGroup() {
141 return processThreadGroup
;
144 public ExecutionModulesManager
getExecutionModulesManager() {
145 return executionModulesManager
;