]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/DefaultAgent.java
Prepare next development cycle
[gpl/argeo-slc.git] / runtime / DefaultAgent.java
1 package org.argeo.slc.runtime;
2
3 import java.io.UnsupportedEncodingException;
4 import java.net.URI;
5 import java.net.URLDecoder;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.LinkedHashMap;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.UUID;
13
14 import org.argeo.slc.DefaultNameVersion;
15 import org.argeo.slc.NameVersion;
16 import org.argeo.slc.SlcException;
17 import org.argeo.slc.execution.ExecutionModuleDescriptor;
18 import org.argeo.slc.execution.ExecutionModulesManager;
19 import org.argeo.slc.execution.ExecutionProcess;
20 import org.argeo.slc.execution.SlcAgent;
21
22 /** Implements the base methods of an SLC agent. */
23 public class DefaultAgent implements SlcAgent {
24 // private final static Log log = LogFactory.getLog(DefaultAgent.class);
25 /** UTF-8 charset for encoding. */
26 private final static String UTF8 = "UTF-8";
27
28 private String agentUuid = null;
29 private ExecutionModulesManager modulesManager;
30
31 private ThreadGroup processesThreadGroup;
32 private Map<String, ProcessThread> runningProcesses = Collections
33 .synchronizedMap(new HashMap<String, ProcessThread>());
34
35 private String defaultModulePrefix = null;
36
37 /*
38 * LIFECYCLE
39 */
40 /** Initialization */
41 public void init() {
42 agentUuid = initAgentUuid();
43 processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
44 + agentUuid);
45 }
46
47 /** Clean up (needs to be called by overriding method) */
48 public void destroy() {
49 }
50
51 /**
52 * Called during initialization in order to determines the agent UUID. To be
53 * overridden. By default creates a new one per instance.
54 */
55 protected String initAgentUuid() {
56 return UUID.randomUUID().toString();
57 }
58
59 /*
60 * SLC AGENT
61 */
62 public void process(ExecutionProcess process) {
63 ProcessThread processThread = createProcessThread(processesThreadGroup,
64 modulesManager, process);
65 processThread.start();
66 runningProcesses.put(process.getUuid(), processThread);
67
68 // clean up old processes
69 Iterator<ProcessThread> it = runningProcesses.values().iterator();
70 while (it.hasNext()) {
71 ProcessThread pThread = it.next();
72 if (!pThread.isAlive())
73 it.remove();
74 }
75 }
76
77 public String process(List<URI> uris) {
78 DefaultProcess process = new DefaultProcess();
79 for (URI uri : uris) {
80 String[] path = uri.getPath().split("/");
81 if (path.length < 3)
82 throw new SlcException("Badly formatted URI: " + uri);
83 NameVersion nameVersion = new DefaultNameVersion(path[1]);
84 StringBuilder flow = new StringBuilder();
85 for (int i = 2; i < path.length; i++)
86 flow.append('/').append(path[i]);
87
88 Map<String, Object> values = getQueryMap(uri.getQuery());
89 // Get execution module descriptor
90 ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
91 nameVersion.getName(), nameVersion.getVersion());
92 process.getRealizedFlows().add(
93 emd.asRealizedFlow(flow.toString(), values));
94 }
95 process(process);
96 return process.getUuid();
97 }
98
99 public void kill(String processUuid) {
100 if (runningProcesses.containsKey(processUuid)) {
101 runningProcesses.get(processUuid).interrupt();
102 } else {
103 // assume is finished
104 }
105 }
106
107 public void waitFor(String processUuid, Long millis) {
108 if (runningProcesses.containsKey(processUuid)) {
109 try {
110 if (millis != null)
111 runningProcesses.get(processUuid).join(millis);
112 else
113 runningProcesses.get(processUuid).join();
114 } catch (InterruptedException e) {
115 // silent
116 }
117 } else {
118 // assume is finished
119 }
120 }
121
122 /** Creates the thread which will coordinate the execution for this agent. */
123 protected ProcessThread createProcessThread(
124 ThreadGroup processesThreadGroup,
125 ExecutionModulesManager modulesManager, ExecutionProcess process) {
126 ProcessThread processThread = new ProcessThread(processesThreadGroup,
127 modulesManager, process);
128 return processThread;
129 }
130
131 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
132 String moduleName, String moduleVersion) {
133 // Get execution module descriptor
134 ExecutionModuleDescriptor emd;
135 try {
136 modulesManager
137 .start(new DefaultNameVersion(moduleName, moduleVersion));
138 emd = modulesManager.getExecutionModuleDescriptor(moduleName,
139 moduleVersion);
140 } catch (SlcException e) {
141 if (defaultModulePrefix != null) {
142 moduleName = defaultModulePrefix + "." + moduleName;
143 modulesManager.start(new DefaultNameVersion(moduleName,
144 moduleVersion));
145 emd = modulesManager.getExecutionModuleDescriptor(moduleName,
146 moduleVersion);
147 } else
148 throw e;
149 }
150 return emd;
151 }
152
153 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
154 return modulesManager.listExecutionModules();
155 }
156
157 public boolean ping() {
158 return true;
159 }
160
161 /*
162 * UTILITIES
163 */
164 /**
165 * @param query
166 * can be null
167 */
168 static Map<String, Object> getQueryMap(String query) {
169 Map<String, Object> map = new LinkedHashMap<String, Object>();
170 if (query == null)
171 return map;
172 String[] params = query.split("&");
173 for (String param : params) {
174 String[] arr = param.split("=");
175 String name = arr[0];
176 Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
177 try {
178 map.put(URLDecoder.decode(name, UTF8),
179 URLDecoder.decode(value.toString(), UTF8));
180 } catch (UnsupportedEncodingException e) {
181 throw new SlcException("Cannot decode '" + param + "'", e);
182 }
183 }
184 return map;
185 }
186
187 /*
188 * BEAN
189 */
190 public void setModulesManager(ExecutionModulesManager modulesManager) {
191 this.modulesManager = modulesManager;
192 }
193
194 public void setDefaultModulePrefix(String defaultModulePrefix) {
195 this.defaultModulePrefix = defaultModulePrefix;
196 }
197
198 public String getAgentUuid() {
199 return agentUuid;
200 }
201
202 @Override
203 public String toString() {
204 return "Agent #" + getAgentUuid();
205 }
206 }