]> git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.core/src/org/argeo/slc/core/execution/DefaultAgent.java
Disable trace logging
[gpl/argeo-slc.git] / org.argeo.slc.core / src / org / argeo / slc / core / execution / DefaultAgent.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.core.execution;
17
18 import java.io.UnsupportedEncodingException;
19 import java.net.URI;
20 import java.net.URLDecoder;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.LinkedHashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28
29 import org.argeo.slc.DefaultNameVersion;
30 import org.argeo.slc.NameVersion;
31 import org.argeo.slc.SlcException;
32 import org.argeo.slc.execution.ExecutionModuleDescriptor;
33 import org.argeo.slc.execution.ExecutionModulesManager;
34 import org.argeo.slc.execution.ExecutionProcess;
35 import org.argeo.slc.execution.SlcAgent;
36
37 /** Implements the base methods of an SLC agent. */
38 public class DefaultAgent implements SlcAgent {
39 // private final static Log log = LogFactory.getLog(DefaultAgent.class);
40 /** UTF-8 charset for encoding. */
41 private final static String UTF8 = "UTF-8";
42
43 private String agentUuid = null;
44 private ExecutionModulesManager modulesManager;
45
46 private ThreadGroup processesThreadGroup;
47 private Map<String, ProcessThread> runningProcesses = Collections
48 .synchronizedMap(new HashMap<String, ProcessThread>());
49
50 private String defaultModulePrefix = null;
51
52 /*
53 * LIFECYCLE
54 */
55 /** Initialization */
56 public void init() {
57 agentUuid = initAgentUuid();
58 processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
59 + agentUuid);
60 }
61
62 /** Clean up (needs to be called by overriding method) */
63 public void destroy() {
64 }
65
66 /**
67 * Called during initialization in order to determines the agent UUID. To be
68 * overridden. By default creates a new one per instance.
69 */
70 protected String initAgentUuid() {
71 return UUID.randomUUID().toString();
72 }
73
74 /*
75 * SLC AGENT
76 */
77 public void process(ExecutionProcess process) {
78 ProcessThread processThread = createProcessThread(processesThreadGroup,
79 modulesManager, process);
80 processThread.start();
81 runningProcesses.put(process.getUuid(), processThread);
82
83 // clean up old processes
84 Iterator<ProcessThread> it = runningProcesses.values().iterator();
85 while (it.hasNext()) {
86 ProcessThread pThread = it.next();
87 if (!pThread.isAlive())
88 it.remove();
89 }
90 }
91
92 public String process(List<URI> uris) {
93 DefaultProcess process = new DefaultProcess();
94 for (URI uri : uris) {
95 String[] path = uri.getPath().split("/");
96 if (path.length < 3)
97 throw new SlcException("Badly formatted URI: " + uri);
98 NameVersion nameVersion = new DefaultNameVersion(path[1]);
99 StringBuilder flow = new StringBuilder();
100 for (int i = 2; i < path.length; i++)
101 flow.append('/').append(path[i]);
102
103 Map<String, Object> values = getQueryMap(uri.getQuery());
104 // Get execution module descriptor
105 ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
106 nameVersion.getName(), nameVersion.getVersion());
107 process.getRealizedFlows().add(
108 emd.asRealizedFlow(flow.toString(), values));
109 }
110 process(process);
111 return process.getUuid();
112 }
113
114 public void kill(String processUuid) {
115 if (runningProcesses.containsKey(processUuid)) {
116 runningProcesses.get(processUuid).interrupt();
117 } else {
118 // assume is finished
119 }
120 }
121
122 public void waitFor(String processUuid, Long millis) {
123 if (runningProcesses.containsKey(processUuid)) {
124 try {
125 if (millis != null)
126 runningProcesses.get(processUuid).join(millis);
127 else
128 runningProcesses.get(processUuid).join();
129 } catch (InterruptedException e) {
130 // silent
131 }
132 } else {
133 // assume is finished
134 }
135 }
136
137 /** Creates the thread which will coordinate the execution for this agent. */
138 protected ProcessThread createProcessThread(
139 ThreadGroup processesThreadGroup,
140 ExecutionModulesManager modulesManager, ExecutionProcess process) {
141 ProcessThread processThread = new ProcessThread(processesThreadGroup,
142 modulesManager, process);
143 return processThread;
144 }
145
146 public ExecutionModuleDescriptor getExecutionModuleDescriptor(
147 String moduleName, String moduleVersion) {
148 // Get execution module descriptor
149 ExecutionModuleDescriptor emd;
150 try {
151 modulesManager
152 .start(new DefaultNameVersion(moduleName, moduleVersion));
153 emd = modulesManager.getExecutionModuleDescriptor(moduleName,
154 moduleVersion);
155 } catch (SlcException e) {
156 if (defaultModulePrefix != null) {
157 moduleName = defaultModulePrefix + "." + moduleName;
158 modulesManager.start(new DefaultNameVersion(moduleName,
159 moduleVersion));
160 emd = modulesManager.getExecutionModuleDescriptor(moduleName,
161 moduleVersion);
162 } else
163 throw e;
164 }
165 return emd;
166 }
167
168 public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
169 return modulesManager.listExecutionModules();
170 }
171
172 public boolean ping() {
173 return true;
174 }
175
176 /*
177 * UTILITIES
178 */
179 /**
180 * @param query
181 * can be null
182 */
183 static Map<String, Object> getQueryMap(String query) {
184 Map<String, Object> map = new LinkedHashMap<String, Object>();
185 if (query == null)
186 return map;
187 String[] params = query.split("&");
188 for (String param : params) {
189 String[] arr = param.split("=");
190 String name = arr[0];
191 Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
192 try {
193 map.put(URLDecoder.decode(name, UTF8),
194 URLDecoder.decode(value.toString(), UTF8));
195 } catch (UnsupportedEncodingException e) {
196 throw new SlcException("Cannot decode '" + param + "'", e);
197 }
198 }
199 return map;
200 }
201
202 /*
203 * BEAN
204 */
205 public void setModulesManager(ExecutionModulesManager modulesManager) {
206 this.modulesManager = modulesManager;
207 }
208
209 public void setDefaultModulePrefix(String defaultModulePrefix) {
210 this.defaultModulePrefix = defaultModulePrefix;
211 }
212
213 public String getAgentUuid() {
214 return agentUuid;
215 }
216
217 @Override
218 public String toString() {
219 return "Agent #" + getAgentUuid();
220 }
221 }