package org.argeo.slc.osgi;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.argeo.slc.BasicNameVersion;
+import org.argeo.slc.NameVersion;
import org.argeo.slc.SlcException;
-import org.argeo.slc.build.BasicNameVersion;
-import org.argeo.slc.build.NameVersion;
import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
+import org.argeo.slc.deploy.Module;
import org.argeo.slc.deploy.ModuleDescriptor;
import org.argeo.slc.execution.ExecutionContext;
import org.argeo.slc.execution.ExecutionFlow;
import org.argeo.slc.process.RealizedFlow;
import org.osgi.framework.Bundle;
import org.osgi.framework.Constants;
-import org.osgi.util.tracker.ServiceTracker;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
+/** Execution modules manager implementation based on an OSGi runtime. */
public class OsgiExecutionModulesManager extends
- AbstractExecutionModulesManager implements InitializingBean,
- DisposableBean, OsgiServiceLifecycleListener {
-
-// static {
-// // Force usage of vanilla Xalan when in OSGi
-// // We would like to do it in a cleaner way
-// // but the integration of Xalan and Xerces in the JRE
-// // makes it very difficult
-// // Suggestions welcome!
-// Properties systemProperties = System.getProperties();
-// // if (!systemProperties
-// // .containsKey("javax.xml.parsers.DocumentBuilderFactory"))
-// // System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
-// // "org.apache.xerces.jaxp.DocumentBuilderFactoryImpl");
-// // if
-// // (!systemProperties.containsKey("javax.xml.parsers.SAXParserFactory"))
-// // System.setProperty("javax.xml.parsers.SAXParserFactory",
-// // "org.apache.xerces.jaxp.SAXParserFactoryImpl");
-// if (!systemProperties
-// .containsKey("javax.xml.transform.TransformerFactory"))
-// System.setProperty("javax.xml.transform.TransformerFactory",
-// "org.apache.xalan.processor.TransformerFactoryImpl");
-// }
-
-// private final static String PROPERTY_CACHE_SERVICES = "slc.osgi.execution.cacheServices";
+ AbstractExecutionModulesManager implements OsgiServiceLifecycleListener {
private final static Log log = LogFactory
.getLog(OsgiExecutionModulesManager.class);
private BundlesManager bundlesManager;
- private ServiceTracker executionContextsTracker;
private Map<OsgiBundle, ExecutionContext> executionContexts = new HashMap<OsgiBundle, ExecutionContext>();
private Map<OsgiBundle, ExecutionFlowDescriptorConverter> executionFlowDescriptorConverters = new HashMap<OsgiBundle, ExecutionFlowDescriptorConverter>();
private Map<OsgiBundle, Set<ExecutionFlow>> executionFlows = new HashMap<OsgiBundle, Set<ExecutionFlow>>();
private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultExecutionFlowDescriptorConverter();
-// private Boolean useCachedServices = Boolean.parseBoolean(System
-// .getProperty(PROPERTY_CACHE_SERVICES, "true"));
+ private List<ExecutionModulesListener> executionModulesListeners = new ArrayList<ExecutionModulesListener>();
+
+ private Boolean registerFlowsToJmx = true;
public synchronized ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version) {
ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
-// if (useCachedServices) {
- OsgiBundle osgiBundle = null;
- BasicNameVersion nameVersion = new BasicNameVersion(moduleName,
- version);
- bundles: for (Iterator<OsgiBundle> iterator = executionContexts
- .keySet().iterator(); iterator.hasNext();) {
- OsgiBundle ob = iterator.next();
- if (ob.equals(nameVersion)) {
- osgiBundle = ob;
- break bundles;
- }
+ OsgiBundle osgiBundle = null;
+ BasicNameVersion nameVersion = new BasicNameVersion(moduleName, version);
+ bundles: for (Iterator<OsgiBundle> iterator = executionContexts
+ .keySet().iterator(); iterator.hasNext();) {
+ OsgiBundle ob = iterator.next();
+ if (ob.equals(nameVersion)) {
+ osgiBundle = ob;
+ break bundles;
}
- if (osgiBundle == null)
- throw new SlcException("No execution module registered for "
- + nameVersion);
- md.setName(osgiBundle.getName());
- md.setVersion(osgiBundle.getVersion());
- md.setLabel(osgiBundle.getLabel());
- md.setDescription(osgiBundle.getDescription());
-// } else {
-// md.setName(moduleName);
-// md.setVersion(version);
-// setMetadataFromBundle(md, null);
-// }
+ }
+ if (osgiBundle == null)
+ throw new SlcException("No execution module registered for "
+ + nameVersion);
+ md.setName(osgiBundle.getName());
+ md.setVersion(osgiBundle.getVersion());
+ md.setTitle(osgiBundle.getTitle());
+ md.setDescription(osgiBundle.getDescription());
+
ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = getExecutionFlowDescriptorConverter(
moduleName, version);
if (executionFlowDescriptorConverter == null)
throw new SlcException("No flow converter found.");
- executionFlowDescriptorConverter.addFlowsToDescriptor(md, listFlows(
- moduleName, version));
+ executionFlowDescriptorConverter.addFlowsToDescriptor(md,
+ listFlows(moduleName, version));
return md;
}
public synchronized List<ExecutionModuleDescriptor> listExecutionModules() {
List<ExecutionModuleDescriptor> descriptors = new ArrayList<ExecutionModuleDescriptor>();
-// if (useCachedServices) {
- for (Iterator<OsgiBundle> iterator = executionContexts.keySet()
- .iterator(); iterator.hasNext();) {
- OsgiBundle osgiBundle = iterator.next();
- ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
- setMetadataFromBundle(md, bundlesManager
- .findRelatedBundle(osgiBundle));
- descriptors.add(md);
- }
-// } else {
-// ServiceReference[] arr = executionContextsTracker
-// .getServiceReferences();
-// if (arr == null) {
-// log.error("Tracker returned null.");
-// return descriptors;
-// }
-//
-// List<ServiceReference> srs = Arrays.asList(arr);
-// // ServiceReference[] srs =
-// // executionContexts.getServiceReferences();
-// for (ServiceReference sr : srs) {
-// ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
-// setMetadataFromBundle(md, sr.getBundle());
-// descriptors.add(md);
-// }
-// }
+ for (Iterator<OsgiBundle> iterator = executionContexts.keySet()
+ .iterator(); iterator.hasNext();) {
+ OsgiBundle osgiBundle = iterator.next();
+ ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
+ setMetadataFromBundle(md,
+ bundlesManager.findRelatedBundle(osgiBundle));
+ descriptors.add(md);
+ }
return descriptors;
}
String moduleName, String moduleVersion) {
Map<String, ExecutionFlow> flows = new HashMap<String, ExecutionFlow>();
-// if (useCachedServices) {
- OsgiBundle key = new OsgiBundle(
- moduleName, moduleVersion);
- if(!executionFlows.containsKey(key))
- return flows;
- Set<ExecutionFlow> flowsT = executionFlows.get(key);
- for (ExecutionFlow flow : flowsT)
- flows.put(flow.getName(), flow);
-// } else {
-//
-// // TODO: use service trackers?
-// // String filter = OsgiFilterUtils.unifyFilter(ExecutionFlow.class,
-// // null);
-//
-// String filter = "(Bundle-SymbolicName=" + moduleName + ")";
-// ServiceReference[] sfs;
-// try {
-// sfs = bundlesManager.getBundleContext().getServiceReferences(
-// ExecutionFlow.class.getName(), filter);
-// } catch (InvalidSyntaxException e) {
-// throw new SlcException(
-// "Cannot retrieve service reference for flow " + filter,
-// e);
-// }
-//
-// for (ServiceReference sf : sfs) {
-// ExecutionFlow flow = (ExecutionFlow) bundlesManager
-// .getBundleContext().getService(sf);
-// flows.put(flow.getName(), flow);
-// }
-// }
+ OsgiBundle key = new OsgiBundle(moduleName, moduleVersion);
+ if (!executionFlows.containsKey(key))
+ return flows;
+ Set<ExecutionFlow> flowsT = executionFlows.get(key);
+ for (ExecutionFlow flow : flowsT)
+ flows.put(flow.getName(), flow);
return flows;
}
ExecutionFlowDescriptorConverter.class, filter);
}
- public void setBundlesManager(BundlesManager bundlesManager) {
- this.bundlesManager = bundlesManager;
- }
-
- public void afterPropertiesSet() throws Exception {
-// if (!useCachedServices)
-// executionContextsTracker = bundlesManager
-// .newTracker(ExecutionContext.class);
- }
-
- public void destroy() throws Exception {
- if (executionContextsTracker != null)
- executionContextsTracker.close();
- }
-
/**
* Builds a minimal realized flow, based on the provided information
* (typically from the command line).
launch.setFlowDescriptor(descriptor);
return launch;
} else {
- log
- .warn("Could not find any execution module matching these requirements.");
+ log.warn("Could not find any execution module matching these requirements.");
return null;
}
}
protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
String moduleName, String moduleVersion) {
-// if (useCachedServices) {
- OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
- if (executionFlowDescriptorConverters.containsKey(osgiBundle))
- return executionFlowDescriptorConverters.get(osgiBundle);
- else
- return defaultDescriptorConverter;
-// } else {
-// // Check whether a descriptor converter is published by this module
-// ExecutionFlowDescriptorConverter descriptorConverter = findExecutionFlowDescriptorConverter(
-// moduleName, moduleVersion);
-// if (descriptorConverter == null)
-// return defaultDescriptorConverter;
-// else
-// return descriptorConverter;
-// }
+ OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
+ return getExecutionFlowDescriptorConverter(osgiBundle);
+ }
+
+ protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
+ OsgiBundle osgiBundle) {
+ if (executionFlowDescriptorConverters.containsKey(osgiBundle))
+ return executionFlowDescriptorConverters.get(osgiBundle);
+ else
+ return defaultDescriptorConverter;
}
public ModuleDescriptor getModuleDescriptor(String moduleName,
md.setName(bdl.getSymbolicName());
md.setVersion(getHeaderSafe(bdl, Constants.BUNDLE_VERSION));
- md.setLabel(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
+ md.setTitle(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
md.setDescription(getHeaderSafe(bdl, Constants.BUNDLE_DESCRIPTION));
}
return obj.toString();
}
- @SuppressWarnings("unchecked")
- public synchronized void bind(Object service, Map properties)
- throws Exception {
- if (service instanceof ExecutionContext) {
- ExecutionContext executionContext = (ExecutionContext) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
- osgiBundle.setLabel(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
- osgiBundle.setDescription(getHeaderSafe(bundle,
- Constants.BUNDLE_DESCRIPTION));
- executionContexts.put(osgiBundle, executionContext);
+ /*
+ * REGISTRATION
+ */
+
+ /** Registers an execution context. */
+ public synchronized void register(ExecutionContext executionContext,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
+ osgiBundle.setTitle(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
+ osgiBundle.setDescription(getHeaderSafe(bundle,
+ Constants.BUNDLE_DESCRIPTION));
+ executionContexts.put(osgiBundle, executionContext);
+ if (log.isTraceEnabled())
+ log.trace("Registered execution context from " + osgiBundle);
+ // Notify
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionModuleAdded(osgiBundle.getModuleDescriptor());
+ }
+
+ /** Unregisters an execution context. */
+ public synchronized void unregister(ExecutionContext executionContext,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (executionContexts.containsKey(osgiBundle)) {
+ executionContexts.remove(osgiBundle);
if (log.isTraceEnabled())
- log.debug("Registered execution context from " + osgiBundle);
+ log.trace("Removed execution context from " + osgiBundle);
// Notify
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionModuleAdded(osgiBundle, executionContext);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionModuleRemoved(osgiBundle
+ .getModuleDescriptor());
+ }
+ }
- } else if (service instanceof ExecutionFlow) {
- ExecutionFlow executionFlow = (ExecutionFlow) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (!executionFlows.containsKey(osgiBundle)) {
- executionFlows.put(osgiBundle, new HashSet());
- }
- executionFlows.get(osgiBundle).add(executionFlow);
+ /** Registers an execution flow. */
+ public synchronized void register(ExecutionFlow executionFlow,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (!executionFlows.containsKey(osgiBundle)) {
+ executionFlows.put(osgiBundle, new HashSet<ExecutionFlow>());
+ }
+ executionFlows.get(osgiBundle).add(executionFlow);
+ if (log.isTraceEnabled())
+ log.trace("Registered " + executionFlow + " from " + osgiBundle);
+
+ // notifications
+ if (registerFlowsToJmx)
+ registerMBean(osgiBundle, executionFlow);
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionFlowAdded(osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+
+ /** Unregisters an execution flow. */
+ public synchronized void unregister(ExecutionFlow executionFlow,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (executionFlows.containsKey(osgiBundle)) {
+ Set<ExecutionFlow> flows = executionFlows.get(osgiBundle);
+ flows.remove(executionFlow);
if (log.isTraceEnabled())
- log
- .debug("Registered " + executionFlow + " from "
- + osgiBundle);
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionFlowAdded(osgiBundle, executionFlow);
+ log.trace("Removed " + executionFlow + " from " + osgiBundle);
+ if (flows.size() == 0) {
+ executionFlows.remove(osgiBundle);
+ if (log.isTraceEnabled())
+ log.trace("Removed flows set from " + osgiBundle);
+ }
+
+ // notifications
+ if (registerFlowsToJmx)
+ unregisterMBean(osgiBundle, executionFlow);
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionFlowRemoved(osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+ }
+
+ /** Registers an execution module listener. */
+ public synchronized void register(
+ ExecutionModulesListener executionModulesListener,
+ Map<String, String> properties) {
+ // sync with current state
+ for (OsgiBundle osgiBundle : executionContexts.keySet()) {
+ executionModulesListener.executionModuleAdded(osgiBundle
+ .getModuleDescriptor());
+ }
+ for (OsgiBundle osgiBundle : executionFlows.keySet()) {
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionFlow executionFlow : executionFlows.get(osgiBundle))
+ executionModulesListener.executionFlowAdded(
+ osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+ executionModulesListeners.add(executionModulesListener);
+ }
- } else if (service instanceof ExecutionFlowDescriptorConverter) {
+ /** Unregisters an execution module listener. */
+ public synchronized void unregister(
+ ExecutionModulesListener executionModulesListener,
+ Map<String, String> properties) {
+ executionModulesListeners.remove(executionModulesListener);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public synchronized void bind(Object service, Map properties)
+ throws Exception {
+ if (service instanceof ExecutionFlowDescriptorConverter) {
ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = (ExecutionFlowDescriptorConverter) service;
OsgiBundle osgiBundle = asOsgiBundle(properties);
executionFlowDescriptorConverters.put(osgiBundle,
executionFlowDescriptorConverter);
if (log.isTraceEnabled())
- log
- .debug("Registered execution flow descriptor converter from "
- + osgiBundle);
+ log.debug("Registered execution flow descriptor converter from "
+ + osgiBundle);
} else {
// ignore
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("rawtypes")
public synchronized void unbind(Object service, Map properties)
throws Exception {
- if (service instanceof ExecutionContext) {
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (executionContexts.containsKey(osgiBundle)) {
- ExecutionContext executionContext = executionContexts
- .remove(osgiBundle);
- if (log.isTraceEnabled())
- log.debug("Removed execution context from " + osgiBundle);
- // Notify
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionModuleRemoved(osgiBundle,
- executionContext);
- }
- } else if (service instanceof ExecutionFlow) {
- ExecutionFlow executionFlow = (ExecutionFlow) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (executionFlows.containsKey(osgiBundle)) {
- Set flows = executionFlows.get(osgiBundle);
- flows.remove(executionFlow);
- if (log.isTraceEnabled())
- log.debug("Removed " + executionFlow + " from "
- + osgiBundle);
- if (flows.size() == 0) {
- executionFlows.remove(osgiBundle);
- if (log.isTraceEnabled())
- log.debug("Removed flows set from " + osgiBundle);
- }
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionFlowRemoved(osgiBundle, executionFlow);
- }
- } else if (service instanceof ExecutionFlowDescriptorConverter) {
+ if (service instanceof ExecutionFlowDescriptorConverter) {
OsgiBundle osgiBundle = asOsgiBundle(properties);
if (executionFlowDescriptorConverters.containsKey(osgiBundle)) {
executionFlowDescriptorConverters.remove(osgiBundle);
if (log.isTraceEnabled())
- log
- .debug("Removed execution flow descriptor converter from "
- + osgiBundle);
+ log.debug("Removed execution flow descriptor converter from "
+ + osgiBundle);
}
} else {
// ignore
}
}
- @SuppressWarnings("unchecked")
+ /*
+ * JMX
+ */
+ protected MBeanServer getMBeanServer() {
+ return ManagementFactory.getPlatformMBeanServer();
+ }
+
+ public void registerMBean(Module module, ExecutionFlow executionFlow) {
+ try {
+ StandardMBean mbean = new StandardMBean(executionFlow,
+ ExecutionFlow.class);
+ getMBeanServer().registerMBean(mbean,
+ flowMBeanName(module, executionFlow));
+ } catch (Exception e) {
+ String msg = "Cannot register execution flow " + executionFlow
+ + " as mbean";
+ throw new SlcException(msg, e);
+ }
+ }
+
+ public void unregisterMBean(Module module, ExecutionFlow executionFlow) {
+ try {
+ getMBeanServer().unregisterMBean(
+ flowMBeanName(module, executionFlow));
+ } catch (Exception e) {
+ String msg = "Cannot unregister execution flow " + executionFlow
+ + " as mbean";
+ throw new SlcException(msg, e);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ protected ObjectName flowMBeanName(Module module,
+ ExecutionFlow executionFlow) {
+ String executionModulesPrefix = "SLCExecutionModules";
+ String path = executionFlow.getPath();
+ String name = executionFlow.getName();
+ if (path == null && name.indexOf('/') >= 0) {
+ path = name.substring(0, name.lastIndexOf('/') - 1);
+ name = name.substring(name.lastIndexOf('/'));
+ }
+
+ StringBuffer buf = new StringBuffer(executionModulesPrefix + ":"
+ + "module=" + module.getName() + " [" + module.getVersion()
+ + "],");
+
+ if (path != null && !path.equals("")) {
+ int depth = 0;
+ for (String token : path.split("/")) {
+ if (!token.equals("")) {
+ buf.append("path").append(depth).append('=');
+ // in order to have directories first
+ buf.append('/');
+ buf.append(token).append(',');
+ depth++;
+ }
+ }
+ }
+ buf.append("name=").append(name);
+ try {
+ return new ObjectName(buf.toString());
+ } catch (Exception e) {
+ throw new SlcException("Cannot generate object name based on "
+ + buf, e);
+ }
+ }
+
+ /*
+ * UTILITIES
+ */
+ @SuppressWarnings("rawtypes")
private OsgiBundle asOsgiBundle(Map properties) {
String bundleSymbolicName = checkAndGet(Constants.BUNDLE_SYMBOLICNAME,
properties);
return new OsgiBundle(bundleSymbolicName, bundleVersion);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("rawtypes")
private String checkAndGet(Object key, Map properties) {
if (!properties.containsKey(key) || properties.get(key) == null)
throw new SlcException(key + " not set in " + properties);
return properties.get(key).toString();
}
+ public void setBundlesManager(BundlesManager bundlesManager) {
+ this.bundlesManager = bundlesManager;
+ }
+
public void setDefaultDescriptorConverter(
ExecutionFlowDescriptorConverter defaultDescriptorConverter) {
this.defaultDescriptorConverter = defaultDescriptorConverter;
}
+
+ public void setRegisterFlowsToJmx(Boolean registerFlowsToJmx) {
+ this.registerFlowsToJmx = registerFlowsToJmx;
+ }
+
}