X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;ds=inline;f=runtime%2Forg.argeo.slc.core%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fcore%2Fexecution%2FDefaultAgent.java;h=9bfde6edcde2ce006413167e15d33b8dd2430de9;hb=a17824ea2422474f25e2ea0eeae310f4dd9e6361;hp=909fa17603382be0c74c21bdd4f757b0d3ae95ae;hpb=57aa5d64fb38af2d98076eeaaa11573b3147cc26;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java index 909fa1760..9bfde6edc 100644 --- a/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java +++ b/runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java @@ -15,27 +15,34 @@ */ package org.argeo.slc.core.execution; +import java.io.UnsupportedEncodingException; import java.net.InetAddress; +import java.net.URI; +import java.net.URLDecoder; import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.argeo.slc.BasicNameVersion; +import org.argeo.slc.SlcException; import org.argeo.slc.execution.ExecutionModuleDescriptor; import org.argeo.slc.execution.ExecutionModulesManager; import org.argeo.slc.execution.ExecutionProcess; -import org.argeo.slc.execution.ExecutionProcessNotifier; -import org.argeo.slc.execution.ExecutionStep; import org.argeo.slc.execution.SlcAgent; import org.argeo.slc.execution.SlcAgentDescriptor; /** Implements the base methods of an SLC agent. */ -public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { +public class DefaultAgent implements SlcAgent { private final static Log log = LogFactory.getLog(DefaultAgent.class); + /** UTF-8 charset for encoding. */ + private final static String UTF8 = "UTF-8"; private SlcAgentDescriptor agentDescriptor; private ExecutionModulesManager modulesManager; @@ -59,8 +66,8 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { } processesThreadGroup = new ThreadGroup("SLC Processes of Agent #" + agentDescriptor.getUuid()); - modulesManager.registerProcessNotifier(this, - new HashMap()); + // modulesManager.registerProcessNotifier(this, + // new HashMap()); // final String module = System // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY); @@ -79,8 +86,8 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { /** Clean up (needs to be called by overriding method) */ public void destroy() { - modulesManager.unregisterProcessNotifier(this, - new HashMap()); + // modulesManager.unregisterProcessNotifier(this, + // new HashMap()); } /** @@ -106,13 +113,61 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { modulesManager, process); processThread.start(); runningProcesses.put(process.getUuid(), processThread); - // FIXME find a way to remove them from this register + + // clean up old processes + Iterator it = runningProcesses.values().iterator(); + while (it.hasNext()) { + ProcessThread pThread = it.next(); + if (!pThread.isAlive()) + it.remove(); + } + } + + public String process(List uris) { + DefaultProcess process = new DefaultProcess(); + for (URI uri : uris) { + String[] path = uri.getPath().split("/"); + if (path.length < 3) + throw new SlcException("Badly formatted URI: " + uri); + String module = path[1]; + StringBuilder flow = new StringBuilder(); + for (int i = 2; i < path.length; i++) + flow.append('/').append(path[i]); + + Map values = new HashMap(); + if (uri.getQuery() != null) + values = getQueryMap(uri.getQuery()); + + modulesManager.start(new BasicNameVersion(module, null)); + ExecutionModuleDescriptor emd = getExecutionModuleDescriptor( + module, null); + process.getRealizedFlows().add( + emd.asRealizedFlow(flow.toString(), values)); + } + process(process); + return process.getUuid(); } - public void kill(ExecutionProcess process) { - String processUuid = process.getUuid(); + public void kill(String processUuid) { if (runningProcesses.containsKey(processUuid)) { runningProcesses.get(processUuid).interrupt(); + } else { + // assume is finished + } + } + + public void waitFor(String processUuid, Long millis) { + if (runningProcesses.containsKey(processUuid)) { + try { + if (millis != null) + runningProcesses.get(processUuid).join(millis); + else + runningProcesses.get(processUuid).join(); + } catch (InterruptedException e) { + // silent + } + } else { + // assume is finished } } @@ -141,16 +196,36 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier { /* * PROCESS NOTIFIER */ - public void updateStatus(ExecutionProcess process, String oldStatus, - String newStatus) { - if (newStatus.equals(ExecutionProcess.COMPLETED) - || newStatus.equals(ExecutionProcess.ERROR) - || newStatus.equals(ExecutionProcess.KILLED)) { - runningProcesses.remove(process.getUuid()); - } - } + // public void updateStatus(ExecutionProcess process, String oldStatus, + // String newStatus) { + // if (newStatus.equals(ExecutionProcess.COMPLETED) + // || newStatus.equals(ExecutionProcess.ERROR) + // || newStatus.equals(ExecutionProcess.KILLED)) { + // runningProcesses.remove(process.getUuid()); + // } + // } + // + // public void addSteps(ExecutionProcess process, List steps) + // { + // } - public void addSteps(ExecutionProcess process, List steps) { + /* + * UTILITIES + */ + private static Map getQueryMap(String query) { + String[] params = query.split("&"); + Map map = new LinkedHashMap(); + for (String param : params) { + String name = param.split("=")[0]; + String value = param.split("=")[1]; + try { + map.put(URLDecoder.decode(name, UTF8), + URLDecoder.decode(value, UTF8)); + } catch (UnsupportedEncodingException e) { + throw new SlcException("Cannot decode '" + param + "'", e); + } + } + return map; } /*