]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/ExecutionThread.java
Prepare next development cycle
[gpl/argeo-slc.git] / runtime / ExecutionThread.java
1 package org.argeo.slc.runtime;
2
3 import java.security.AccessControlContext;
4 import java.security.AccessController;
5 import java.security.PrivilegedActionException;
6 import java.security.PrivilegedExceptionAction;
7 import java.util.ArrayList;
8 import java.util.List;
9
10 import javax.security.auth.Subject;
11
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14 import org.argeo.slc.execution.ExecutionFlowDescriptor;
15 import org.argeo.slc.execution.ExecutionModulesManager;
16 import org.argeo.slc.execution.ExecutionStep;
17 import org.argeo.slc.execution.FlowConfigurationException;
18 import org.argeo.slc.execution.RealizedFlow;
19
20 /** Thread of a single execution */
21 public class ExecutionThread extends Thread {
22 public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade";
23 private final static Log log = LogFactory.getLog(ExecutionThread.class);
24
25 private ExecutionModulesManager executionModulesManager;
26 private final RealizedFlow realizedFlow;
27 private final AccessControlContext accessControlContext;
28
29 private List<Runnable> destructionCallbacks = new ArrayList<Runnable>();
30
31 public ExecutionThread(ProcessThreadGroup processThreadGroup, ExecutionModulesManager executionModulesManager,
32 RealizedFlow realizedFlow) {
33 super(processThreadGroup, "Flow " + realizedFlow.getFlowDescriptor().getName());
34 this.realizedFlow = realizedFlow;
35 this.executionModulesManager = executionModulesManager;
36 accessControlContext = AccessController.getContext();
37 }
38
39 public void run() {
40 // authenticate thread
41 // Authentication authentication = getProcessThreadGroup()
42 // .getAuthentication();
43 // if (authentication == null)
44 // throw new SlcException("Can only execute authenticated threads");
45 // SecurityContextHolder.getContext().setAuthentication(authentication);
46
47 // Retrieve execution flow descriptor
48 ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow.getFlowDescriptor();
49 String flowName = executionFlowDescriptor.getName();
50
51 getProcessThreadGroup().dispatchAddStep(
52 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_START, "Flow " + flowName));
53
54 try {
55 Subject subject = Subject.getSubject(accessControlContext);
56 try {
57 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
58
59 @Override
60 public Void run() throws Exception {
61 String autoUpgrade = System.getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE);
62 if (autoUpgrade != null && autoUpgrade.equals("true"))
63 executionModulesManager.upgrade(realizedFlow.getModuleNameVersion());
64 executionModulesManager.start(realizedFlow.getModuleNameVersion());
65 //
66 // START FLOW
67 //
68 executionModulesManager.execute(realizedFlow);
69 // END FLOW
70 return null;
71 }
72
73 });
74 } catch (PrivilegedActionException privilegedActionException) {
75 throw (Exception) privilegedActionException.getCause();
76 }
77 } catch (FlowConfigurationException e) {
78 String msg = "Configuration problem with flow " + flowName + ":\n" + e.getMessage();
79 log.error(msg);
80 getProcessThreadGroup().dispatchAddStep(
81 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
82 } catch (Exception e) {
83 // TODO: re-throw exception ?
84 String msg = "Execution of flow " + flowName + " failed.";
85 log.error(msg, e);
86 getProcessThreadGroup().dispatchAddStep(
87 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
88 } finally {
89 getProcessThreadGroup().dispatchAddStep(
90 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_END, "Flow " + flowName));
91 processDestructionCallbacks();
92 }
93 }
94
95 private synchronized void processDestructionCallbacks() {
96 for (int i = destructionCallbacks.size() - 1; i >= 0; i--) {
97 try {
98 destructionCallbacks.get(i).run();
99 } catch (Exception e) {
100 log.warn("Could not process destruction callback " + i + " in thread " + getName(), e);
101 }
102 }
103 }
104
105 /**
106 * Gather object destruction callback to be called in reverse order at the
107 * end of the thread
108 */
109 public synchronized void registerDestructionCallback(String name, Runnable callback) {
110 destructionCallbacks.add(callback);
111 }
112
113 protected ProcessThreadGroup getProcessThreadGroup() {
114 return (ProcessThreadGroup) getThreadGroup();
115 }
116 }