+++ /dev/null
-package org.argeo.slc.runtime;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.argeo.slc.DefaultNameVersion;
-import org.argeo.slc.NameVersion;
-import org.argeo.slc.SlcException;
-import org.argeo.slc.execution.ExecutionModuleDescriptor;
-import org.argeo.slc.execution.ExecutionModulesManager;
-import org.argeo.slc.execution.ExecutionProcess;
-import org.argeo.slc.execution.SlcAgent;
-
-/** Implements the base methods of an SLC agent. */
-public class DefaultAgent implements SlcAgent {
- // private final static Log log = LogFactory.getLog(DefaultAgent.class);
- /** UTF-8 charset for encoding. */
- private final static String UTF8 = "UTF-8";
-
- private String agentUuid = null;
- private ExecutionModulesManager modulesManager;
-
- private ThreadGroup processesThreadGroup;
- private Map<String, ProcessThread> runningProcesses = Collections
- .synchronizedMap(new HashMap<String, ProcessThread>());
-
- private String defaultModulePrefix = null;
-
- /*
- * LIFECYCLE
- */
- /** Initialization */
- public void init() {
- agentUuid = initAgentUuid();
- processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
- + agentUuid);
- }
-
- /** Clean up (needs to be called by overriding method) */
- public void destroy() {
- }
-
- /**
- * Called during initialization in order to determines the agent UUID. To be
- * overridden. By default creates a new one per instance.
- */
- protected String initAgentUuid() {
- return UUID.randomUUID().toString();
- }
-
- /*
- * SLC AGENT
- */
- public void process(ExecutionProcess process) {
- ProcessThread processThread = createProcessThread(processesThreadGroup,
- modulesManager, process);
- processThread.start();
- runningProcesses.put(process.getUuid(), processThread);
-
- // clean up old processes
- Iterator<ProcessThread> it = runningProcesses.values().iterator();
- while (it.hasNext()) {
- ProcessThread pThread = it.next();
- if (!pThread.isAlive())
- it.remove();
- }
- }
-
- public String process(List<URI> uris) {
- DefaultProcess process = new DefaultProcess();
- for (URI uri : uris) {
- String[] path = uri.getPath().split("/");
- if (path.length < 3)
- throw new SlcException("Badly formatted URI: " + uri);
- NameVersion nameVersion = new DefaultNameVersion(path[1]);
- StringBuilder flow = new StringBuilder();
- for (int i = 2; i < path.length; i++)
- flow.append('/').append(path[i]);
-
- Map<String, Object> values = getQueryMap(uri.getQuery());
- // Get execution module descriptor
- ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
- nameVersion.getName(), nameVersion.getVersion());
- process.getRealizedFlows().add(
- emd.asRealizedFlow(flow.toString(), values));
- }
- process(process);
- return process.getUuid();
- }
-
- public void kill(String processUuid) {
- if (runningProcesses.containsKey(processUuid)) {
- runningProcesses.get(processUuid).interrupt();
- } else {
- // assume is finished
- }
- }
-
- public void waitFor(String processUuid, Long millis) {
- if (runningProcesses.containsKey(processUuid)) {
- try {
- if (millis != null)
- runningProcesses.get(processUuid).join(millis);
- else
- runningProcesses.get(processUuid).join();
- } catch (InterruptedException e) {
- // silent
- }
- } else {
- // assume is finished
- }
- }
-
- /** Creates the thread which will coordinate the execution for this agent. */
- protected ProcessThread createProcessThread(
- ThreadGroup processesThreadGroup,
- ExecutionModulesManager modulesManager, ExecutionProcess process) {
- ProcessThread processThread = new ProcessThread(processesThreadGroup,
- modulesManager, process);
- return processThread;
- }
-
- public ExecutionModuleDescriptor getExecutionModuleDescriptor(
- String moduleName, String moduleVersion) {
- // Get execution module descriptor
- ExecutionModuleDescriptor emd;
- try {
- modulesManager
- .start(new DefaultNameVersion(moduleName, moduleVersion));
- emd = modulesManager.getExecutionModuleDescriptor(moduleName,
- moduleVersion);
- } catch (SlcException e) {
- if (defaultModulePrefix != null) {
- moduleName = defaultModulePrefix + "." + moduleName;
- modulesManager.start(new DefaultNameVersion(moduleName,
- moduleVersion));
- emd = modulesManager.getExecutionModuleDescriptor(moduleName,
- moduleVersion);
- } else
- throw e;
- }
- return emd;
- }
-
- public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
- return modulesManager.listExecutionModules();
- }
-
- public boolean ping() {
- return true;
- }
-
- /*
- * UTILITIES
- */
- /**
- * @param query
- * can be null
- */
- static Map<String, Object> getQueryMap(String query) {
- Map<String, Object> map = new LinkedHashMap<String, Object>();
- if (query == null)
- return map;
- String[] params = query.split("&");
- for (String param : params) {
- String[] arr = param.split("=");
- String name = arr[0];
- Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
- try {
- map.put(URLDecoder.decode(name, UTF8),
- URLDecoder.decode(value.toString(), UTF8));
- } catch (UnsupportedEncodingException e) {
- throw new SlcException("Cannot decode '" + param + "'", e);
- }
- }
- return map;
- }
-
- /*
- * BEAN
- */
- public void setModulesManager(ExecutionModulesManager modulesManager) {
- this.modulesManager = modulesManager;
- }
-
- public void setDefaultModulePrefix(String defaultModulePrefix) {
- this.defaultModulePrefix = defaultModulePrefix;
- }
-
- public String getAgentUuid() {
- return agentUuid;
- }
-
- @Override
- public String toString() {
- return "Agent #" + getAgentUuid();
- }
-}