X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultAgent.java;h=41c66622d01506b951651a1138747028bbd71466;hb=4216976fae84a1b9a7a7c83111b9dd95c7825cf9;hp=e69b1cf7af749771d7fa4aaab10dc47479db24a2;hpb=b531186cfae271152ca520489fd6443ab3eace4d;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java index e69b1cf7a..41c66622d 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java @@ -15,61 +15,78 @@ */ package org.argeo.slc.core.execution; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLDecoder; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.argeo.slc.BasicNameVersion; import org.argeo.slc.SlcException; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; -import org.argeo.slc.execution.ExecutionProcessNotifier; -import org.argeo.slc.execution.ExecutionStep; -import org.argeo.slc.process.SlcExecution; -import org.argeo.slc.runtime.SlcAgent; -import org.argeo.slc.runtime.SlcAgentDescriptor; +import org.argeo.slc.execution.SlcAgent; /** Implements the base methods of an SLC agent. */ -@SuppressWarnings("deprecation") -public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { - private final static Log log = LogFactory.getLog(DefaultAgent.class); +public class DefaultAgent implements SlcAgent { + // private final static Log log = LogFactory.getLog(DefaultAgent.class); + /** UTF-8 charset for encoding. */ + private final static String UTF8 = "UTF-8"; - private SlcAgentDescriptor agentDescriptor; + private String agentUuid = null; + // private SlcAgentDescriptor agentDescriptor; private ExecutionModulesManager modulesManager; private ThreadGroup processesThreadGroup; private Map runningProcesses = Collections .synchronizedMap(new HashMap()); + private String defaultModulePrefix = null; + /* * LIFECYCLE */ /** Initialization */ public void init() { - agentDescriptor = new SlcAgentDescriptor(); - agentDescriptor.setUuid(initAgentUuid()); - try { - agentDescriptor.setHost(InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - log.error("Cannot resolve localhost host name: " + e.getMessage()); - agentDescriptor.setHost("localhost"); - } + agentUuid = initAgentUuid(); + // agentDescriptor = new SlcAgentDescriptor(); + // agentDescriptor.setUuid(initAgentUuid()); + // try { + // agentDescriptor.setHost(InetAddress.getLocalHost().getHostName()); + // } catch (UnknownHostException e) { + // log.error("Cannot resolve localhost host name: " + e.getMessage()); + // agentDescriptor.setHost("localhost"); + // } processesThreadGroup = new ThreadGroup("SLC Processes of Agent #" - + agentDescriptor.getUuid()); - modulesManager.registerProcessNotifier(this, - new HashMap()); + + agentUuid); + // modulesManager.registerProcessNotifier(this, + // new HashMap()); + + // final String module = System + // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY); + // final String flow = System + // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_FLOW_PROPERTY); + // if (module != null) { + // // launch a flow and stops + // new Thread("Unique Flow") { + // @Override + // public void run() { + // executeFlowAndExit(module, null, flow); + // } + // }.start(); + // } } /** Clean up (needs to be called by overriding method) */ public void destroy() { - modulesManager.unregisterProcessNotifier(this, - new HashMap()); + // modulesManager.unregisterProcessNotifier(this, + // new HashMap()); } /** @@ -80,6 +97,13 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { return UUID.randomUUID().toString(); } + /* + * UNIQUE FLOW + */ + // protected void executeFlowAndExit(final String module, + // final String version, final String flow) { + // } + /* * SLC AGENT */ @@ -88,13 +112,63 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { modulesManager, process); processThread.start(); runningProcesses.put(process.getUuid(), processThread); - // FIXME find a way to remove them from this register + + // clean up old processes + Iterator it = runningProcesses.values().iterator(); + while (it.hasNext()) { + ProcessThread pThread = it.next(); + if (!pThread.isAlive()) + it.remove(); + } + } + + public String process(List uris) { + DefaultProcess process = new DefaultProcess(); + for (URI uri : uris) { + String[] path = uri.getPath().split("/"); + if (path.length < 3) + throw new SlcException("Badly formatted URI: " + uri); + String moduleName = path[1]; + // TODO process version + String moduleVersion = null; + StringBuilder flow = new StringBuilder(); + for (int i = 2; i < path.length; i++) + flow.append('/').append(path[i]); + + Map values = new HashMap(); + if (uri.getQuery() != null) + values = getQueryMap(uri.getQuery()); + + // Get execution module descriptor + ExecutionModuleDescriptor emd = getExecutionModuleDescriptor( + moduleName, moduleVersion); + process.getRealizedFlows().add( + emd.asRealizedFlow(flow.toString(), values)); + } + process(process); + return process.getUuid(); } - public void kill(ExecutionProcess process) { - String processUuid = process.getUuid(); + public void kill(String processUuid) { if (runningProcesses.containsKey(processUuid)) { runningProcesses.get(processUuid).interrupt(); + } else { + // assume is finished + } + } + + public void waitFor(String processUuid, Long millis) { + if (runningProcesses.containsKey(processUuid)) { + try { + if (millis != null) + runningProcesses.get(processUuid).join(millis); + else + runningProcesses.get(processUuid).join(); + } catch (InterruptedException e) { + // silent + } + } else { + // assume is finished } } @@ -102,17 +176,30 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { protected ProcessThread createProcessThread( ThreadGroup processesThreadGroup, ExecutionModulesManager modulesManager, ExecutionProcess process) { - if (!(process instanceof SlcExecution)) - throw new SlcException("Unsupported process type " - + process.getClass()); ProcessThread processThread = new ProcessThread(processesThreadGroup, - modulesManager, (SlcExecution) process); + modulesManager, process); return processThread; } public ExecutionModuleDescriptor getExecutionModuleDescriptor( - String moduleName, String version) { - return modulesManager.getExecutionModuleDescriptor(moduleName, version); + String moduleName, String moduleVersion) { + // Get execution module descriptor + ExecutionModuleDescriptor emd; + try { + modulesManager.start(new BasicNameVersion(moduleName, moduleVersion)); + emd = modulesManager.getExecutionModuleDescriptor(moduleName, + moduleVersion); + } catch (SlcException e) { + if (defaultModulePrefix != null) { + moduleName = defaultModulePrefix + "." + moduleName; + modulesManager.start(new BasicNameVersion(moduleName, + moduleVersion)); + emd = modulesManager.getExecutionModuleDescriptor(moduleName, + moduleVersion); + } else + throw e; + } + return emd; } public List listExecutionModuleDescriptors() { @@ -126,16 +213,36 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { /* * PROCESS NOTIFIER */ - public void updateStatus(ExecutionProcess process, String oldStatus, - String newStatus) { - if (newStatus.equals(ExecutionProcess.COMPLETED) - || newStatus.equals(ExecutionProcess.ERROR) - || newStatus.equals(ExecutionProcess.KILLED)) { - runningProcesses.remove(process.getUuid()); - } - } + // public void updateStatus(ExecutionProcess process, String oldStatus, + // String newStatus) { + // if (newStatus.equals(ExecutionProcess.COMPLETED) + // || newStatus.equals(ExecutionProcess.ERROR) + // || newStatus.equals(ExecutionProcess.KILLED)) { + // runningProcesses.remove(process.getUuid()); + // } + // } + // + // public void addSteps(ExecutionProcess process, List steps) + // { + // } - public void addSteps(ExecutionProcess process, List steps) { + /* + * UTILITIES + */ + private static Map getQueryMap(String query) { + String[] params = query.split("&"); + Map map = new LinkedHashMap(); + for (String param : params) { + String name = param.split("=")[0]; + String value = param.split("=")[1]; + try { + map.put(URLDecoder.decode(name, UTF8), + URLDecoder.decode(value, UTF8)); + } catch (UnsupportedEncodingException e) { + throw new SlcException("Cannot decode '" + param + "'", e); + } + } + return map; } /* @@ -145,16 +252,16 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { this.modulesManager = modulesManager; } - protected SlcAgentDescriptor getAgentDescriptor() { - return agentDescriptor; + public void setDefaultModulePrefix(String defaultModulePrefix) { + this.defaultModulePrefix = defaultModulePrefix; } public String getAgentUuid() { - return agentDescriptor.getUuid(); + return agentUuid; } @Override public String toString() { - return agentDescriptor.toString(); + return "Agent #" + getAgentUuid(); } }