package org.argeo.slc.osgi;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.slc.BasicNameVersion;
import org.argeo.slc.SlcException;
import org.argeo.slc.core.execution.AbstractExecutionModulesManager;
import org.argeo.slc.core.execution.DefaultExecutionFlowDescriptorConverter;
+import org.argeo.slc.deploy.Module;
import org.argeo.slc.deploy.ModuleDescriptor;
import org.argeo.slc.execution.ExecutionContext;
import org.argeo.slc.execution.ExecutionFlow;
import org.osgi.framework.Constants;
import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
+/** Execution modules manager implementation based on an OSGi runtime. */
public class OsgiExecutionModulesManager extends
AbstractExecutionModulesManager implements OsgiServiceLifecycleListener {
private Map<OsgiBundle, Set<ExecutionFlow>> executionFlows = new HashMap<OsgiBundle, Set<ExecutionFlow>>();
private ExecutionFlowDescriptorConverter defaultDescriptorConverter = new DefaultExecutionFlowDescriptorConverter();
+ private List<ExecutionModulesListener> executionModulesListeners = new ArrayList<ExecutionModulesListener>();
+
+ private Boolean registerFlowsToJmx = true;
+
public synchronized ExecutionModuleDescriptor getExecutionModuleDescriptor(
String moduleName, String version) {
ExecutionModuleDescriptor md = new ExecutionModuleDescriptor();
+ nameVersion);
md.setName(osgiBundle.getName());
md.setVersion(osgiBundle.getVersion());
- md.setLabel(osgiBundle.getLabel());
+ md.setTitle(osgiBundle.getTitle());
md.setDescription(osgiBundle.getDescription());
ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = getExecutionFlowDescriptorConverter(
ExecutionFlowDescriptorConverter.class, filter);
}
- public void setBundlesManager(BundlesManager bundlesManager) {
- this.bundlesManager = bundlesManager;
- }
-
/**
* Builds a minimal realized flow, based on the provided information
* (typically from the command line).
protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
String moduleName, String moduleVersion) {
OsgiBundle osgiBundle = new OsgiBundle(moduleName, moduleVersion);
+ return getExecutionFlowDescriptorConverter(osgiBundle);
+ }
+
+ protected synchronized ExecutionFlowDescriptorConverter getExecutionFlowDescriptorConverter(
+ OsgiBundle osgiBundle) {
if (executionFlowDescriptorConverters.containsKey(osgiBundle))
return executionFlowDescriptorConverters.get(osgiBundle);
else
md.setName(bdl.getSymbolicName());
md.setVersion(getHeaderSafe(bdl, Constants.BUNDLE_VERSION));
- md.setLabel(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
+ md.setTitle(getHeaderSafe(bdl, Constants.BUNDLE_NAME));
md.setDescription(getHeaderSafe(bdl, Constants.BUNDLE_DESCRIPTION));
}
return obj.toString();
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public synchronized void bind(Object service, Map properties)
- throws Exception {
- if (service instanceof ExecutionContext) {
- ExecutionContext executionContext = (ExecutionContext) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
- osgiBundle.setLabel(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
- osgiBundle.setDescription(getHeaderSafe(bundle,
- Constants.BUNDLE_DESCRIPTION));
- executionContexts.put(osgiBundle, executionContext);
+ /*
+ * REGISTRATION
+ */
+
+ /** Registers an execution context. */
+ public synchronized void register(ExecutionContext executionContext,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ Bundle bundle = bundlesManager.findRelatedBundle(osgiBundle);
+ osgiBundle.setTitle(getHeaderSafe(bundle, Constants.BUNDLE_NAME));
+ osgiBundle.setDescription(getHeaderSafe(bundle,
+ Constants.BUNDLE_DESCRIPTION));
+ executionContexts.put(osgiBundle, executionContext);
+ if (log.isTraceEnabled())
+ log.trace("Registered execution context from " + osgiBundle);
+ // Notify
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionModuleAdded(osgiBundle.getModuleDescriptor());
+ }
+
+ /** Unregisters an execution context. */
+ public synchronized void unregister(ExecutionContext executionContext,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (executionContexts.containsKey(osgiBundle)) {
+ executionContexts.remove(osgiBundle);
if (log.isTraceEnabled())
- log.debug("Registered execution context from " + osgiBundle);
+ log.trace("Removed execution context from " + osgiBundle);
// Notify
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionModuleAdded(osgiBundle, executionContext);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionModuleRemoved(osgiBundle
+ .getModuleDescriptor());
+ }
+ }
- } else if (service instanceof ExecutionFlow) {
- ExecutionFlow executionFlow = (ExecutionFlow) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (!executionFlows.containsKey(osgiBundle)) {
- executionFlows.put(osgiBundle, new HashSet());
- }
- executionFlows.get(osgiBundle).add(executionFlow);
+ /** Registers an execution flow. */
+ public synchronized void register(ExecutionFlow executionFlow,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (!executionFlows.containsKey(osgiBundle)) {
+ executionFlows.put(osgiBundle, new HashSet<ExecutionFlow>());
+ }
+ executionFlows.get(osgiBundle).add(executionFlow);
+ if (log.isTraceEnabled())
+ log.trace("Registered " + executionFlow + " from " + osgiBundle);
+
+ // notifications
+ if (registerFlowsToJmx)
+ registerMBean(osgiBundle, executionFlow);
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionFlowAdded(osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+
+ /** Unregisters an execution flow. */
+ public synchronized void unregister(ExecutionFlow executionFlow,
+ Map<String, String> properties) {
+ OsgiBundle osgiBundle = asOsgiBundle(properties);
+ if (executionFlows.containsKey(osgiBundle)) {
+ Set<ExecutionFlow> flows = executionFlows.get(osgiBundle);
+ flows.remove(executionFlow);
if (log.isTraceEnabled())
- log.debug("Registered " + executionFlow + " from " + osgiBundle);
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionFlowAdded(osgiBundle, executionFlow);
+ log.trace("Removed " + executionFlow + " from " + osgiBundle);
+ if (flows.size() == 0) {
+ executionFlows.remove(osgiBundle);
+ if (log.isTraceEnabled())
+ log.trace("Removed flows set from " + osgiBundle);
+ }
+
+ // notifications
+ if (registerFlowsToJmx)
+ unregisterMBean(osgiBundle, executionFlow);
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionModulesListener listener : executionModulesListeners)
+ listener.executionFlowRemoved(osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+ }
+
+ /** Registers an execution module listener. */
+ public synchronized void register(
+ ExecutionModulesListener executionModulesListener,
+ Map<String, String> properties) {
+ // sync with current state
+ for (OsgiBundle osgiBundle : executionContexts.keySet()) {
+ executionModulesListener.executionModuleAdded(osgiBundle
+ .getModuleDescriptor());
+ }
+ for (OsgiBundle osgiBundle : executionFlows.keySet()) {
+ ExecutionFlowDescriptorConverter efdc = getExecutionFlowDescriptorConverter(osgiBundle);
+ for (ExecutionFlow executionFlow : executionFlows.get(osgiBundle))
+ executionModulesListener.executionFlowAdded(
+ osgiBundle.getModuleDescriptor(),
+ efdc.getExecutionFlowDescriptor(executionFlow));
+ }
+ executionModulesListeners.add(executionModulesListener);
+ }
- } else if (service instanceof ExecutionFlowDescriptorConverter) {
+ /** Unregisters an execution module listener. */
+ public synchronized void unregister(
+ ExecutionModulesListener executionModulesListener,
+ Map<String, String> properties) {
+ executionModulesListeners.remove(executionModulesListener);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public synchronized void bind(Object service, Map properties)
+ throws Exception {
+ if (service instanceof ExecutionFlowDescriptorConverter) {
ExecutionFlowDescriptorConverter executionFlowDescriptorConverter = (ExecutionFlowDescriptorConverter) service;
OsgiBundle osgiBundle = asOsgiBundle(properties);
executionFlowDescriptorConverters.put(osgiBundle,
@SuppressWarnings("rawtypes")
public synchronized void unbind(Object service, Map properties)
throws Exception {
- if (service instanceof ExecutionContext) {
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (executionContexts.containsKey(osgiBundle)) {
- ExecutionContext executionContext = executionContexts
- .remove(osgiBundle);
- if (log.isTraceEnabled())
- log.debug("Removed execution context from " + osgiBundle);
- // Notify
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionModuleRemoved(osgiBundle,
- executionContext);
- }
- } else if (service instanceof ExecutionFlow) {
- ExecutionFlow executionFlow = (ExecutionFlow) service;
- OsgiBundle osgiBundle = asOsgiBundle(properties);
- if (executionFlows.containsKey(osgiBundle)) {
- Set flows = executionFlows.get(osgiBundle);
- flows.remove(executionFlow);
- if (log.isTraceEnabled())
- log.debug("Removed " + executionFlow + " from "
- + osgiBundle);
- if (flows.size() == 0) {
- executionFlows.remove(osgiBundle);
- if (log.isTraceEnabled())
- log.debug("Removed flows set from " + osgiBundle);
- }
- for (ExecutionModulesListener listener : getExecutionModulesListeners())
- listener.executionFlowRemoved(osgiBundle, executionFlow);
- }
- } else if (service instanceof ExecutionFlowDescriptorConverter) {
+ if (service instanceof ExecutionFlowDescriptorConverter) {
OsgiBundle osgiBundle = asOsgiBundle(properties);
if (executionFlowDescriptorConverters.containsKey(osgiBundle)) {
executionFlowDescriptorConverters.remove(osgiBundle);
}
}
+ /*
+ * JMX
+ */
+ protected MBeanServer getMBeanServer() {
+ return ManagementFactory.getPlatformMBeanServer();
+ }
+
+ public void registerMBean(Module module, ExecutionFlow executionFlow) {
+ try {
+ StandardMBean mbean = new StandardMBean(executionFlow,
+ ExecutionFlow.class);
+ getMBeanServer().registerMBean(mbean,
+ flowMBeanName(module, executionFlow));
+ } catch (Exception e) {
+ String msg = "Cannot register execution flow " + executionFlow
+ + " as mbean";
+ throw new SlcException(msg, e);
+ }
+ }
+
+ public void unregisterMBean(Module module, ExecutionFlow executionFlow) {
+ try {
+ getMBeanServer().unregisterMBean(
+ flowMBeanName(module, executionFlow));
+ } catch (Exception e) {
+ String msg = "Cannot unregister execution flow " + executionFlow
+ + " as mbean";
+ throw new SlcException(msg, e);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ protected ObjectName flowMBeanName(Module module,
+ ExecutionFlow executionFlow) {
+ String executionModulesPrefix = "SLCExecutionModules";
+ String path = executionFlow.getPath();
+ String name = executionFlow.getName();
+ if (path == null && name.indexOf('/') >= 0) {
+ path = name.substring(0, name.lastIndexOf('/') - 1);
+ name = name.substring(name.lastIndexOf('/'));
+ }
+
+ StringBuffer buf = new StringBuffer(executionModulesPrefix + ":"
+ + "module=" + module.getName() + " [" + module.getVersion()
+ + "],");
+
+ if (path != null && !path.equals("")) {
+ int depth = 0;
+ for (String token : path.split("/")) {
+ if (!token.equals("")) {
+ buf.append("path").append(depth).append('=');
+ // in order to have directories first
+ buf.append('/');
+ buf.append(token).append(',');
+ depth++;
+ }
+ }
+ }
+ buf.append("name=").append(name);
+ try {
+ return new ObjectName(buf.toString());
+ } catch (Exception e) {
+ throw new SlcException("Cannot generate object name based on "
+ + buf, e);
+ }
+ }
+
+ /*
+ * UTILITIES
+ */
@SuppressWarnings("rawtypes")
private OsgiBundle asOsgiBundle(Map properties) {
String bundleSymbolicName = checkAndGet(Constants.BUNDLE_SYMBOLICNAME,
return properties.get(key).toString();
}
+ public void setBundlesManager(BundlesManager bundlesManager) {
+ this.bundlesManager = bundlesManager;
+ }
+
public void setDefaultDescriptorConverter(
ExecutionFlowDescriptorConverter defaultDescriptorConverter) {
this.defaultDescriptorConverter = defaultDescriptorConverter;
}
+
+ public void setRegisterFlowsToJmx(Boolean registerFlowsToJmx) {
+ this.registerFlowsToJmx = registerFlowsToJmx;
+ }
+
}