X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultAgent.java;h=aaa687fd77ccc9a78eba7eecf5601e4efb7f7a91;hb=6de9c4036be9e318f59a0ffa187570f5999c53cb;hp=e892b1d14dea025c0a49433d5ce47cce9838e0d6;hpb=a75c0516aca20f9a8c8fdd32feee402257ff2b61;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java index e892b1d14..aaa687fd7 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java @@ -15,75 +15,52 @@ */ package org.argeo.slc.core.execution; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLDecoder; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.argeo.slc.BasicNameVersion; +import org.argeo.slc.NameVersion; import org.argeo.slc.SlcException; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; -import org.argeo.slc.execution.ExecutionProcessNotifier; -import org.argeo.slc.execution.ExecutionStep; import org.argeo.slc.execution.SlcAgent; -import org.argeo.slc.execution.SlcAgentDescriptor; -import org.argeo.slc.process.SlcExecution; /** Implements the base methods of an SLC agent. */ -@SuppressWarnings("deprecation") -public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { - private final static Log log = LogFactory.getLog(DefaultAgent.class); +public class DefaultAgent implements SlcAgent { + // private final static Log log = LogFactory.getLog(DefaultAgent.class); + /** UTF-8 charset for encoding. */ + private final static String UTF8 = "UTF-8"; - private SlcAgentDescriptor agentDescriptor; + private String agentUuid = null; private ExecutionModulesManager modulesManager; private ThreadGroup processesThreadGroup; private Map runningProcesses = Collections .synchronizedMap(new HashMap()); + private String defaultModulePrefix = null; + /* * LIFECYCLE */ /** Initialization */ public void init() { - agentDescriptor = new SlcAgentDescriptor(); - agentDescriptor.setUuid(initAgentUuid()); - try { - agentDescriptor.setHost(InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - log.error("Cannot resolve localhost host name: " + e.getMessage()); - agentDescriptor.setHost("localhost"); - } + agentUuid = initAgentUuid(); processesThreadGroup = new ThreadGroup("SLC Processes of Agent #" - + agentDescriptor.getUuid()); - modulesManager.registerProcessNotifier(this, - new HashMap()); - - // final String module = System - // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY); - // final String flow = System - // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_FLOW_PROPERTY); - // if (module != null) { - // // launch a flow and stops - // new Thread("Unique Flow") { - // @Override - // public void run() { - // executeFlowAndExit(module, null, flow); - // } - // }.start(); - // } + + agentUuid); } /** Clean up (needs to be called by overriding method) */ public void destroy() { - modulesManager.unregisterProcessNotifier(this, - new HashMap()); } /** @@ -94,13 +71,6 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { return UUID.randomUUID().toString(); } - /* - * UNIQUE FLOW - */ - // protected void executeFlowAndExit(final String module, - // final String version, final String flow) { - // } - /* * SLC AGENT */ @@ -109,13 +79,58 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { modulesManager, process); processThread.start(); runningProcesses.put(process.getUuid(), processThread); - // FIXME find a way to remove them from this register + + // clean up old processes + Iterator it = runningProcesses.values().iterator(); + while (it.hasNext()) { + ProcessThread pThread = it.next(); + if (!pThread.isAlive()) + it.remove(); + } + } + + public String process(List uris) { + DefaultProcess process = new DefaultProcess(); + for (URI uri : uris) { + String[] path = uri.getPath().split("/"); + if (path.length < 3) + throw new SlcException("Badly formatted URI: " + uri); + NameVersion nameVersion = new BasicNameVersion(path[1]); + StringBuilder flow = new StringBuilder(); + for (int i = 2; i < path.length; i++) + flow.append('/').append(path[i]); + + Map values = getQueryMap(uri.getQuery()); + // Get execution module descriptor + ExecutionModuleDescriptor emd = getExecutionModuleDescriptor( + nameVersion.getName(), nameVersion.getVersion()); + process.getRealizedFlows().add( + emd.asRealizedFlow(flow.toString(), values)); + } + process(process); + return process.getUuid(); } - public void kill(ExecutionProcess process) { - String processUuid = process.getUuid(); + public void kill(String processUuid) { if (runningProcesses.containsKey(processUuid)) { runningProcesses.get(processUuid).interrupt(); + } else { + // assume is finished + } + } + + public void waitFor(String processUuid, Long millis) { + if (runningProcesses.containsKey(processUuid)) { + try { + if (millis != null) + runningProcesses.get(processUuid).join(millis); + else + runningProcesses.get(processUuid).join(); + } catch (InterruptedException e) { + // silent + } + } else { + // assume is finished } } @@ -123,17 +138,31 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { protected ProcessThread createProcessThread( ThreadGroup processesThreadGroup, ExecutionModulesManager modulesManager, ExecutionProcess process) { - if (!(process instanceof SlcExecution)) - throw new SlcException("Unsupported process type " - + process.getClass()); ProcessThread processThread = new ProcessThread(processesThreadGroup, - modulesManager, (SlcExecution) process); + modulesManager, process); return processThread; } public ExecutionModuleDescriptor getExecutionModuleDescriptor( - String moduleName, String version) { - return modulesManager.getExecutionModuleDescriptor(moduleName, version); + String moduleName, String moduleVersion) { + // Get execution module descriptor + ExecutionModuleDescriptor emd; + try { + modulesManager + .start(new BasicNameVersion(moduleName, moduleVersion)); + emd = modulesManager.getExecutionModuleDescriptor(moduleName, + moduleVersion); + } catch (SlcException e) { + if (defaultModulePrefix != null) { + moduleName = defaultModulePrefix + "." + moduleName; + modulesManager.start(new BasicNameVersion(moduleName, + moduleVersion)); + emd = modulesManager.getExecutionModuleDescriptor(moduleName, + moduleVersion); + } else + throw e; + } + return emd; } public List listExecutionModuleDescriptors() { @@ -145,18 +174,29 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { } /* - * PROCESS NOTIFIER + * UTILITIES + */ + /** + * @param query + * can be null */ - public void updateStatus(ExecutionProcess process, String oldStatus, - String newStatus) { - if (newStatus.equals(ExecutionProcess.COMPLETED) - || newStatus.equals(ExecutionProcess.ERROR) - || newStatus.equals(ExecutionProcess.KILLED)) { - runningProcesses.remove(process.getUuid()); + static Map getQueryMap(String query) { + Map map = new LinkedHashMap(); + if (query == null) + return map; + String[] params = query.split("&"); + for (String param : params) { + String[] arr = param.split("="); + String name = arr[0]; + Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE; + try { + map.put(URLDecoder.decode(name, UTF8), + URLDecoder.decode(value.toString(), UTF8)); + } catch (UnsupportedEncodingException e) { + throw new SlcException("Cannot decode '" + param + "'", e); + } } - } - - public void addSteps(ExecutionProcess process, List steps) { + return map; } /* @@ -166,16 +206,16 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { this.modulesManager = modulesManager; } - protected SlcAgentDescriptor getAgentDescriptor() { - return agentDescriptor; + public void setDefaultModulePrefix(String defaultModulePrefix) { + this.defaultModulePrefix = defaultModulePrefix; } public String getAgentUuid() { - return agentDescriptor.getUuid(); + return agentUuid; } @Override public String toString() { - return agentDescriptor.toString(); + return "Agent #" + getAgentUuid(); } }