]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java
refactor and clean of the distribution view model.
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / DefaultAgent.java
index 909fa17603382be0c74c21bdd4f757b0d3ae95ae..41c66622d01506b951651a1138747028bbd71466 100644 (file)
  */
 package org.argeo.slc.core.execution;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionProcessNotifier;
-import org.argeo.slc.execution.ExecutionStep;
 import org.argeo.slc.execution.SlcAgent;
-import org.argeo.slc.execution.SlcAgentDescriptor;
 
 /** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
-       private final static Log log = LogFactory.getLog(DefaultAgent.class);
+public class DefaultAgent implements SlcAgent {
+       // private final static Log log = LogFactory.getLog(DefaultAgent.class);
+       /** UTF-8 charset for encoding. */
+       private final static String UTF8 = "UTF-8";
 
-       private SlcAgentDescriptor agentDescriptor;
+       private String agentUuid = null;
+       // private SlcAgentDescriptor agentDescriptor;
        private ExecutionModulesManager modulesManager;
 
        private ThreadGroup processesThreadGroup;
        private Map<String, ProcessThread> runningProcesses = Collections
                        .synchronizedMap(new HashMap<String, ProcessThread>());
 
+       private String defaultModulePrefix = null;
+
        /*
         * LIFECYCLE
         */
        /** Initialization */
        public void init() {
-               agentDescriptor = new SlcAgentDescriptor();
-               agentDescriptor.setUuid(initAgentUuid());
-               try {
-                       agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
-               } catch (UnknownHostException e) {
-                       log.error("Cannot resolve localhost host name: " + e.getMessage());
-                       agentDescriptor.setHost("localhost");
-               }
+               agentUuid = initAgentUuid();
+               // agentDescriptor = new SlcAgentDescriptor();
+               // agentDescriptor.setUuid(initAgentUuid());
+               // try {
+               // agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
+               // } catch (UnknownHostException e) {
+               // log.error("Cannot resolve localhost host name: " + e.getMessage());
+               // agentDescriptor.setHost("localhost");
+               // }
                processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
-                               + agentDescriptor.getUuid());
-               modulesManager.registerProcessNotifier(this,
-                               new HashMap<String, String>());
+                               + agentUuid);
+               // modulesManager.registerProcessNotifier(this,
+               // new HashMap<String, String>());
 
                // final String module = System
                // .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY);
@@ -79,8 +85,8 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
 
        /** Clean up (needs to be called by overriding method) */
        public void destroy() {
-               modulesManager.unregisterProcessNotifier(this,
-                               new HashMap<String, String>());
+               // modulesManager.unregisterProcessNotifier(this,
+               // new HashMap<String, String>());
        }
 
        /**
@@ -106,13 +112,63 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                                modulesManager, process);
                processThread.start();
                runningProcesses.put(process.getUuid(), processThread);
-               // FIXME find a way to remove them from this register
+
+               // clean up old processes
+               Iterator<ProcessThread> it = runningProcesses.values().iterator();
+               while (it.hasNext()) {
+                       ProcessThread pThread = it.next();
+                       if (!pThread.isAlive())
+                               it.remove();
+               }
        }
 
-       public void kill(ExecutionProcess process) {
-               String processUuid = process.getUuid();
+       public String process(List<URI> uris) {
+               DefaultProcess process = new DefaultProcess();
+               for (URI uri : uris) {
+                       String[] path = uri.getPath().split("/");
+                       if (path.length < 3)
+                               throw new SlcException("Badly formatted URI: " + uri);
+                       String moduleName = path[1];
+                       // TODO process version
+                       String moduleVersion = null;
+                       StringBuilder flow = new StringBuilder();
+                       for (int i = 2; i < path.length; i++)
+                               flow.append('/').append(path[i]);
+
+                       Map<String, Object> values = new HashMap<String, Object>();
+                       if (uri.getQuery() != null)
+                               values = getQueryMap(uri.getQuery());
+
+                       // Get execution module descriptor
+                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+                                       moduleName, moduleVersion);
+                       process.getRealizedFlows().add(
+                                       emd.asRealizedFlow(flow.toString(), values));
+               }
+               process(process);
+               return process.getUuid();
+       }
+
+       public void kill(String processUuid) {
                if (runningProcesses.containsKey(processUuid)) {
                        runningProcesses.get(processUuid).interrupt();
+               } else {
+                       // assume is finished
+               }
+       }
+
+       public void waitFor(String processUuid, Long millis) {
+               if (runningProcesses.containsKey(processUuid)) {
+                       try {
+                               if (millis != null)
+                                       runningProcesses.get(processUuid).join(millis);
+                               else
+                                       runningProcesses.get(processUuid).join();
+                       } catch (InterruptedException e) {
+                               // silent
+                       }
+               } else {
+                       // assume is finished
                }
        }
 
@@ -126,8 +182,24 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        }
 
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
-                       String moduleName, String version) {
-               return modulesManager.getExecutionModuleDescriptor(moduleName, version);
+                       String moduleName, String moduleVersion) {
+               // Get execution module descriptor
+               ExecutionModuleDescriptor emd;
+               try {
+                       modulesManager.start(new BasicNameVersion(moduleName, moduleVersion));
+                       emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+                                       moduleVersion);
+               } catch (SlcException e) {
+                       if (defaultModulePrefix != null) {
+                               moduleName = defaultModulePrefix + "." + moduleName;
+                               modulesManager.start(new BasicNameVersion(moduleName,
+                                               moduleVersion));
+                               emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+                                               moduleVersion);
+                       } else
+                               throw e;
+               }
+               return emd;
        }
 
        public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
@@ -141,16 +213,36 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        /*
         * PROCESS NOTIFIER
         */
-       public void updateStatus(ExecutionProcess process, String oldStatus,
-                       String newStatus) {
-               if (newStatus.equals(ExecutionProcess.COMPLETED)
-                               || newStatus.equals(ExecutionProcess.ERROR)
-                               || newStatus.equals(ExecutionProcess.KILLED)) {
-                       runningProcesses.remove(process.getUuid());
-               }
-       }
+       // public void updateStatus(ExecutionProcess process, String oldStatus,
+       // String newStatus) {
+       // if (newStatus.equals(ExecutionProcess.COMPLETED)
+       // || newStatus.equals(ExecutionProcess.ERROR)
+       // || newStatus.equals(ExecutionProcess.KILLED)) {
+       // runningProcesses.remove(process.getUuid());
+       // }
+       // }
+       //
+       // public void addSteps(ExecutionProcess process, List<ExecutionStep> steps)
+       // {
+       // }
 
-       public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+       /*
+        * UTILITIES
+        */
+       private static Map<String, Object> getQueryMap(String query) {
+               String[] params = query.split("&");
+               Map<String, Object> map = new LinkedHashMap<String, Object>();
+               for (String param : params) {
+                       String name = param.split("=")[0];
+                       String value = param.split("=")[1];
+                       try {
+                               map.put(URLDecoder.decode(name, UTF8),
+                                               URLDecoder.decode(value, UTF8));
+                       } catch (UnsupportedEncodingException e) {
+                               throw new SlcException("Cannot decode '" + param + "'", e);
+                       }
+               }
+               return map;
        }
 
        /*
@@ -160,16 +252,16 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                this.modulesManager = modulesManager;
        }
 
-       protected SlcAgentDescriptor getAgentDescriptor() {
-               return agentDescriptor;
+       public void setDefaultModulePrefix(String defaultModulePrefix) {
+               this.defaultModulePrefix = defaultModulePrefix;
        }
 
        public String getAgentUuid() {
-               return agentDescriptor.getUuid();
+               return agentUuid;
        }
 
        @Override
        public String toString() {
-               return agentDescriptor.toString();
+               return "Agent #" + getAgentUuid();
        }
 }