]> git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.runtime/src/org/argeo/slc/runtime/ExecutionThread.java
Introduce mbox support
[gpl/argeo-slc.git] / org.argeo.slc.runtime / src / org / argeo / slc / runtime / ExecutionThread.java
1 package org.argeo.slc.runtime;
2
3 import static java.lang.System.Logger.Level.ERROR;
4 import static java.lang.System.Logger.Level.WARNING;
5
6 import java.security.AccessControlContext;
7 import java.security.AccessController;
8 import java.security.PrivilegedActionException;
9 import java.security.PrivilegedExceptionAction;
10 import java.util.ArrayList;
11 import java.util.List;
12
13 import javax.security.auth.Subject;
14
15 import org.argeo.slc.execution.ExecutionFlowDescriptor;
16 import org.argeo.slc.execution.ExecutionModulesManager;
17 import org.argeo.slc.execution.ExecutionStep;
18 import org.argeo.slc.execution.FlowConfigurationException;
19 import org.argeo.slc.execution.RealizedFlow;
20
21 /** Thread of a single execution */
22 public class ExecutionThread extends Thread {
23 public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade";
24 private final static System.Logger logger = System.getLogger(ExecutionThread.class.getName());
25
26 private ExecutionModulesManager executionModulesManager;
27 private final RealizedFlow realizedFlow;
28 private final AccessControlContext accessControlContext;
29
30 private List<Runnable> destructionCallbacks = new ArrayList<Runnable>();
31
32 public ExecutionThread(ProcessThreadGroup processThreadGroup, ExecutionModulesManager executionModulesManager,
33 RealizedFlow realizedFlow) {
34 super(processThreadGroup, "Flow " + realizedFlow.getFlowDescriptor().getName());
35 this.realizedFlow = realizedFlow;
36 this.executionModulesManager = executionModulesManager;
37 accessControlContext = AccessController.getContext();
38 }
39
40 public void run() {
41 // authenticate thread
42 // Authentication authentication = getProcessThreadGroup()
43 // .getAuthentication();
44 // if (authentication == null)
45 // throw new SlcException("Can only execute authenticated threads");
46 // SecurityContextHolder.getContext().setAuthentication(authentication);
47
48 // Retrieve execution flow descriptor
49 ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow.getFlowDescriptor();
50 String flowName = executionFlowDescriptor.getName();
51
52 getProcessThreadGroup().dispatchAddStep(
53 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_START, "Flow " + flowName));
54
55 try {
56 Subject subject = Subject.getSubject(accessControlContext);
57 try {
58 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
59
60 @Override
61 public Void run() throws Exception {
62 String autoUpgrade = System.getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE);
63 if (autoUpgrade != null && autoUpgrade.equals("true"))
64 executionModulesManager.upgrade(realizedFlow.getModuleNameVersion());
65 executionModulesManager.start(realizedFlow.getModuleNameVersion());
66 //
67 // START FLOW
68 //
69 executionModulesManager.execute(realizedFlow);
70 // END FLOW
71 return null;
72 }
73
74 });
75 } catch (PrivilegedActionException privilegedActionException) {
76 throw (Exception) privilegedActionException.getCause();
77 }
78 } catch (FlowConfigurationException e) {
79 String msg = "Configuration problem with flow " + flowName + ":\n" + e.getMessage();
80 logger.log(ERROR, msg);
81 getProcessThreadGroup().dispatchAddStep(
82 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
83 } catch (Exception e) {
84 // TODO: re-throw exception ?
85 String msg = "Execution of flow " + flowName + " failed.";
86 logger.log(ERROR, msg, e);
87 getProcessThreadGroup().dispatchAddStep(
88 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
89 } finally {
90 getProcessThreadGroup().dispatchAddStep(
91 new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_END, "Flow " + flowName));
92 processDestructionCallbacks();
93 }
94 }
95
96 private synchronized void processDestructionCallbacks() {
97 for (int i = destructionCallbacks.size() - 1; i >= 0; i--) {
98 try {
99 destructionCallbacks.get(i).run();
100 } catch (Exception e) {
101 logger.log(WARNING, "Could not process destruction callback " + i + " in thread " + getName(), e);
102 }
103 }
104 }
105
106 /**
107 * Gather object destruction callback to be called in reverse order at the end
108 * of the thread
109 */
110 public synchronized void registerDestructionCallback(String name, Runnable callback) {
111 destructionCallbacks.add(callback);
112 }
113
114 protected ProcessThreadGroup getProcessThreadGroup() {
115 return (ProcessThreadGroup) getThreadGroup();
116 }
117 }