2 * Copyright (C) 2007-2012 Argeo GmbH
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org
.argeo
.slc
.core
.execution
;
18 import java
.io
.UnsupportedEncodingException
;
20 import java
.net
.URLDecoder
;
21 import java
.util
.Collections
;
22 import java
.util
.HashMap
;
23 import java
.util
.Iterator
;
24 import java
.util
.LinkedHashMap
;
25 import java
.util
.List
;
27 import java
.util
.UUID
;
29 import org
.argeo
.slc
.DefaultNameVersion
;
30 import org
.argeo
.slc
.NameVersion
;
31 import org
.argeo
.slc
.SlcException
;
32 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
33 import org
.argeo
.slc
.execution
.ExecutionModulesManager
;
34 import org
.argeo
.slc
.execution
.ExecutionProcess
;
35 import org
.argeo
.slc
.execution
.SlcAgent
;
37 /** Implements the base methods of an SLC agent. */
38 public class DefaultAgent
implements SlcAgent
{
39 // private final static Log log = LogFactory.getLog(DefaultAgent.class);
40 /** UTF-8 charset for encoding. */
41 private final static String UTF8
= "UTF-8";
43 private String agentUuid
= null;
44 private ExecutionModulesManager modulesManager
;
46 private ThreadGroup processesThreadGroup
;
47 private Map
<String
, ProcessThread
> runningProcesses
= Collections
48 .synchronizedMap(new HashMap
<String
, ProcessThread
>());
50 private String defaultModulePrefix
= null;
57 agentUuid
= initAgentUuid();
58 processesThreadGroup
= new ThreadGroup("SLC Processes of Agent #"
62 /** Clean up (needs to be called by overriding method) */
63 public void destroy() {
67 * Called during initialization in order to determines the agent UUID. To be
68 * overridden. By default creates a new one per instance.
70 protected String
initAgentUuid() {
71 return UUID
.randomUUID().toString();
77 public void process(ExecutionProcess process
) {
78 ProcessThread processThread
= createProcessThread(processesThreadGroup
,
79 modulesManager
, process
);
80 processThread
.start();
81 runningProcesses
.put(process
.getUuid(), processThread
);
83 // clean up old processes
84 Iterator
<ProcessThread
> it
= runningProcesses
.values().iterator();
85 while (it
.hasNext()) {
86 ProcessThread pThread
= it
.next();
87 if (!pThread
.isAlive())
92 public String
process(List
<URI
> uris
) {
93 DefaultProcess process
= new DefaultProcess();
94 for (URI uri
: uris
) {
95 String
[] path
= uri
.getPath().split("/");
97 throw new SlcException("Badly formatted URI: " + uri
);
98 NameVersion nameVersion
= new DefaultNameVersion(path
[1]);
99 StringBuilder flow
= new StringBuilder();
100 for (int i
= 2; i
< path
.length
; i
++)
101 flow
.append('/').append(path
[i
]);
103 Map
<String
, Object
> values
= getQueryMap(uri
.getQuery());
104 // Get execution module descriptor
105 ExecutionModuleDescriptor emd
= getExecutionModuleDescriptor(
106 nameVersion
.getName(), nameVersion
.getVersion());
107 process
.getRealizedFlows().add(
108 emd
.asRealizedFlow(flow
.toString(), values
));
111 return process
.getUuid();
114 public void kill(String processUuid
) {
115 if (runningProcesses
.containsKey(processUuid
)) {
116 runningProcesses
.get(processUuid
).interrupt();
118 // assume is finished
122 public void waitFor(String processUuid
, Long millis
) {
123 if (runningProcesses
.containsKey(processUuid
)) {
126 runningProcesses
.get(processUuid
).join(millis
);
128 runningProcesses
.get(processUuid
).join();
129 } catch (InterruptedException e
) {
133 // assume is finished
137 /** Creates the thread which will coordinate the execution for this agent. */
138 protected ProcessThread
createProcessThread(
139 ThreadGroup processesThreadGroup
,
140 ExecutionModulesManager modulesManager
, ExecutionProcess process
) {
141 ProcessThread processThread
= new ProcessThread(processesThreadGroup
,
142 modulesManager
, process
);
143 return processThread
;
146 public ExecutionModuleDescriptor
getExecutionModuleDescriptor(
147 String moduleName
, String moduleVersion
) {
148 // Get execution module descriptor
149 ExecutionModuleDescriptor emd
;
152 .start(new DefaultNameVersion(moduleName
, moduleVersion
));
153 emd
= modulesManager
.getExecutionModuleDescriptor(moduleName
,
155 } catch (SlcException e
) {
156 if (defaultModulePrefix
!= null) {
157 moduleName
= defaultModulePrefix
+ "." + moduleName
;
158 modulesManager
.start(new DefaultNameVersion(moduleName
,
160 emd
= modulesManager
.getExecutionModuleDescriptor(moduleName
,
168 public List
<ExecutionModuleDescriptor
> listExecutionModuleDescriptors() {
169 return modulesManager
.listExecutionModules();
172 public boolean ping() {
183 static Map
<String
, Object
> getQueryMap(String query
) {
184 Map
<String
, Object
> map
= new LinkedHashMap
<String
, Object
>();
187 String
[] params
= query
.split("&");
188 for (String param
: params
) {
189 String
[] arr
= param
.split("=");
190 String name
= arr
[0];
191 Object value
= arr
.length
> 1 ? param
.split("=")[1] : Boolean
.TRUE
;
193 map
.put(URLDecoder
.decode(name
, UTF8
),
194 URLDecoder
.decode(value
.toString(), UTF8
));
195 } catch (UnsupportedEncodingException e
) {
196 throw new SlcException("Cannot decode '" + param
+ "'", e
);
205 public void setModulesManager(ExecutionModulesManager modulesManager
) {
206 this.modulesManager
= modulesManager
;
209 public void setDefaultModulePrefix(String defaultModulePrefix
) {
210 this.defaultModulePrefix
= defaultModulePrefix
;
213 public String
getAgentUuid() {
218 public String
toString() {
219 return "Agent #" + getAgentUuid();