]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionModulesManager.java
Update default timeout to 2 minutes (from 10s)
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.osgi / src / main / java / org / argeo / slc / osgi / OsgiExecutionModulesManager.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.osgi;
18
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27
28 import javax.management.MBeanServer;
29 import javax.management.ObjectName;
30 import javax.management.StandardMBean;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.argeo.slc.BasicNameVersion;
35 import org.argeo.slc.NameVersion;
36 import org.argeo.slc.SlcException;
37 import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
38 import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
39 import org.argeo.slc.deploy.Module;
40 import org.argeo.slc.deploy.ModuleDescriptor;
41 import org.argeo.slc.execution.ExecutionContext;
42 import org.argeo.slc.execution.ExecutionFlow;
43 import org.argeo.slc.execution.ExecutionFlowDescriptor;
44 import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
45 import org.argeo.slc.execution.ExecutionModuleDescriptor;
46 import org.argeo.slc.execution.ExecutionModulesListener;
47 import org.argeo.slc.process.RealizedFlow;
48 import org.osgi.framework.Bundle;
49 import org.osgi.framework.BundleException;
50 import org.osgi.framework.Constants;
51 import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
52
53 /** Execution modules manager implementation based on an OSGi runtime. */
54 public class OsgiExecutionModulesManager extends
55 AbstractExecutionModulesManager implements OsgiServiceLifecycleListener {
56
57 private final static Log log = LogFactory
58 .getLog(OsgiExecutionModulesManager.class);
59
60 private BundlesManager bundlesManager;
61 private Map<OsgiBundle, ExecutionContext> executionContexts = new HashMap<OsgiBundle, ExecutionContext>();
62 private Map<OsgiBundle, ExecutionFlowDescriptorConverter> executionFlowDescriptorConverters = new HashMap<OsgiBundle, ExecutionFlowDescriptorConverter>();
63 private Map<OsgiBundle, Set<ExecutionFlow>> executionFlows = new HashMap<OsgiBundle, Set<ExecutionFlow>>();
64 private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultExecutionFlowDescriptorConverter();
65
66 private List<ExecutionModulesListener> executionModulesListeners = new ArrayList<ExecutionModulesListener>();
67
68 private Boolean registerFlowsToJmx = true;
69
70 public synchronized ExecutionModuleDescriptor getExecutionModuleDescriptor(
71 String moduleName, String version) {
72 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
73 OsgiBundle osgiBundle = null;
74 BasicNameVersion nameVersion = new BasicNameVersion(moduleName, version);
75 bundles: for (Iterator<OsgiBundle> iterator = executionContexts
76 .keySet().iterator(); iterator.hasNext();) {
77 OsgiBundle ob = iterator.next();
78 if (ob.equals(nameVersion)) {
79 osgiBundle = ob;
80 break bundles;
81 }
82 }
83 if (osgiBundle == null)
84 throw new SlcException("No execution module registered for "
85 + nameVersion);
86 md.setName(osgiBundle.getName());
87 md.setVersion(osgiBundle.getVersion());
88 md.setTitle(osgiBundle.getTitle());
89 md.setDescription(osgiBundle.getDescription());
90
91 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = getExecutionFlowDescriptorConverter(
92 moduleName, version);
93 if (executionFlowDescriptorConverter == null)
94 throw new SlcException("No flow converter found.");
95 executionFlowDescriptorConverter.addFlowsToDescriptor(md,
96 listFlows(moduleName, version));
97 return md;
98 }
99
100 public synchronized List<ExecutionModuleDescriptor> listExecutionModules() {
101 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
102
103 for (Iterator<OsgiBundle> iterator = executionContexts.keySet()
104 .iterator(); iterator.hasNext();) {
105 OsgiBundle osgiBundle = iterator.next();
106 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
107 setMetadataFromBundle(md,
108 bundlesManager.findRelatedBundle(osgiBundle));
109 descriptors.add(md);
110 }
111 return descriptors;
112 }
113
114 protected synchronized Map<String, ExecutionFlow> listFlows(
115 String moduleName, String moduleVersion) {
116
117 Map<String, ExecutionFlow> flows = new HashMap<String, ExecutionFlow>();
118 OsgiBundle key = new OsgiBundle(moduleName, moduleVersion);
119 if (!executionFlows.containsKey(key))
120 return flows;
121 Set<ExecutionFlow> flowsT = executionFlows.get(key);
122 for (ExecutionFlow flow : flowsT)
123 flows.put(flow.getName(), flow);
124 return flows;
125 }
126
127 protected ExecutionFlow findExecutionFlow(String moduleName,
128 String moduleVersion, String flowName) {
129 String filter = "(&(Bundle-SymbolicName=" + moduleName
130 + ")(org.springframework.osgi.bean.name=" + flowName + "))";
131 return bundlesManager.getSingleServiceStrict(ExecutionFlow.class,
132 filter);
133 }
134
135 protected ExecutionContext findExecutionContext(String moduleName,
136 String moduleVersion) {
137 String filter = "(&(Bundle-SymbolicName=" + moduleName
138 + ")(Bundle-Version=" + moduleVersion + "))";
139 return bundlesManager.getSingleServiceStrict(ExecutionContext.class,
140 filter);
141 }
142
143 protected ExecutionFlowDescriptorConverter findExecutionFlowDescriptorConverter(
144 String moduleName, String moduleVersion) {
145
146 String filter = "(&(Bundle-SymbolicName=" + moduleName
147 + ")(Bundle-Version=" + moduleVersion + "))";
148 return bundlesManager.getSingleService(
149 ExecutionFlowDescriptorConverter.class, filter);
150 }
151
152 /**
153 * Builds a minimal realized flow, based on the provided information
154 * (typically from the command line).
155 *
156 * @param module
157 * a bundle id, or a pattern contained in a bundle symbolic name
158 * @param module
159 * the execution flow name
160 * @return a minimal realized flow, to be used in an execution
161 */
162 public RealizedFlow findRealizedFlow(String module, String executionName) {
163 // First check whether we have a bundleId
164 Long bundleId = null;
165 try {
166 bundleId = Long.parseLong(module);
167 } catch (NumberFormatException e) {
168 // silent
169 }
170
171 // Look for bundle names containing pattern
172 OsgiBundle bundle = null;
173 if (bundleId != null) {
174 bundle = bundlesManager.getBundle(bundleId);
175 } else {
176 bundle = bundlesManager.findFromPattern(module);
177 }
178
179 if (bundle != null) {
180 RealizedFlow launch = new RealizedFlow();
181 launch.setModuleName(bundle.getName());
182 launch.setModuleVersion(bundle.getVersion());
183 ExecutionFlowDescriptor descriptor = new ExecutionFlowDescriptor();
184 descriptor.setName(executionName);
185 launch.setFlowDescriptor(descriptor);
186 return launch;
187 } else {
188 log.warn("Could not find any execution module matching these requirements.");
189 return null;
190 }
191 }
192
193 public void upgrade(NameVersion nameVersion) {
194 OsgiBundle osgiBundle = new OsgiBundle(nameVersion);
195 bundlesManager.upgradeSynchronous(osgiBundle);
196 }
197
198 protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
199 String moduleName, String moduleVersion) {
200 OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
201 return getExecutionFlowDescriptorConverter(osgiBundle);
202 }
203
204 protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
205 OsgiBundle osgiBundle) {
206 if (executionFlowDescriptorConverters.containsKey(osgiBundle))
207 return executionFlowDescriptorConverters.get(osgiBundle);
208 else
209 return defaultDescriptorConverter;
210 }
211
212 public ModuleDescriptor getModuleDescriptor(String moduleName,
213 String version) {
214 return getExecutionModuleDescriptor(moduleName, version);
215 }
216
217 public List<ModuleDescriptor> listModules() {
218 Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
219 List<ModuleDescriptor> lst = new ArrayList<ModuleDescriptor>();
220 for (Bundle bundle : bundles) {
221 ModuleDescriptor moduleDescriptor = new ModuleDescriptor();
222 setMetadataFromBundle(moduleDescriptor, bundle);
223 lst.add(moduleDescriptor);
224 }
225 return lst;
226 }
227
228 public void start(NameVersion nameVersion) {
229 try {
230 Bundle bundle = bundlesManager.findRelatedBundle(new OsgiBundle(
231 nameVersion));
232 bundlesManager.startSynchronous(bundle);
233 } catch (BundleException e) {
234 throw new SlcException("Cannot start " + nameVersion, e);
235 }
236 }
237
238 public void stop(NameVersion nameVersion) {
239 try {
240 Bundle bundle = bundlesManager.findRelatedBundle(new OsgiBundle(
241 nameVersion));
242 bundlesManager.stopSynchronous(bundle);
243 } catch (BundleException e) {
244 throw new SlcException("Cannot stop " + nameVersion, e);
245 }
246 }
247
248 protected void setMetadataFromBundle(ModuleDescriptor md, Bundle bundle) {
249 Bundle bdl = bundle;
250 if (bdl == null) {
251 if (md.getName() == null || md.getVersion() == null)
252 throw new SlcException("Name and version not available.");
253
254 Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
255 for (Bundle b : bundles) {
256 if (b.getSymbolicName().equals(md.getName())
257 && md.getVersion().equals(
258 getHeaderSafe(b, Constants.BUNDLE_VERSION))) {
259 bdl = b;
260 break;
261 }
262 }
263
264 }
265
266 if (bdl == null)
267 throw new SlcException("Cannot find bundle.");
268
269 md.setName(bdl.getSymbolicName());
270 md.setVersion(getHeaderSafe(bdl, Constants.BUNDLE_VERSION));
271 md.setTitle(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
272 md.setDescription(getHeaderSafe(bdl, Constants.BUNDLE_DESCRIPTION));
273 }
274
275 private String getHeaderSafe(Bundle bundle, Object key) {
276 Object obj = bundle.getHeaders().get(key);
277 if (obj == null)
278 return null;
279 else
280 return obj.toString();
281 }
282
283 /*
284 * REGISTRATION
285 */
286
287 /** Registers an execution context. */
288 public synchronized void register(ExecutionContext executionContext,
289 Map<String, String> properties) {
290 OsgiBundle osgiBundle = asOsgiBundle(properties);
291 Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
292 osgiBundle.setTitle(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
293 osgiBundle.setDescription(getHeaderSafe(bundle,
294 Constants.BUNDLE_DESCRIPTION));
295 executionContexts.put(osgiBundle, executionContext);
296 if (log.isTraceEnabled())
297 log.trace("Registered execution context from " + osgiBundle);
298 // Notify
299 for (ExecutionModulesListener listener : executionModulesListeners)
300 listener.executionModuleAdded(osgiBundle.getModuleDescriptor());
301 }
302
303 /** Unregisters an execution context. */
304 public synchronized void unregister(ExecutionContext executionContext,
305 Map<String, String> properties) {
306 OsgiBundle osgiBundle = asOsgiBundle(properties);
307 if (executionContexts.containsKey(osgiBundle)) {
308 executionContexts.remove(osgiBundle);
309 if (log.isTraceEnabled())
310 log.trace("Removed execution context from " + osgiBundle);
311 // Notify
312 for (ExecutionModulesListener listener : executionModulesListeners)
313 listener.executionModuleRemoved(osgiBundle
314 .getModuleDescriptor());
315 }
316 }
317
318 /** Registers an execution flow. */
319 public synchronized void register(ExecutionFlow executionFlow,
320 Map<String, String> properties) {
321 OsgiBundle osgiBundle = asOsgiBundle(properties);
322 if (!executionFlows.containsKey(osgiBundle)) {
323 executionFlows.put(osgiBundle, new HashSet<ExecutionFlow>());
324 }
325 executionFlows.get(osgiBundle).add(executionFlow);
326 if (log.isTraceEnabled())
327 log.trace("Registered " + executionFlow + " from " + osgiBundle);
328
329 // notifications
330 if (registerFlowsToJmx)
331 registerMBean(osgiBundle, executionFlow);
332 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
333 for (ExecutionModulesListener listener : executionModulesListeners)
334 listener.executionFlowAdded(osgiBundle.getModuleDescriptor(),
335 efdc.getExecutionFlowDescriptor(executionFlow));
336 }
337
338 /** Unregisters an execution flow. */
339 public synchronized void unregister(ExecutionFlow executionFlow,
340 Map<String, String> properties) {
341 OsgiBundle osgiBundle = asOsgiBundle(properties);
342 if (executionFlows.containsKey(osgiBundle)) {
343 Set<ExecutionFlow> flows = executionFlows.get(osgiBundle);
344 flows.remove(executionFlow);
345 if (log.isTraceEnabled())
346 log.trace("Removed " + executionFlow + " from " + osgiBundle);
347 if (flows.size() == 0) {
348 executionFlows.remove(osgiBundle);
349 if (log.isTraceEnabled())
350 log.trace("Removed flows set from " + osgiBundle);
351 }
352
353 // notifications
354 if (registerFlowsToJmx)
355 unregisterMBean(osgiBundle, executionFlow);
356 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
357 for (ExecutionModulesListener listener : executionModulesListeners)
358 listener.executionFlowRemoved(osgiBundle.getModuleDescriptor(),
359 efdc.getExecutionFlowDescriptor(executionFlow));
360 }
361 }
362
363 /** Registers an execution module listener. */
364 public synchronized void register(
365 ExecutionModulesListener executionModulesListener,
366 Map<String, String> properties) {
367 // sync with current state
368 for (OsgiBundle osgiBundle : executionContexts.keySet()) {
369 executionModulesListener.executionModuleAdded(osgiBundle
370 .getModuleDescriptor());
371 }
372 for (OsgiBundle osgiBundle : executionFlows.keySet()) {
373 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
374 for (ExecutionFlow executionFlow : executionFlows.get(osgiBundle))
375 executionModulesListener.executionFlowAdded(
376 osgiBundle.getModuleDescriptor(),
377 efdc.getExecutionFlowDescriptor(executionFlow));
378 }
379 executionModulesListeners.add(executionModulesListener);
380 }
381
382 /** Unregisters an execution module listener. */
383 public synchronized void unregister(
384 ExecutionModulesListener executionModulesListener,
385 Map<String, String> properties) {
386 executionModulesListeners.remove(executionModulesListener);
387 }
388
389 @SuppressWarnings({ "rawtypes" })
390 public synchronized void bind(Object service, Map properties)
391 throws Exception {
392 if (service instanceof ExecutionFlowDescriptorConverter) {
393 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = (ExecutionFlowDescriptorConverter) service;
394 OsgiBundle osgiBundle = asOsgiBundle(properties);
395 executionFlowDescriptorConverters.put(osgiBundle,
396 executionFlowDescriptorConverter);
397 if (log.isTraceEnabled())
398 log.debug("Registered execution flow descriptor converter from "
399 + osgiBundle);
400 } else {
401 // ignore
402 }
403 }
404
405 @SuppressWarnings("rawtypes")
406 public synchronized void unbind(Object service, Map properties)
407 throws Exception {
408 if (service instanceof ExecutionFlowDescriptorConverter) {
409 OsgiBundle osgiBundle = asOsgiBundle(properties);
410 if (executionFlowDescriptorConverters.containsKey(osgiBundle)) {
411 executionFlowDescriptorConverters.remove(osgiBundle);
412 if (log.isTraceEnabled())
413 log.debug("Removed execution flow descriptor converter from "
414 + osgiBundle);
415 }
416 } else {
417 // ignore
418 }
419 }
420
421 /*
422 * JMX
423 */
424 protected MBeanServer getMBeanServer() {
425 return ManagementFactory.getPlatformMBeanServer();
426 }
427
428 public void registerMBean(Module module, ExecutionFlow executionFlow) {
429 try {
430 StandardMBean mbean = new StandardMBean(executionFlow,
431 ExecutionFlow.class);
432 getMBeanServer().registerMBean(mbean,
433 flowMBeanName(module, executionFlow));
434 } catch (Exception e) {
435 String msg = "Cannot register execution flow " + executionFlow
436 + " as mbean";
437 throw new SlcException(msg, e);
438 }
439 }
440
441 public void unregisterMBean(Module module, ExecutionFlow executionFlow) {
442 try {
443 getMBeanServer().unregisterMBean(
444 flowMBeanName(module, executionFlow));
445 } catch (Exception e) {
446 String msg = "Cannot unregister execution flow " + executionFlow
447 + " as mbean";
448 throw new SlcException(msg, e);
449 }
450 }
451
452 @SuppressWarnings("deprecation")
453 protected ObjectName flowMBeanName(Module module,
454 ExecutionFlow executionFlow) {
455 String executionModulesPrefix = "SLCExecutionModules";
456 String path = executionFlow.getPath();
457 String name = executionFlow.getName();
458 if (path == null && name.indexOf('/') >= 0) {
459 path = name.substring(0, name.lastIndexOf('/') - 1);
460 name = name.substring(name.lastIndexOf('/'));
461 }
462
463 StringBuffer buf = new StringBuffer(executionModulesPrefix + ":"
464 + "module=" + module.getName() + " [" + module.getVersion()
465 + "],");
466
467 if (path != null && !path.equals("")) {
468 int depth = 0;
469 for (String token : path.split("/")) {
470 if (!token.equals("")) {
471 buf.append("path").append(depth).append('=');
472 // in order to have directories first
473 buf.append('/');
474 buf.append(token).append(',');
475 depth++;
476 }
477 }
478 }
479 buf.append("name=").append(name);
480 try {
481 return new ObjectName(buf.toString());
482 } catch (Exception e) {
483 throw new SlcException("Cannot generate object name based on "
484 + buf, e);
485 }
486 }
487
488 /*
489 * UTILITIES
490 */
491 @SuppressWarnings("rawtypes")
492 private OsgiBundle asOsgiBundle(Map properties) {
493 String bundleSymbolicName = checkAndGet(Constants.BUNDLE_SYMBOLICNAME,
494 properties);
495 String bundleVersion = checkAndGet(Constants.BUNDLE_VERSION, properties);
496 return new OsgiBundle(bundleSymbolicName, bundleVersion);
497 }
498
499 @SuppressWarnings("rawtypes")
500 private String checkAndGet(Object key, Map properties) {
501 if (!properties.containsKey(key) || properties.get(key) == null)
502 throw new SlcException(key + " not set in " + properties);
503 else
504 return properties.get(key).toString();
505 }
506
507 public void setBundlesManager(BundlesManager bundlesManager) {
508 this.bundlesManager = bundlesManager;
509 }
510
511 public void setDefaultDescriptorConverter(
512 ExecutionFlowDescriptorConverter defaultDescriptorConverter) {
513 this.defaultDescriptorConverter = defaultDescriptorConverter;
514 }
515
516 public void setRegisterFlowsToJmx(Boolean registerFlowsToJmx) {
517 this.registerFlowsToJmx = registerFlowsToJmx;
518 }
519
520 }