]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionModulesManager.java
Use prefixes for system nodes
[gpl/argeo-slc.git] / runtime / org.argeo.slc.support.osgi / src / main / java / org / argeo / slc / osgi / OsgiExecutionModulesManager.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.osgi;
18
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27
28 import javax.management.MBeanServer;
29 import javax.management.ObjectName;
30 import javax.management.StandardMBean;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.argeo.slc.BasicNameVersion;
35 import org.argeo.slc.NameVersion;
36 import org.argeo.slc.SlcException;
37 import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
38 import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
39 import org.argeo.slc.deploy.Module;
40 import org.argeo.slc.deploy.ModuleDescriptor;
41 import org.argeo.slc.execution.ExecutionContext;
42 import org.argeo.slc.execution.ExecutionFlow;
43 import org.argeo.slc.execution.ExecutionFlowDescriptor;
44 import org.argeo.slc.execution.ExecutionFlowDescriptorConverter;
45 import org.argeo.slc.execution.ExecutionModuleDescriptor;
46 import org.argeo.slc.execution.ExecutionModulesListener;
47 import org.argeo.slc.process.RealizedFlow;
48 import org.osgi.framework.Bundle;
49 import org.osgi.framework.BundleException;
50 import org.osgi.framework.Constants;
51 import org.osgi.framework.launch.Framework;
52 import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
53
54 /** Execution modules manager implementation based on an OSGi runtime. */
55 public class OsgiExecutionModulesManager extends
56 AbstractExecutionModulesManager implements OsgiServiceLifecycleListener {
57
58 private final static Log log = LogFactory
59 .getLog(OsgiExecutionModulesManager.class);
60
61 private BundlesManager bundlesManager;
62 private Map<OsgiBundle, ExecutionContext> executionContexts = new HashMap<OsgiBundle, ExecutionContext>();
63 private Map<OsgiBundle, ExecutionFlowDescriptorConverter> executionFlowDescriptorConverters = new HashMap<OsgiBundle, ExecutionFlowDescriptorConverter>();
64 private Map<OsgiBundle, Set<ExecutionFlow>> executionFlows = new HashMap<OsgiBundle, Set<ExecutionFlow>>();
65 private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultExecutionFlowDescriptorConverter();
66
67 private List<ExecutionModulesListener> executionModulesListeners = new ArrayList<ExecutionModulesListener>();
68
69 private Boolean registerFlowsToJmx = true;
70
71 public void init() throws Exception {
72 final String module = System.getProperty(UNIQUE_LAUNCH_MODULE_PROPERTY);
73 final String flow = System.getProperty(UNIQUE_LAUNCH_FLOW_PROPERTY);
74 if (module != null) {
75 // launch a flow and stops
76 new Thread("Unique Flow") {
77 @Override
78 public void run() {
79 if (log.isDebugEnabled())
80 log.debug("Launch unique flow " + flow
81 + " from module " + module);
82 try {
83 OsgiBundle osgiBundle = bundlesManager
84 .findFromPattern(module);
85 Bundle moduleBundle = bundlesManager
86 .findRelatedBundle(osgiBundle);
87 bundlesManager.startSynchronous(moduleBundle);
88 RealizedFlow lastLaunch = findRealizedFlow(module, flow);
89 if (lastLaunch == null)
90 throw new SlcException("Cannot find launch for "
91 + module + " " + flow);
92 execute(lastLaunch);
93 } catch (Exception e) {
94 log.error("Error in unique flow " + flow
95 + " from module " + module, e);
96 } finally {
97 if (log.isDebugEnabled())
98 log.debug("Shutdown OSGi runtime...");
99 Framework framework = (Framework) bundlesManager
100 .getBundleContext().getBundle(0);
101 try {
102 // shutdown framework
103 framework.stop();
104 // wait 1 min for shutdown
105 framework.waitForStop(60 * 1000);
106 // close VM
107 System.exit(0);
108 } catch (Exception e) {
109 e.printStackTrace();
110 System.exit(1);
111 }
112 }
113 }
114 }.start();
115 }
116
117 }
118
119 public void destroy() {
120
121 }
122
123 public synchronized ExecutionModuleDescriptor getExecutionModuleDescriptor(
124 String moduleName, String version) {
125 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
126 OsgiBundle osgiBundle = null;
127 BasicNameVersion nameVersion = new BasicNameVersion(moduleName, version);
128 bundles: for (Iterator<OsgiBundle> iterator = executionContexts
129 .keySet().iterator(); iterator.hasNext();) {
130 OsgiBundle ob = iterator.next();
131 if (ob.equals(nameVersion)) {
132 osgiBundle = ob;
133 break bundles;
134 }
135 }
136 if (osgiBundle == null)
137 throw new SlcException("No execution module registered for "
138 + nameVersion);
139 md.setName(osgiBundle.getName());
140 md.setVersion(osgiBundle.getVersion());
141 md.setTitle(osgiBundle.getTitle());
142 md.setDescription(osgiBundle.getDescription());
143
144 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = getExecutionFlowDescriptorConverter(
145 moduleName, version);
146 if (executionFlowDescriptorConverter == null)
147 throw new SlcException("No flow converter found.");
148 executionFlowDescriptorConverter.addFlowsToDescriptor(md,
149 listFlows(moduleName, version));
150 return md;
151 }
152
153 public synchronized List<ExecutionModuleDescriptor> listExecutionModules() {
154 List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
155
156 for (Iterator<OsgiBundle> iterator = executionContexts.keySet()
157 .iterator(); iterator.hasNext();) {
158 OsgiBundle osgiBundle = iterator.next();
159 ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
160 setMetadataFromBundle(md,
161 bundlesManager.findRelatedBundle(osgiBundle));
162 descriptors.add(md);
163 }
164 return descriptors;
165 }
166
167 protected synchronized Map<String, ExecutionFlow> listFlows(
168 String moduleName, String moduleVersion) {
169
170 Map<String, ExecutionFlow> flows = new HashMap<String, ExecutionFlow>();
171 OsgiBundle key = new OsgiBundle(moduleName, moduleVersion);
172 if (!executionFlows.containsKey(key))
173 return flows;
174 Set<ExecutionFlow> flowsT = executionFlows.get(key);
175 for (ExecutionFlow flow : flowsT)
176 flows.put(flow.getName(), flow);
177 return flows;
178 }
179
180 protected ExecutionFlow findExecutionFlow(String moduleName,
181 String moduleVersion, String flowName) {
182 String filter = "(&(Bundle-SymbolicName=" + moduleName
183 + ")(org.springframework.osgi.bean.name=" + flowName + "))";
184 return bundlesManager.getSingleServiceStrict(ExecutionFlow.class,
185 filter, true);
186 }
187
188 protected ExecutionContext findExecutionContext(String moduleName,
189 String moduleVersion) {
190 String filter = "(&(Bundle-SymbolicName=" + moduleName
191 + ")(Bundle-Version=" + moduleVersion + "))";
192 return bundlesManager.getSingleServiceStrict(ExecutionContext.class,
193 filter, true);
194 }
195
196 protected ExecutionFlowDescriptorConverter findExecutionFlowDescriptorConverter(
197 String moduleName, String moduleVersion) {
198 String filter = "(&(Bundle-SymbolicName=" + moduleName
199 + ")(Bundle-Version=" + moduleVersion + "))";
200 return bundlesManager.getSingleService(
201 ExecutionFlowDescriptorConverter.class, filter, false);
202 }
203
204 /**
205 * Builds a minimal realized flow, based on the provided information
206 * (typically from the command line).
207 *
208 * @param module
209 * a bundle id, or a pattern contained in a bundle symbolic name
210 * @param module
211 * the execution flow name
212 * @return a minimal realized flow, to be used in an execution
213 */
214 public RealizedFlow findRealizedFlow(String module, String executionName) {
215 // First check whether we have a bundleId
216 Long bundleId = null;
217 try {
218 bundleId = Long.parseLong(module);
219 } catch (NumberFormatException e) {
220 // silent
221 }
222
223 // Look for bundle names containing pattern
224 OsgiBundle bundle = null;
225 if (bundleId != null) {
226 bundle = bundlesManager.getBundle(bundleId);
227 } else {
228 bundle = bundlesManager.findFromPattern(module);
229 }
230
231 if (bundle != null) {
232 RealizedFlow launch = new RealizedFlow();
233 launch.setModuleName(bundle.getName());
234 launch.setModuleVersion(bundle.getVersion());
235 ExecutionFlowDescriptor descriptor = new ExecutionFlowDescriptor();
236 descriptor.setName(executionName);
237 launch.setFlowDescriptor(descriptor);
238 return launch;
239 } else {
240 log.warn("Could not find any execution module matching these requirements.");
241 return null;
242 }
243 }
244
245 public void upgrade(NameVersion nameVersion) {
246 OsgiBundle osgiBundle = new OsgiBundle(nameVersion);
247 bundlesManager.upgradeSynchronous(osgiBundle);
248 }
249
250 protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
251 String moduleName, String moduleVersion) {
252 OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
253 return getExecutionFlowDescriptorConverter(osgiBundle);
254 }
255
256 protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
257 OsgiBundle osgiBundle) {
258 if (executionFlowDescriptorConverters.containsKey(osgiBundle))
259 return executionFlowDescriptorConverters.get(osgiBundle);
260 else
261 return defaultDescriptorConverter;
262 }
263
264 public ModuleDescriptor getModuleDescriptor(String moduleName,
265 String version) {
266 return getExecutionModuleDescriptor(moduleName, version);
267 }
268
269 public List<ModuleDescriptor> listModules() {
270 Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
271 List<ModuleDescriptor> lst = new ArrayList<ModuleDescriptor>();
272 for (Bundle bundle : bundles) {
273 ModuleDescriptor moduleDescriptor = new ModuleDescriptor();
274 setMetadataFromBundle(moduleDescriptor, bundle);
275 lst.add(moduleDescriptor);
276 }
277 return lst;
278 }
279
280 public void start(NameVersion nameVersion) {
281 try {
282 Bundle bundle = bundlesManager.findRelatedBundle(new OsgiBundle(
283 nameVersion));
284 bundlesManager.startSynchronous(bundle);
285 } catch (BundleException e) {
286 throw new SlcException("Cannot start " + nameVersion, e);
287 }
288 }
289
290 public void stop(NameVersion nameVersion) {
291 try {
292 Bundle bundle = bundlesManager.findRelatedBundle(new OsgiBundle(
293 nameVersion));
294 bundlesManager.stopSynchronous(bundle);
295 } catch (BundleException e) {
296 throw new SlcException("Cannot stop " + nameVersion, e);
297 }
298 }
299
300 protected void setMetadataFromBundle(ModuleDescriptor md, Bundle bundle) {
301 Bundle bdl = bundle;
302 if (bdl == null) {
303 if (md.getName() == null || md.getVersion() == null)
304 throw new SlcException("Name and version not available.");
305
306 Bundle[] bundles = bundlesManager.getBundleContext().getBundles();
307 for (Bundle b : bundles) {
308 if (b.getSymbolicName().equals(md.getName())
309 && md.getVersion().equals(
310 getHeaderSafe(b, Constants.BUNDLE_VERSION))) {
311 bdl = b;
312 break;
313 }
314 }
315
316 }
317
318 if (bdl == null)
319 throw new SlcException("Cannot find bundle.");
320
321 md.setName(bdl.getSymbolicName());
322 md.setVersion(getHeaderSafe(bdl, Constants.BUNDLE_VERSION));
323 md.setTitle(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
324 md.setDescription(getHeaderSafe(bdl, Constants.BUNDLE_DESCRIPTION));
325 }
326
327 private String getHeaderSafe(Bundle bundle, Object key) {
328 Object obj = bundle.getHeaders().get(key);
329 if (obj == null)
330 return null;
331 else
332 return obj.toString();
333 }
334
335 /*
336 * REGISTRATION
337 */
338
339 /** Registers an execution context. */
340 public synchronized void register(ExecutionContext executionContext,
341 Map<String, String> properties) {
342 OsgiBundle osgiBundle = asOsgiBundle(properties);
343 Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
344 osgiBundle.setTitle(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
345 osgiBundle.setDescription(getHeaderSafe(bundle,
346 Constants.BUNDLE_DESCRIPTION));
347 executionContexts.put(osgiBundle, executionContext);
348 if (log.isTraceEnabled())
349 log.trace("Registered execution context from " + osgiBundle);
350 // Notify
351 for (ExecutionModulesListener listener : executionModulesListeners)
352 listener.executionModuleAdded(osgiBundle.getModuleDescriptor());
353 }
354
355 /** Unregisters an execution context. */
356 public synchronized void unregister(ExecutionContext executionContext,
357 Map<String, String> properties) {
358 OsgiBundle osgiBundle = asOsgiBundle(properties);
359 if (executionContexts.containsKey(osgiBundle)) {
360 executionContexts.remove(osgiBundle);
361 if (log.isTraceEnabled())
362 log.trace("Removed execution context from " + osgiBundle);
363 // Notify
364 for (ExecutionModulesListener listener : executionModulesListeners)
365 listener.executionModuleRemoved(osgiBundle
366 .getModuleDescriptor());
367 }
368 }
369
370 /** Registers an execution flow. */
371 public synchronized void register(ExecutionFlow executionFlow,
372 Map<String, String> properties) {
373 OsgiBundle osgiBundle = asOsgiBundle(properties);
374 if (!executionFlows.containsKey(osgiBundle)) {
375 executionFlows.put(osgiBundle, new HashSet<ExecutionFlow>());
376 }
377 executionFlows.get(osgiBundle).add(executionFlow);
378 if (log.isTraceEnabled())
379 log.trace("Registered " + executionFlow + " from " + osgiBundle);
380
381 // notifications
382 if (registerFlowsToJmx)
383 registerMBean(osgiBundle, executionFlow);
384 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
385 for (ExecutionModulesListener listener : executionModulesListeners)
386 listener.executionFlowAdded(osgiBundle.getModuleDescriptor(),
387 efdc.getExecutionFlowDescriptor(executionFlow));
388 }
389
390 /** Unregisters an execution flow. */
391 public synchronized void unregister(ExecutionFlow executionFlow,
392 Map<String, String> properties) {
393 OsgiBundle osgiBundle = asOsgiBundle(properties);
394 if (executionFlows.containsKey(osgiBundle)) {
395 Set<ExecutionFlow> flows = executionFlows.get(osgiBundle);
396 flows.remove(executionFlow);
397 if (log.isTraceEnabled())
398 log.trace("Removed " + executionFlow + " from " + osgiBundle);
399 if (flows.size() == 0) {
400 executionFlows.remove(osgiBundle);
401 if (log.isTraceEnabled())
402 log.trace("Removed flows set from " + osgiBundle);
403 }
404
405 // notifications
406 if (registerFlowsToJmx)
407 unregisterMBean(osgiBundle, executionFlow);
408 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
409 for (ExecutionModulesListener listener : executionModulesListeners)
410 listener.executionFlowRemoved(osgiBundle.getModuleDescriptor(),
411 efdc.getExecutionFlowDescriptor(executionFlow));
412 }
413 }
414
415 /** Registers an execution module listener. */
416 public synchronized void register(
417 ExecutionModulesListener executionModulesListener,
418 Map<String, String> properties) {
419 // sync with current state
420 for (OsgiBundle osgiBundle : executionContexts.keySet()) {
421 executionModulesListener.executionModuleAdded(osgiBundle
422 .getModuleDescriptor());
423 }
424 for (OsgiBundle osgiBundle : executionFlows.keySet()) {
425 ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
426 for (ExecutionFlow executionFlow : executionFlows.get(osgiBundle))
427 executionModulesListener.executionFlowAdded(
428 osgiBundle.getModuleDescriptor(),
429 efdc.getExecutionFlowDescriptor(executionFlow));
430 }
431 executionModulesListeners.add(executionModulesListener);
432 }
433
434 /** Unregisters an execution module listener. */
435 public synchronized void unregister(
436 ExecutionModulesListener executionModulesListener,
437 Map<String, String> properties) {
438 executionModulesListeners.remove(executionModulesListener);
439 }
440
441 @SuppressWarnings({ "rawtypes" })
442 public synchronized void bind(Object service, Map properties)
443 throws Exception {
444 if (service instanceof ExecutionFlowDescriptorConverter) {
445 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = (ExecutionFlowDescriptorConverter) service;
446 OsgiBundle osgiBundle = asOsgiBundle(properties);
447 executionFlowDescriptorConverters.put(osgiBundle,
448 executionFlowDescriptorConverter);
449 if (log.isTraceEnabled())
450 log.debug("Registered execution flow descriptor converter from "
451 + osgiBundle);
452 } else {
453 // ignore
454 }
455 }
456
457 @SuppressWarnings("rawtypes")
458 public synchronized void unbind(Object service, Map properties)
459 throws Exception {
460 if (service instanceof ExecutionFlowDescriptorConverter) {
461 OsgiBundle osgiBundle = asOsgiBundle(properties);
462 if (executionFlowDescriptorConverters.containsKey(osgiBundle)) {
463 executionFlowDescriptorConverters.remove(osgiBundle);
464 if (log.isTraceEnabled())
465 log.debug("Removed execution flow descriptor converter from "
466 + osgiBundle);
467 }
468 } else {
469 // ignore
470 }
471 }
472
473 /*
474 * JMX
475 */
476 protected MBeanServer getMBeanServer() {
477 return ManagementFactory.getPlatformMBeanServer();
478 }
479
480 public void registerMBean(Module module, ExecutionFlow executionFlow) {
481 try {
482 StandardMBean mbean = new StandardMBean(executionFlow,
483 ExecutionFlow.class);
484 getMBeanServer().registerMBean(mbean,
485 flowMBeanName(module, executionFlow));
486 } catch (Exception e) {
487 String msg = "Cannot register execution flow " + executionFlow
488 + " as mbean";
489 throw new SlcException(msg, e);
490 }
491 }
492
493 public void unregisterMBean(Module module, ExecutionFlow executionFlow) {
494 try {
495 getMBeanServer().unregisterMBean(
496 flowMBeanName(module, executionFlow));
497 } catch (Exception e) {
498 String msg = "Cannot unregister execution flow " + executionFlow
499 + " as mbean";
500 throw new SlcException(msg, e);
501 }
502 }
503
504 @SuppressWarnings("deprecation")
505 protected ObjectName flowMBeanName(Module module,
506 ExecutionFlow executionFlow) {
507 String executionModulesPrefix = "SLCExecutionModules";
508 String path = executionFlow.getPath();
509 String name = executionFlow.getName();
510 if (path == null && name.indexOf('/') >= 0) {
511 path = name.substring(0, name.lastIndexOf('/'));
512 name = name.substring(name.lastIndexOf('/'));
513 }
514
515 StringBuffer buf = new StringBuffer(executionModulesPrefix + ":"
516 + "module=" + module.getName() + " [" + module.getVersion()
517 + "],");
518
519 if (path != null && !path.equals("")) {
520 int depth = 0;
521 for (String token : path.split("/")) {
522 if (!token.equals("")) {
523 buf.append("path").append(depth).append('=');
524 // in order to have directories first
525 buf.append('/');
526 buf.append(token).append(',');
527 depth++;
528 }
529 }
530 }
531 buf.append("name=").append(name);
532 try {
533 return new ObjectName(buf.toString());
534 } catch (Exception e) {
535 throw new SlcException("Cannot generate object name based on "
536 + buf, e);
537 }
538 }
539
540 /*
541 * UTILITIES
542 */
543 @SuppressWarnings("rawtypes")
544 private OsgiBundle asOsgiBundle(Map properties) {
545 String bundleSymbolicName = checkAndGet(Constants.BUNDLE_SYMBOLICNAME,
546 properties);
547 String bundleVersion = checkAndGet(Constants.BUNDLE_VERSION, properties);
548 return new OsgiBundle(bundleSymbolicName, bundleVersion);
549 }
550
551 @SuppressWarnings("rawtypes")
552 private String checkAndGet(Object key, Map properties) {
553 if (!properties.containsKey(key) || properties.get(key) == null)
554 throw new SlcException(key + " not set in " + properties);
555 else
556 return properties.get(key).toString();
557 }
558
559 public void setBundlesManager(BundlesManager bundlesManager) {
560 this.bundlesManager = bundlesManager;
561 }
562
563 public void setDefaultDescriptorConverter(
564 ExecutionFlowDescriptorConverter defaultDescriptorConverter) {
565 this.defaultDescriptorConverter = defaultDescriptorConverter;
566 }
567
568 public void setRegisterFlowsToJmx(Boolean registerFlowsToJmx) {
569 this.registerFlowsToJmx = registerFlowsToJmx;
570 }
571
572 }