*/
package org.argeo.slc.core.execution;
+import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
+import java.net.URI;
+import java.net.URLDecoder;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.SlcException;
import org.argeo.slc.execution.ExecutionModuleDescriptor;
import org.argeo.slc.execution.ExecutionModulesManager;
import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.ExecutionProcessNotifier;
-import org.argeo.slc.execution.ExecutionStep;
import org.argeo.slc.execution.SlcAgent;
import org.argeo.slc.execution.SlcAgentDescriptor;
/** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent, ExecutionProcessNotifier {
+public class DefaultAgent implements SlcAgent {
private final static Log log = LogFactory.getLog(DefaultAgent.class);
+ /** UTF-8 charset for encoding. */
+ private final static String UTF8 = "UTF-8";
private SlcAgentDescriptor agentDescriptor;
private ExecutionModulesManager modulesManager;
}
processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
+ agentDescriptor.getUuid());
- modulesManager.registerProcessNotifier(this,
- new HashMap<String, String>());
+ // modulesManager.registerProcessNotifier(this,
+ // new HashMap<String, String>());
// final String module = System
// .getProperty(ExecutionModulesManager.UNIQUE_LAUNCH_MODULE_PROPERTY);
/** Clean up (needs to be called by overriding method) */
public void destroy() {
- modulesManager.unregisterProcessNotifier(this,
- new HashMap<String, String>());
+ // modulesManager.unregisterProcessNotifier(this,
+ // new HashMap<String, String>());
}
/**
modulesManager, process);
processThread.start();
runningProcesses.put(process.getUuid(), processThread);
- // FIXME find a way to remove them from this register
+
+ // clean up old processes
+ Iterator<ProcessThread> it = runningProcesses.values().iterator();
+ while (it.hasNext()) {
+ ProcessThread pThread = it.next();
+ if (!pThread.isAlive())
+ it.remove();
+ }
+ }
+
+ public String process(List<URI> uris) {
+ DefaultProcess process = new DefaultProcess();
+ for (URI uri : uris) {
+ String[] path = uri.getPath().split("/");
+ if (path.length < 3)
+ throw new SlcException("Badly formatted URI: " + uri);
+ String module = path[1];
+ StringBuilder flow = new StringBuilder();
+ for (int i = 2; i < path.length; i++)
+ flow.append('/').append(path[i]);
+
+ Map<String, Object> values = new HashMap<String, Object>();
+ if (uri.getQuery() != null)
+ values = getQueryMap(uri.getQuery());
+
+ modulesManager.start(new BasicNameVersion(module, null));
+ ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
+ module, null);
+ process.getRealizedFlows().add(
+ emd.asRealizedFlow(flow.toString(), values));
+ }
+ process(process);
+ return process.getUuid();
}
- public void kill(ExecutionProcess process) {
- String processUuid = process.getUuid();
+ public void kill(String processUuid) {
if (runningProcesses.containsKey(processUuid)) {
runningProcesses.get(processUuid).interrupt();
+ } else {
+ // assume is finished
+ }
+ }
+
+ public void waitFor(String processUuid, Long millis) {
+ if (runningProcesses.containsKey(processUuid)) {
+ try {
+ if (millis != null)
+ runningProcesses.get(processUuid).join(millis);
+ else
+ runningProcesses.get(processUuid).join();
+ } catch (InterruptedException e) {
+ // silent
+ }
+ } else {
+ // assume is finished
}
}
/*
* PROCESS NOTIFIER
*/
- public void updateStatus(ExecutionProcess process, String oldStatus,
- String newStatus) {
- if (newStatus.equals(ExecutionProcess.COMPLETED)
- || newStatus.equals(ExecutionProcess.ERROR)
- || newStatus.equals(ExecutionProcess.KILLED)) {
- runningProcesses.remove(process.getUuid());
- }
- }
+ // public void updateStatus(ExecutionProcess process, String oldStatus,
+ // String newStatus) {
+ // if (newStatus.equals(ExecutionProcess.COMPLETED)
+ // || newStatus.equals(ExecutionProcess.ERROR)
+ // || newStatus.equals(ExecutionProcess.KILLED)) {
+ // runningProcesses.remove(process.getUuid());
+ // }
+ // }
+ //
+ // public void addSteps(ExecutionProcess process, List<ExecutionStep> steps)
+ // {
+ // }
- public void addSteps(ExecutionProcess process, List<ExecutionStep> steps) {
+ /*
+ * UTILITIES
+ */
+ private static Map<String, Object> getQueryMap(String query) {
+ String[] params = query.split("&");
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ for (String param : params) {
+ String name = param.split("=")[0];
+ String value = param.split("=")[1];
+ try {
+ map.put(URLDecoder.decode(name, UTF8),
+ URLDecoder.decode(value, UTF8));
+ } catch (UnsupportedEncodingException e) {
+ throw new SlcException("Cannot decode '" + param + "'", e);
+ }
+ }
+ return map;
}
/*