X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=org.argeo.slc.core%2Fsrc%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FExecutionThread.java;fp=org.argeo.slc.core%2Fsrc%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FExecutionThread.java;h=48a5df38bed6f9bf2be8c2e7e8d8d786af0f0712;hb=a9b97cc33383ded70277f49aa287f84903334e70;hp=0000000000000000000000000000000000000000;hpb=d1298659fe6f179d1cbbc8c89f108a0bbc5b4edf;p=gpl%2Fargeo-slc.git diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/ExecutionThread.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/ExecutionThread.java new file mode 100644 index 000000000..48a5df38b --- /dev/null +++ b/org.argeo.slc.core/src/org/argeo/slc/core/execution/ExecutionThread.java @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2007-2012 Argeo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.argeo.slc.core.execution; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.SlcException; +import org.argeo.slc.execution.ExecutionFlowDescriptor; +import org.argeo.slc.execution.ExecutionModulesManager; +import org.argeo.slc.execution.ExecutionStep; +import org.argeo.slc.execution.RealizedFlow; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; + +/** Thread of a single execution */ +public class ExecutionThread extends Thread { + public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade"; + private final static Log log = LogFactory.getLog(ExecutionThread.class); + + private ExecutionModulesManager executionModulesManager; + private final RealizedFlow realizedFlow; + + private List destructionCallbacks = new ArrayList(); + + public ExecutionThread(ProcessThreadGroup processThreadGroup, + ExecutionModulesManager executionModulesManager, + RealizedFlow realizedFlow) { + super(processThreadGroup, "Flow " + + realizedFlow.getFlowDescriptor().getName()); + this.realizedFlow = realizedFlow; + this.executionModulesManager = executionModulesManager; + } + + public void run() { + // authenticate thread + Authentication authentication = getProcessThreadGroup() + .getAuthentication(); + if (authentication == null) + throw new SlcException("Can only execute authenticated threads"); + SecurityContextHolder.getContext().setAuthentication(authentication); + + // Retrieve execution flow descriptor + ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow + .getFlowDescriptor(); + String flowName = executionFlowDescriptor.getName(); + + getProcessThreadGroup().dispatchAddStep( + new ExecutionStep(realizedFlow.getModuleName(), + ExecutionStep.PHASE_START, "Flow " + flowName)); + + try { + String autoUpgrade = System + .getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE); + if (autoUpgrade != null && autoUpgrade.equals("true")) + executionModulesManager.upgrade(realizedFlow + .getModuleNameVersion()); + executionModulesManager.start(realizedFlow.getModuleNameVersion()); + // + // START FLOW + // + executionModulesManager.execute(realizedFlow); + // END FLOW + } catch (FlowConfigurationException e) { + String msg = "Configuration problem with flow " + flowName + ":\n" + + e.getMessage(); + log.error(msg); + getProcessThreadGroup().dispatchAddStep( + new ExecutionStep(realizedFlow.getModuleName(), + ExecutionStep.ERROR, msg + " " + e.getMessage())); + } catch (Exception e) { + // TODO: re-throw exception ? + String msg = "Execution of flow " + flowName + " failed."; + log.error(msg, e); + getProcessThreadGroup().dispatchAddStep( + new ExecutionStep(realizedFlow.getModuleName(), + ExecutionStep.ERROR, msg + " " + e.getMessage())); + } finally { + getProcessThreadGroup().dispatchAddStep( + new ExecutionStep(realizedFlow.getModuleName(), + ExecutionStep.PHASE_END, "Flow " + flowName)); + processDestructionCallbacks(); + } + } + + private synchronized void processDestructionCallbacks() { + for (int i = destructionCallbacks.size() - 1; i >= 0; i--) { + try { + destructionCallbacks.get(i).run(); + } catch (Exception e) { + log.warn("Could not process destruction callback " + i + + " in thread " + getName(), e); + } + } + } + + /** + * Gather object destruction callback to be called in reverse order at the + * end of the thread + */ + synchronized void registerDestructionCallback(String name, Runnable callback) { + destructionCallbacks.add(callback); + } + + protected ProcessThreadGroup getProcessThreadGroup() { + return (ProcessThreadGroup) getThreadGroup(); + } +} \ No newline at end of file