]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java
Working command line SLC
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / DefaultAgent.java
index 909fa17603382be0c74c21bdd4f757b0d3ae95ae..9bfde6edcde2ce006413167e15d33b8dd2430de9 100644 (file)
  */
 package org.argeo.slc.core.execution;
 
+import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
+import java.net.URI;
+import java.net.URLDecoder;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionProcessNotifier;
-import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.execution.SlcAgent;
 import org.argeo.slc.execution.SlcAgentDescriptor;
 
 /** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
+public class DefaultAgent implements SlcAgent {
        private final static Log log = LogFactory.getLog(DefaultAgent.class);
+       /** UTF-8 charset for encoding. */
+       private final static String UTF8 = "UTF-8";
 
        private SlcAgentDescriptor agentDescriptor;
        private ExecutionModulesManager modulesManager;
@@ -59,8 +66,8 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                }
                processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
                                + agentDescriptor.getUuid());
-               modulesManager.registerProcessNotifier(this,
-                               new HashMap<String, String>());
+               // modulesManager.registerProcessNotifier(this,
+               // new HashMap<String, String>());
 
                // final String module = System
                // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY);
@@ -79,8 +86,8 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
 
        /** Clean up (needs to be called by overriding method) */
        public void destroy() {
-               modulesManager.unregisterProcessNotifier(this,
-                               new HashMap<String, String>());
+               // modulesManager.unregisterProcessNotifier(this,
+               // new HashMap<String, String>());
        }
 
        /**
@@ -106,13 +113,61 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                                modulesManager, process);
                processThread.start();
                runningProcesses.put(process.getUuid(), processThread);
-               // FIXME find a way to remove them from this register
+
+               // clean up old processes
+               Iterator<ProcessThread> it = runningProcesses.values().iterator();
+               while (it.hasNext()) {
+                       ProcessThread pThread = it.next();
+                       if (!pThread.isAlive())
+                               it.remove();
+               }
+       }
+
+       public String process(List<URI> uris) {
+               DefaultProcess process = new DefaultProcess();
+               for (URI uri : uris) {
+                       String[] path = uri.getPath().split("/");
+                       if (path.length < 3)
+                               throw new SlcException("Badly formatted URI: " + uri);
+                       String module = path[1];
+                       StringBuilder flow = new StringBuilder();
+                       for (int i = 2; i < path.length; i++)
+                               flow.append('/').append(path[i]);
+
+                       Map<String, Object> values = new HashMap<String, Object>();
+                       if (uri.getQuery() != null)
+                               values = getQueryMap(uri.getQuery());
+
+                       modulesManager.start(new BasicNameVersion(module, null));
+                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+                                       module, null);
+                       process.getRealizedFlows().add(
+                                       emd.asRealizedFlow(flow.toString(), values));
+               }
+               process(process);
+               return process.getUuid();
        }
 
-       public void kill(ExecutionProcess process) {
-               String processUuid = process.getUuid();
+       public void kill(String processUuid) {
                if (runningProcesses.containsKey(processUuid)) {
                        runningProcesses.get(processUuid).interrupt();
+               } else {
+                       // assume is finished
+               }
+       }
+
+       public void waitFor(String processUuid, Long millis) {
+               if (runningProcesses.containsKey(processUuid)) {
+                       try {
+                               if (millis != null)
+                                       runningProcesses.get(processUuid).join(millis);
+                               else
+                                       runningProcesses.get(processUuid).join();
+                       } catch (InterruptedException e) {
+                               // silent
+                       }
+               } else {
+                       // assume is finished
                }
        }
 
@@ -141,16 +196,36 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        /*
         * PROCESS NOTIFIER
         */
-       public void updateStatus(ExecutionProcess process, String oldStatus,
-                       String newStatus) {
-               if (newStatus.equals(ExecutionProcess.COMPLETED)
-                               || newStatus.equals(ExecutionProcess.ERROR)
-                               || newStatus.equals(ExecutionProcess.KILLED)) {
-                       runningProcesses.remove(process.getUuid());
-               }
-       }
+       // public void updateStatus(ExecutionProcess process, String oldStatus,
+       // String newStatus) {
+       // if (newStatus.equals(ExecutionProcess.COMPLETED)
+       // || newStatus.equals(ExecutionProcess.ERROR)
+       // || newStatus.equals(ExecutionProcess.KILLED)) {
+       // runningProcesses.remove(process.getUuid());
+       // }
+       // }
+       //
+       // public void addSteps(ExecutionProcess process, List<ExecutionStep> steps)
+       // {
+       // }
 
-       public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+       /*
+        * UTILITIES
+        */
+       private static Map<String, Object> getQueryMap(String query) {
+               String[] params = query.split("&");
+               Map<String, Object> map = new LinkedHashMap<String, Object>();
+               for (String param : params) {
+                       String name = param.split("=")[0];
+                       String value = param.split("=")[1];
+                       try {
+                               map.put(URLDecoder.decode(name, UTF8),
+                                               URLDecoder.decode(value, UTF8));
+                       } catch (UnsupportedEncodingException e) {
+                               throw new SlcException("Cannot decode '" + param + "'", e);
+                       }
+               }
+               return map;
        }
 
        /*