1 package org
.argeo
.slc
.runtime
;
3 import java
.io
.UnsupportedEncodingException
;
5 import java
.net
.URLDecoder
;
6 import java
.util
.Collections
;
7 import java
.util
.HashMap
;
8 import java
.util
.Iterator
;
9 import java
.util
.LinkedHashMap
;
10 import java
.util
.List
;
12 import java
.util
.UUID
;
14 import org
.argeo
.api
.slc
.DefaultNameVersion
;
15 import org
.argeo
.api
.slc
.NameVersion
;
16 import org
.argeo
.api
.slc
.SlcException
;
17 import org
.argeo
.api
.slc
.execution
.ExecutionModuleDescriptor
;
18 import org
.argeo
.api
.slc
.execution
.ExecutionModulesManager
;
19 import org
.argeo
.api
.slc
.execution
.ExecutionProcess
;
20 import org
.argeo
.api
.slc
.execution
.SlcAgent
;
22 /** Implements the base methods of an SLC agent. */
23 public class DefaultAgent
implements SlcAgent
{
24 // private final static CmsLog log = CmsLog.getLog(DefaultAgent.class);
25 /** UTF-8 charset for encoding. */
26 private final static String UTF8
= "UTF-8";
/** Identifier of this agent; assigned from the result of initAgentUuid(). */
28 private String agentUuid
= null;
/** Manager used to start modules and to fetch their descriptors; injected through setModulesManager(). */
29 private ExecutionModulesManager modulesManager
;
/** Thread group handed to createProcessThread() for every process thread. */
31 private ThreadGroup processesThreadGroup
;
/** Process threads keyed by the UUID of their process, held in a synchronized map. */
32 private Map
<String
, ProcessThread
> runningProcesses
= Collections
33 .synchronizedMap(new HashMap
<String
, ProcessThread
>());
/**
 * Prefix put, followed by a dot, in front of a module name when resolving
 * the plain name raised an SlcException; only used when non-null.
 */
35 private String defaultModulePrefix
= null;
42 agentUuid
= initAgentUuid();
43 processesThreadGroup
= new ThreadGroup("SLC Processes of Agent #"
47 /** Clean up (needs to be called by overriding method) */
48 public void destroy() {
52 * Called during initialization in order to determine the agent UUID. To be
53 * overridden. By default creates a new one per instance.
55 protected String
initAgentUuid() {
56 return UUID
.randomUUID().toString();
62 public void process(ExecutionProcess process
) {
63 ProcessThread processThread
= createProcessThread(processesThreadGroup
,
64 modulesManager
, process
);
65 processThread
.start();
66 runningProcesses
.put(process
.getUuid(), processThread
);
68 // clean up old processes
69 Iterator
<ProcessThread
> it
= runningProcesses
.values().iterator();
70 while (it
.hasNext()) {
71 ProcessThread pThread
= it
.next();
72 if (!pThread
.isAlive())
/**
 * Builds a DefaultProcess out of a list of flow URIs and returns its UUID.
 * For each URI the path is split on "/": segment 1 is passed to
 * DefaultNameVersion, the following segments are re-joined (each prefixed
 * with '/') as the flow, and the query is parsed by getQueryMap(). The
 * descriptor found for the name and version turns flow and values into a
 * realized flow, which is added to the process.
 *
 * NOTE(review): the condition guarding the "Badly formatted URI" exception
 * and whatever precedes the final return are not visible in this extract;
 * confirm them against the complete file.
 */
77 public String
process(List
<URI
> uris
) {
78 DefaultProcess process
= new DefaultProcess();
79 for (URI uri
: uris
) {
80 String
[] path
= uri
.getPath().split("/");
82 throw new SlcException("Badly formatted URI: " + uri
);
83 NameVersion nameVersion
= new DefaultNameVersion(path
[1]);
84 StringBuilder flow
= new StringBuilder();
85 for (int i
= 2; i
< path
.length
; i
++)
86 flow
.append('/').append(path
[i
]);
88 Map
<String
, Object
> values
= getQueryMap(uri
.getQuery());
89 // Get execution module descriptor
90 ExecutionModuleDescriptor emd
= getExecutionModuleDescriptor(
91 nameVersion
.getName(), nameVersion
.getVersion());
92 process
.getRealizedFlows().add(
93 emd
.asRealizedFlow(flow
.toString(), values
));
96 return process
.getUuid();
99 public void kill(String processUuid
) {
100 if (runningProcesses
.containsKey(processUuid
)) {
101 runningProcesses
.get(processUuid
).interrupt();
103 // assume is finished
/**
 * Waits for the thread registered under the given process UUID by joining
 * it, either with the millis timeout or without any timeout. The joins only
 * happen when a thread is registered under that UUID.
 *
 * NOTE(review): containsKey() followed by get() is not atomic; if the entry
 * is removed in between (see the clean-up in process(ExecutionProcess)),
 * get() returns null. Consider a single get() and a null check.
 * NOTE(review): the condition choosing between the two joins and the
 * handling of InterruptedException are not visible in this extract; confirm
 * that the interrupt status is restored.
 */
107 public void waitFor(String processUuid
, Long millis
) {
108 if (runningProcesses
.containsKey(processUuid
)) {
111 runningProcesses
.get(processUuid
).join(millis
);
113 runningProcesses
.get(processUuid
).join();
114 } catch (InterruptedException e
) {
118 // assume is finished
122 /** Creates the thread which will coordinate the execution for this agent. */
123 protected ProcessThread
createProcessThread(
124 ThreadGroup processesThreadGroup
,
125 ExecutionModulesManager modulesManager
, ExecutionProcess process
) {
126 ProcessThread processThread
= new ProcessThread(processesThreadGroup
,
127 modulesManager
, process
);
128 return processThread
;
/**
 * Starts the module with the given name and version, then fetches its
 * descriptor from the modules manager. When an SlcException is raised and a
 * default module prefix is set, the module name is replaced by
 * defaultModulePrefix + "." + moduleName and start and lookup are attempted
 * again with that name.
 *
 * NOTE(review): the behavior when no default prefix is set, and the end of
 * the method, are not visible in this extract.
 */
131 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
132 String moduleName
, String moduleVersion
) {
133 // Get execution module descriptor
134 ExecutionModuleDescriptor emd
;
137 .start(new DefaultNameVersion(moduleName
, moduleVersion
));
138 emd
= modulesManager
.getExecutionModuleDescriptor(moduleName
,
140 } catch (SlcException e
) {
141 if (defaultModulePrefix
!= null) {
142 moduleName
= defaultModulePrefix
+ "." + moduleName
;
143 modulesManager
.start(new DefaultNameVersion(moduleName
,
145 emd
= modulesManager
.getExecutionModuleDescriptor(moduleName
,
153 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
154 return modulesManager
.listExecutionModules();
157 public boolean ping() {
168 static Map
<String
, Object
> getQueryMap(String query
) {
169 Map
<String
, Object
> map
= new LinkedHashMap
<String
, Object
>();
172 String
[] params
= query
.split("&");
173 for (String param
: params
) {
174 String
[] arr
= param
.split("=");
175 String name
= arr
[0];
176 Object value
= arr
.length
> 1 ? param
.split("=")[1] : Boolean
.TRUE
;
178 map
.put(URLDecoder
.decode(name
, UTF8
),
179 URLDecoder
.decode(value
.toString(), UTF8
));
180 } catch (UnsupportedEncodingException e
) {
181 throw new SlcException("Cannot decode '" + param
+ "'", e
);
190 public void setModulesManager(ExecutionModulesManager modulesManager
) {
191 this.modulesManager
= modulesManager
;
194 public void setDefaultModulePrefix(String defaultModulePrefix
) {
195 this.defaultModulePrefix
= defaultModulePrefix
;
198 public String
getAgentUuid() {
203 public String
toString() {
204 return "Agent #" + getAgentUuid();