]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.core/src/main/java/org/argeo/slc/core/execution/DefaultAgent.java
Help working
[gpl/argeo-slc.git] / runtime / org.argeo.slc.core / src / main / java / org / argeo / slc / core / execution / DefaultAgent.java
index e69b1cf7af749771d7fa4aaab10dc47479db24a2..aaa687fd77ccc9a78eba7eecf5601e4efb7f7a91 100644 (file)
  */
 package org.argeo.slc.core.execution;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.NameVersion;
 import org.argeo.slc.SlcException;
 import org.argeo.slc.execution.ExecutionModuleDescriptor;
 import org.argeo.slc.execution.ExecutionModulesManager;
 import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionProcessNotifier;
-import org.argeo.slc.execution.ExecutionStep;
-import org.argeo.slc.process.SlcExecution;
-import org.argeo.slc.runtime.SlcAgent;
-import org.argeo.slc.runtime.SlcAgentDescriptor;
+import org.argeo.slc.execution.SlcAgent;
 
 /** Implements the base methods of an SLC agent. */
-@SuppressWarnings("deprecation")
-public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
-       private final static Log log = LogFactory.getLog(DefaultAgent.class);
+public class DefaultAgent implements SlcAgent {
+       // private final static Log log = LogFactory.getLog(DefaultAgent.class);
+       /** UTF-8 charset for encoding. */
+       private final static String UTF8 = "UTF-8";
 
-       private SlcAgentDescriptor agentDescriptor;
+       private String agentUuid = null;
        private ExecutionModulesManager modulesManager;
 
        private ThreadGroup processesThreadGroup;
        private Map<String, ProcessThread> runningProcesses = Collections
                        .synchronizedMap(new HashMap<String, ProcessThread>());
 
+       private String defaultModulePrefix = null;
+
        /*
         * LIFECYCLE
         */
        /** Initialization */
        public void init() {
-               agentDescriptor = new SlcAgentDescriptor();
-               agentDescriptor.setUuid(initAgentUuid());
-               try {
-                       agentDescriptor.setHost(InetAddress.getLocalHost().getHostName());
-               } catch (UnknownHostException e) {
-                       log.error("Cannot resolve localhost host name: " + e.getMessage());
-                       agentDescriptor.setHost("localhost");
-               }
+               agentUuid = initAgentUuid();
                processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
-                               + agentDescriptor.getUuid());
-               modulesManager.registerProcessNotifier(this,
-                               new HashMap<String, String>());
+                               + agentUuid);
        }
 
        /** Clean up (needs to be called by overriding method) */
        public void destroy() {
-               modulesManager.unregisterProcessNotifier(this,
-                               new HashMap<String, String>());
        }
 
        /**
@@ -88,13 +79,58 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                                modulesManager, process);
                processThread.start();
                runningProcesses.put(process.getUuid(), processThread);
-               // FIXME find a way to remove them from this register
+
+               // clean up old processes
+               Iterator<ProcessThread> it = runningProcesses.values().iterator();
+               while (it.hasNext()) {
+                       ProcessThread pThread = it.next();
+                       if (!pThread.isAlive())
+                               it.remove();
+               }
        }
 
-       public void kill(ExecutionProcess process) {
-               String processUuid = process.getUuid();
+       public String process(List<URI> uris) {
+               DefaultProcess process = new DefaultProcess();
+               for (URI uri : uris) {
+                       String[] path = uri.getPath().split("/");
+                       if (path.length < 3)
+                               throw new SlcException("Badly formatted URI: " + uri);
+                       NameVersion nameVersion = new BasicNameVersion(path[1]);
+                       StringBuilder flow = new StringBuilder();
+                       for (int i = 2; i < path.length; i++)
+                               flow.append('/').append(path[i]);
+
+                       Map<String, Object> values = getQueryMap(uri.getQuery());
+                       // Get execution module descriptor
+                       ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+                                       nameVersion.getName(), nameVersion.getVersion());
+                       process.getRealizedFlows().add(
+                                       emd.asRealizedFlow(flow.toString(), values));
+               }
+               process(process);
+               return process.getUuid();
+       }
+
+       public void kill(String processUuid) {
                if (runningProcesses.containsKey(processUuid)) {
                        runningProcesses.get(processUuid).interrupt();
+               } else {
+                       // assume is finished
+               }
+       }
+
+       public void waitFor(String processUuid, Long millis) {
+               if (runningProcesses.containsKey(processUuid)) {
+                       try {
+                               if (millis != null)
+                                       runningProcesses.get(processUuid).join(millis);
+                               else
+                                       runningProcesses.get(processUuid).join();
+                       } catch (InterruptedException e) {
+                               // silent
+                       }
+               } else {
+                       // assume is finished
                }
        }
 
@@ -102,17 +138,31 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        protected ProcessThread createProcessThread(
                        ThreadGroup processesThreadGroup,
                        ExecutionModulesManager modulesManager, ExecutionProcess process) {
-               if (!(process instanceof SlcExecution))
-                       throw new SlcException("Unsupported process type "
-                                       + process.getClass());
                ProcessThread processThread = new ProcessThread(processesThreadGroup,
-                               modulesManager, (SlcExecution) process);
+                               modulesManager, process);
                return processThread;
        }
 
        public ExecutionModuleDescriptor getExecutionModuleDescriptor(
-                       String moduleName, String version) {
-               return modulesManager.getExecutionModuleDescriptor(moduleName, version);
+                       String moduleName, String moduleVersion) {
+               // Get execution module descriptor
+               ExecutionModuleDescriptor emd;
+               try {
+                       modulesManager
+                                       .start(new BasicNameVersion(moduleName, moduleVersion));
+                       emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+                                       moduleVersion);
+               } catch (SlcException e) {
+                       if (defaultModulePrefix != null) {
+                               moduleName = defaultModulePrefix + "." + moduleName;
+                               modulesManager.start(new BasicNameVersion(moduleName,
+                                               moduleVersion));
+                               emd = modulesManager.getExecutionModuleDescriptor(moduleName,
+                                               moduleVersion);
+                       } else
+                               throw e;
+               }
+               return emd;
        }
 
        public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
@@ -124,18 +174,29 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
        }
 
        /*
-        * PROCESS NOTIFIER
+        * UTILITIES
         */
-       public void updateStatus(ExecutionProcess process, String oldStatus,
-                       String newStatus) {
-               if (newStatus.equals(ExecutionProcess.COMPLETED)
-                               || newStatus.equals(ExecutionProcess.ERROR)
-                               || newStatus.equals(ExecutionProcess.KILLED)) {
-                       runningProcesses.remove(process.getUuid());
+       /**
+        * @param query
+        *            can be null
+        */
+       static Map<String, Object> getQueryMap(String query) {
+               Map<String, Object> map = new LinkedHashMap<String, Object>();
+               if (query == null)
+                       return map;
+               String[] params = query.split("&");
+               for (String param : params) {
+                       String[] arr = param.split("=");
+                       String name = arr[0];
+                       Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
+                       try {
+                               map.put(URLDecoder.decode(name, UTF8),
+                                               URLDecoder.decode(value.toString(), UTF8));
+                       } catch (UnsupportedEncodingException e) {
+                               throw new SlcException("Cannot decode '" + param + "'", e);
+                       }
                }
-       }
-
-       public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+               return map;
        }
 
        /*
@@ -145,16 +206,16 @@ public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
                this.modulesManager = modulesManager;
        }
 
-       protected SlcAgentDescriptor getAgentDescriptor() {
-               return agentDescriptor;
+       public void setDefaultModulePrefix(String defaultModulePrefix) {
+               this.defaultModulePrefix = defaultModulePrefix;
        }
 
        public String getAgentUuid() {
-               return agentDescriptor.getUuid();
+               return agentUuid;
        }
 
        @Override
        public String toString() {
-               return agentDescriptor.toString();
+               return "Agent #" + getAgentUuid();
        }
 }