2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.osgi
;
19 import java
.lang
.management
.ManagementFactory
;
20 import java
.util
.ArrayList
;
21 import java
.util
.HashMap
;
22 import java
.util
.HashSet
;
23 import java
.util
.Iterator
;
24 import java
.util
.List
;
28 import javax
.management
.MBeanServer
;
29 import javax
.management
.ObjectName
;
30 import javax
.management
.StandardMBean
;
32 import org
.apache
.commons
.logging
.Log
;
33 import org
.apache
.commons
.logging
.LogFactory
;
34 import org
.argeo
.slc
.BasicNameVersion
;
35 import org
.argeo
.slc
.NameVersion
;
36 import org
.argeo
.slc
.SlcException
;
37 import org
.argeo
.slc
.core
.execution
.AbstractExecutionModulesManager
;
38 import org
.argeo
.slc
.core
.execution
.DefaultExecutionFlowDescriptorConverter
;
39 import org
.argeo
.slc
.deploy
.Module
;
40 import org
.argeo
.slc
.deploy
.ModuleDescriptor
;
41 import org
.argeo
.slc
.execution
.ExecutionContext
;
42 import org
.argeo
.slc
.execution
.ExecutionFlow
;
43 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
44 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptorConverter
;
45 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
46 import org
.argeo
.slc
.execution
.ExecutionModulesListener
;
47 import org
.argeo
.slc
.process
.RealizedFlow
;
48 import org
.osgi
.framework
.Bundle
;
49 import org
.osgi
.framework
.BundleException
;
50 import org
.osgi
.framework
.Constants
;
51 import org
.osgi
.framework
.launch
.Framework
;
52 import org
.springframework
.osgi
.service
.importer
.OsgiServiceLifecycleListener
;
54 /** Execution modules manager implementation based on an OSGi runtime. */
55 public class OsgiExecutionModulesManager
extends
56 AbstractExecutionModulesManager
implements OsgiServiceLifecycleListener
{
// Commons-logging logger shared by all instances of this manager.
58 private final static Log log
= LogFactory
59 .getLog(OsgiExecutionModulesManager
.class);
// Collaborator used for every bundle lookup/start/stop/upgrade; injected via setBundlesManager().
61 private BundlesManager bundlesManager
;
// Execution contexts currently registered, keyed by the bundle that published them
// (filled by register(ExecutionContext, Map), emptied by the matching unregister).
62 private Map
<OsgiBundle
, ExecutionContext
> executionContexts
= new HashMap
<OsgiBundle
, ExecutionContext
>();
// Bundle-specific flow descriptor converters (filled by bind(), emptied by unbind()).
63 private Map
<OsgiBundle
, ExecutionFlowDescriptorConverter
> executionFlowDescriptorConverters
= new HashMap
<OsgiBundle
, ExecutionFlowDescriptorConverter
>();
// Execution flows currently registered, grouped by the bundle that published them.
64 private Map
<OsgiBundle
, Set
<ExecutionFlow
>> executionFlows
= new HashMap
<OsgiBundle
, Set
<ExecutionFlow
>>();
// Converter returned by getExecutionFlowDescriptorConverter(OsgiBundle) when no
// bundle-specific converter is registered.
65 private ExecutionFlowDescriptorConverter defaultDescriptorConverter
= new DefaultExecutionFlowDescriptorConverter();
// Listeners notified when modules and flows are added or removed.
67 private List
<ExecutionModulesListener
> executionModulesListeners
= new ArrayList
<ExecutionModulesListener
>();
// When true, registered flows are also exposed as JMX MBeans (see registerMBean()).
// NOTE(review): boxed Boolean that is auto-unboxed in "if (registerFlowsToJmx)";
// a null passed to the setter would fail there with a NullPointerException.
69 private Boolean registerFlowsToJmx
= true;
// Initialization hook supporting the "unique launch" mode.
// Reads the module and the flow to launch from the system properties named by
// UNIQUE_LAUNCH_MODULE_PROPERTY and UNIQUE_LAUNCH_FLOW_PROPERTY, then creates an
// anonymous Thread named "Unique Flow". The visible statements after that start
// the module bundle, resolve the realized flow, and finally wait for the OSGi
// framework to stop.
// NOTE(review): several lines of this method are not visible in this excerpt
// (the enclosing run()/try, the actual flow execution, the framework stop call
// and the thread start) - confirm against the complete file.
71 public void init() throws Exception
{
72 final String module
= System
.getProperty(UNIQUE_LAUNCH_MODULE_PROPERTY
);
73 final String flow
= System
.getProperty(UNIQUE_LAUNCH_FLOW_PROPERTY
);
75 // launches a single flow, then stops
76 new Thread("Unique Flow") {
79 if (log
.isDebugEnabled())
80 log
.debug("Launch unique flow " + flow
81 + " from module " + module
);
// Resolve the module from the pattern, map it to the actual OSGi bundle and
// start that bundle before looking up the flow.
83 OsgiBundle osgiBundle
= bundlesManager
84 .findFromPattern(module
);
85 Bundle moduleBundle
= bundlesManager
86 .findRelatedBundle(osgiBundle
);
87 bundlesManager
.startSynchronous(moduleBundle
);
88 RealizedFlow lastLaunch
= findRealizedFlow(module
, flow
);
// findRealizedFlow() yielding null means no matching module was found.
89 if (lastLaunch
== null)
90 throw new SlcException("Cannot find launch for "
91 + module
+ " " + flow
);
// Failures of the launch are logged rather than propagated.
93 } catch (Exception e
) {
94 log
.error("Error in unique flow " + flow
95 + " from module " + module
, e
);
97 if (log
.isDebugEnabled())
98 log
.debug("Shutdown OSGi runtime...");
// Bundle 0 is the OSGi system bundle, which is cast to Framework here.
99 Framework framework
= (Framework
) bundlesManager
100 .getBundleContext().getBundle(0);
102 // shutdown framework
104 // wait at most 1 min (60 * 1000 ms) for the shutdown to complete
105 framework
.waitForStop(60 * 1000);
108 } catch (Exception e
) {
119 public void destroy() {
// Builds the descriptor of a registered execution module.
// Looks for a key of executionContexts equal to the requested name/version, copies
// its name, version, title and description into a new descriptor, then lets the
// module's flow descriptor converter add the flows returned by listFlows().
// Throws SlcException when no module is registered under that name/version or
// when no converter is available.
// NOTE(review): the body of the matching branch and the final return are not
// visible in this excerpt.
123 public synchronized ExecutionModuleDescriptor
getExecutionModuleDescriptor(
124 String moduleName
, String version
) {
125 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
126 OsgiBundle osgiBundle
= null;
127 BasicNameVersion nameVersion
= new BasicNameVersion(moduleName
, version
);
// Labelled loop over the bundles that registered an execution context.
128 bundles
: for (Iterator
<OsgiBundle
> iterator
= executionContexts
129 .keySet().iterator(); iterator
.hasNext();) {
130 OsgiBundle ob
= iterator
.next();
131 if (ob
.equals(nameVersion
)) {
136 if (osgiBundle
== null)
137 throw new SlcException("No execution module registered for "
139 md
.setName(osgiBundle
.getName());
140 md
.setVersion(osgiBundle
.getVersion());
141 md
.setTitle(osgiBundle
.getTitle());
142 md
.setDescription(osgiBundle
.getDescription());
// The lookup below falls back to defaultDescriptorConverter, so the null check
// only triggers when that default (or a mapped value) is itself null.
144 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter
= getExecutionFlowDescriptorConverter(
145 moduleName
, version
);
146 if (executionFlowDescriptorConverter
== null)
147 throw new SlcException("No flow converter found.");
148 executionFlowDescriptorConverter
.addFlowsToDescriptor(md
,
149 listFlows(moduleName
, version
));
// Lists a descriptor for each bundle that currently has a registered execution
// context, filling each descriptor from the metadata of the related OSGi bundle.
// NOTE(review): the lines adding each descriptor to the list and returning it are
// not visible in this excerpt.
153 public synchronized List
<ExecutionModuleDescriptor
> listExecutionModules() {
154 List
<ExecutionModuleDescriptor
> descriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
156 for (Iterator
<OsgiBundle
> iterator
= executionContexts
.keySet()
157 .iterator(); iterator
.hasNext();) {
158 OsgiBundle osgiBundle
= iterator
.next();
159 ExecutionModuleDescriptor md
= new ExecutionModuleDescriptor();
160 setMetadataFromBundle(md
,
161 bundlesManager
.findRelatedBundle(osgiBundle
));
// Collects the flows registered for the given module into a map keyed by flow name.
// NOTE(review): the statement executed when the module has no registered flows
// and the final return are not visible in this excerpt (presumably both return
// the "flows" map - confirm).
167 protected synchronized Map
<String
, ExecutionFlow
> listFlows(
168 String moduleName
, String moduleVersion
) {
170 Map
<String
, ExecutionFlow
> flows
= new HashMap
<String
, ExecutionFlow
>();
// The name/version pair is wrapped in an OsgiBundle to be usable as a map key.
171 OsgiBundle key
= new OsgiBundle(moduleName
, moduleVersion
);
172 if (!executionFlows
.containsKey(key
))
174 Set
<ExecutionFlow
> flowsT
= executionFlows
.get(key
);
// Flows sharing the same name overwrite each other in the resulting map.
175 for (ExecutionFlow flow
: flowsT
)
176 flows
.put(flow
.getName(), flow
);
// Looks up an execution flow in the OSGi service registry, using a filter on the
// bundle symbolic name and on the Spring bean name of the flow.
// NOTE(review): moduleVersion is not part of the filter built here, unlike in
// findExecutionContext(); confirm whether that is intended.
// NOTE(review): moduleName and flowName are concatenated into the filter without
// escaping.
// The remaining arguments of the service lookup are not visible in this excerpt.
180 protected ExecutionFlow
findExecutionFlow(String moduleName
,
181 String moduleVersion
, String flowName
) {
182 String filter
= "(&(Bundle-SymbolicName=" + moduleName
183 + ")(org.springframework.osgi.bean.name=" + flowName
+ "))";
184 return bundlesManager
.getSingleServiceStrict(ExecutionFlow
.class,
// Looks up the execution context published by a module in the OSGi service
// registry, using a filter on the bundle symbolic name and bundle version.
// NOTE(review): both values are concatenated into the filter without escaping.
// The remaining arguments of the service lookup are not visible in this excerpt.
188 protected ExecutionContext
findExecutionContext(String moduleName
,
189 String moduleVersion
) {
190 String filter
= "(&(Bundle-SymbolicName=" + moduleName
191 + ")(Bundle-Version=" + moduleVersion
+ "))";
192 return bundlesManager
.getSingleServiceStrict(ExecutionContext
.class,
196 protected ExecutionFlowDescriptorConverter
findExecutionFlowDescriptorConverter(
197 String moduleName
, String moduleVersion
) {
198 String filter
= "(&(Bundle-SymbolicName=" + moduleName
199 + ")(Bundle-Version=" + moduleVersion
+ "))";
200 return bundlesManager
.getSingleService(
201 ExecutionFlowDescriptorConverter
.class, filter
, false);
205 * Builds a minimal realized flow, based on the provided information
206 * (typically from the command line).
209 * a bundle id, or a pattern contained in a bundle symbolic name
211 * the execution flow name
212 * @return a minimal realized flow, to be used in an execution
// Builds a minimal realized flow from a module reference and a flow name.
// The module reference is first tried as a numeric bundle id; otherwise it is
// used as a pattern for bundlesManager.findFromPattern(). When a bundle is found,
// the realized flow carries its name and version plus a descriptor holding only
// the flow name; when none is found a warning is logged.
// NOTE(review): the try/else lines and the return statements are not visible in
// this excerpt (init() treats a null result as "not found").
214 public RealizedFlow
findRealizedFlow(String module
, String executionName
) {
215 // First check whether we have a bundleId
216 Long bundleId
= null;
218 bundleId
= Long
.parseLong(module
);
// A non-numeric module reference is not an error: bundleId simply stays null.
219 } catch (NumberFormatException e
) {
223 // Look for bundle names containing pattern
224 OsgiBundle bundle
= null;
225 if (bundleId
!= null) {
226 bundle
= bundlesManager
.getBundle(bundleId
);
228 bundle
= bundlesManager
.findFromPattern(module
);
231 if (bundle
!= null) {
232 RealizedFlow launch
= new RealizedFlow();
233 launch
.setModuleName(bundle
.getName());
234 launch
.setModuleVersion(bundle
.getVersion());
235 ExecutionFlowDescriptor descriptor
= new ExecutionFlowDescriptor();
236 descriptor
.setName(executionName
);
237 launch
.setFlowDescriptor(descriptor
);
240 log
.warn("Could not find any execution module matching these requirements.");
245 public void upgrade(NameVersion nameVersion
) {
246 OsgiBundle osgiBundle
= new OsgiBundle(nameVersion
);
247 bundlesManager
.upgradeSynchronous(osgiBundle
);
250 protected synchronized ExecutionFlowDescriptorConverter
getExecutionFlowDescriptorConverter(
251 String moduleName
, String moduleVersion
) {
252 OsgiBundle osgiBundle
= new OsgiBundle(moduleName
, moduleVersion
);
253 return getExecutionFlowDescriptorConverter(osgiBundle
);
256 protected synchronized ExecutionFlowDescriptorConverter
getExecutionFlowDescriptorConverter(
257 OsgiBundle osgiBundle
) {
258 if (executionFlowDescriptorConverters
.containsKey(osgiBundle
))
259 return executionFlowDescriptorConverters
.get(osgiBundle
);
261 return defaultDescriptorConverter
;
264 public ModuleDescriptor
getModuleDescriptor(String moduleName
,
266 return getExecutionModuleDescriptor(moduleName
, version
);
269 public List
<ModuleDescriptor
> listModules() {
270 Bundle
[] bundles
= bundlesManager
.getBundleContext().getBundles();
271 List
<ModuleDescriptor
> lst
= new ArrayList
<ModuleDescriptor
>();
272 for (Bundle bundle
: bundles
) {
273 ModuleDescriptor moduleDescriptor
= new ModuleDescriptor();
274 setMetadataFromBundle(moduleDescriptor
, bundle
);
275 lst
.add(moduleDescriptor
);
// Starts the bundle related to the given module name/version via
// bundlesManager.startSynchronous().
// A BundleException is rethrown as an SlcException ("Cannot start ...") that keeps
// the original exception as its cause.
// NOTE(review): the constructor arguments of the OsgiBundle lookup key are not
// visible in this excerpt.
280 public void start(NameVersion nameVersion
) {
282 Bundle bundle
= bundlesManager
.findRelatedBundle(new OsgiBundle(
284 bundlesManager
.startSynchronous(bundle
);
285 } catch (BundleException e
) {
286 throw new SlcException("Cannot start " + nameVersion
, e
);
// Stops the bundle related to the given module name/version via
// bundlesManager.stopSynchronous().
// A BundleException is rethrown as an SlcException ("Cannot stop ...") that keeps
// the original exception as its cause.
// NOTE(review): the constructor arguments of the OsgiBundle lookup key are not
// visible in this excerpt.
290 public void stop(NameVersion nameVersion
) {
292 Bundle bundle
= bundlesManager
.findRelatedBundle(new OsgiBundle(
294 bundlesManager
.stopSynchronous(bundle
);
295 } catch (BundleException e
) {
296 throw new SlcException("Cannot stop " + nameVersion
, e
);
// Fills a module descriptor (name, version, title, description) from the
// symbolic name and manifest headers of an OSGi bundle.
// The visible code also contains a search branch: it requires the descriptor to
// already carry a name and a version, then scans all bundles of the bundle
// context for one whose symbolic name and Bundle-Version header match, failing
// with "Cannot find bundle." otherwise.
// NOTE(review): the conditions selecting that branch and the declaration of
// "bdl" are not visible in this excerpt (presumably the search is used when the
// "bundle" argument is null - confirm).
300 protected void setMetadataFromBundle(ModuleDescriptor md
, Bundle bundle
) {
303 if (md
.getName() == null || md
.getVersion() == null)
304 throw new SlcException("Name and version not available.");
306 Bundle
[] bundles
= bundlesManager
.getBundleContext().getBundles();
307 for (Bundle b
: bundles
) {
// Match on symbolic name and on the Bundle-Version manifest header.
308 if (b
.getSymbolicName().equals(md
.getName())
309 && md
.getVersion().equals(
310 getHeaderSafe(b
, Constants
.BUNDLE_VERSION
))) {
319 throw new SlcException("Cannot find bundle.");
// Copy the metadata of the resolved bundle into the descriptor.
321 md
.setName(bdl
.getSymbolicName());
322 md
.setVersion(getHeaderSafe(bdl
, Constants
.BUNDLE_VERSION
));
323 md
.setTitle(getHeaderSafe(bdl
, Constants
.BUNDLE_NAME
));
324 md
.setDescription(getHeaderSafe(bdl
, Constants
.BUNDLE_DESCRIPTION
));
// Reads one manifest header of the given bundle and returns it as a string.
// NOTE(review): the lines between the lookup and the final return are not
// visible in this excerpt; presumably they guard against a missing (null)
// header, as the "Safe" suffix suggests - confirm against the complete file.
327 private String
getHeaderSafe(Bundle bundle
, Object key
) {
328 Object obj
= bundle
.getHeaders().get(key
);
332 return obj
.toString();
339 /** Registers an execution context. */
340 public synchronized void register(ExecutionContext executionContext
,
341 Map
<String
, String
> properties
) {
342 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
343 Bundle bundle
= bundlesManager
.findRelatedBundle(osgiBundle
);
344 osgiBundle
.setTitle(getHeaderSafe(bundle
, Constants
.BUNDLE_NAME
));
345 osgiBundle
.setDescription(getHeaderSafe(bundle
,
346 Constants
.BUNDLE_DESCRIPTION
));
347 executionContexts
.put(osgiBundle
, executionContext
);
348 if (log
.isTraceEnabled())
349 log
.trace("Registered execution context from " + osgiBundle
);
351 for (ExecutionModulesListener listener
: executionModulesListeners
)
352 listener
.executionModuleAdded(osgiBundle
.getModuleDescriptor());
// The bundle is derived from the service properties; the executionContext
// argument itself is not used by the visible code. Nothing is removed when the
// bundle has no registered context.
// NOTE(review): the closing braces are not visible in this excerpt, so whether the
// listener notification sits inside the containsKey() branch cannot be confirmed
// from here.
355 /** Unregisters an execution context and notifies the listeners. */
356 public synchronized void unregister(ExecutionContext executionContext
,
357 Map
<String
, String
> properties
) {
358 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
359 if (executionContexts
.containsKey(osgiBundle
)) {
360 executionContexts
.remove(osgiBundle
);
361 if (log
.isTraceEnabled())
362 log
.trace("Removed execution context from " + osgiBundle
);
// Tell every listener that the module is gone.
364 for (ExecutionModulesListener listener
: executionModulesListeners
)
365 listener
.executionModuleRemoved(osgiBundle
366 .getModuleDescriptor());
// The flow is added to the set kept for its publishing bundle (derived from the
// service properties). It is optionally exposed as a JMX MBean, and every
// listener receives the flow descriptor produced by the bundle's converter.
// NOTE(review): some lines (closing braces, possibly comments) are not visible in
// this excerpt.
370 /** Registers an execution flow. */
371 public synchronized void register(ExecutionFlow executionFlow
,
372 Map
<String
, String
> properties
) {
373 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
// Lazily create the per-bundle set of flows.
374 if (!executionFlows
.containsKey(osgiBundle
)) {
375 executionFlows
.put(osgiBundle
, new HashSet
<ExecutionFlow
>());
377 executionFlows
.get(osgiBundle
).add(executionFlow
);
378 if (log
.isTraceEnabled())
379 log
.trace("Registered " + executionFlow
+ " from " + osgiBundle
);
// registerFlowsToJmx is a boxed Boolean that is auto-unboxed here.
382 if (registerFlowsToJmx
)
383 registerMBean(osgiBundle
, executionFlow
);
// Resolve the converter once, then notify every listener.
384 ExecutionFlowDescriptorConverter efdc
= getExecutionFlowDescriptorConverter(osgiBundle
);
385 for (ExecutionModulesListener listener
: executionModulesListeners
)
386 listener
.executionFlowAdded(osgiBundle
.getModuleDescriptor(),
387 efdc
.getExecutionFlowDescriptor(executionFlow
));
// The flow is removed from the set kept for its publishing bundle, and the set
// itself is dropped once it becomes empty. The visible code then optionally
// unregisters the flow's JMX MBean and notifies every listener.
// NOTE(review): the closing braces are not visible in this excerpt, so whether the
// JMX/listener notifications sit inside the containsKey() branch cannot be
// confirmed from here.
390 /** Unregisters an execution flow. */
391 public synchronized void unregister(ExecutionFlow executionFlow
,
392 Map
<String
, String
> properties
) {
393 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
394 if (executionFlows
.containsKey(osgiBundle
)) {
395 Set
<ExecutionFlow
> flows
= executionFlows
.get(osgiBundle
);
396 flows
.remove(executionFlow
);
397 if (log
.isTraceEnabled())
398 log
.trace("Removed " + executionFlow
+ " from " + osgiBundle
);
// Do not keep empty sets around once the last flow of a bundle is gone.
399 if (flows
.size() == 0) {
400 executionFlows
.remove(osgiBundle
);
401 if (log
.isTraceEnabled())
402 log
.trace("Removed flows set from " + osgiBundle
);
// registerFlowsToJmx is a boxed Boolean that is auto-unboxed here.
406 if (registerFlowsToJmx
)
407 unregisterMBean(osgiBundle
, executionFlow
);
// Resolve the converter once, then notify every listener.
408 ExecutionFlowDescriptorConverter efdc
= getExecutionFlowDescriptorConverter(osgiBundle
);
409 for (ExecutionModulesListener listener
: executionModulesListeners
)
410 listener
.executionFlowRemoved(osgiBundle
.getModuleDescriptor(),
411 efdc
.getExecutionFlowDescriptor(executionFlow
));
415 /** Registers an execution module listener. */
416 public synchronized void register(
417 ExecutionModulesListener executionModulesListener
,
418 Map
<String
, String
> properties
) {
419 // sync with current state
420 for (OsgiBundle osgiBundle
: executionContexts
.keySet()) {
421 executionModulesListener
.executionModuleAdded(osgiBundle
422 .getModuleDescriptor());
424 for (OsgiBundle osgiBundle
: executionFlows
.keySet()) {
425 ExecutionFlowDescriptorConverter efdc
= getExecutionFlowDescriptorConverter(osgiBundle
);
426 for (ExecutionFlow executionFlow
: executionFlows
.get(osgiBundle
))
427 executionModulesListener
.executionFlowAdded(
428 osgiBundle
.getModuleDescriptor(),
429 efdc
.getExecutionFlowDescriptor(executionFlow
));
431 executionModulesListeners
.add(executionModulesListener
);
434 /** Unregisters an execution module listener. */
435 public synchronized void unregister(
436 ExecutionModulesListener executionModulesListener
,
437 Map
<String
, String
> properties
) {
438 executionModulesListeners
.remove(executionModulesListener
);
// Callback taking a service and its properties (the class implements
// OsgiServiceLifecycleListener). Only ExecutionFlowDescriptorConverter services
// are handled by the visible code: the converter is stored under the bundle
// derived from the service properties.
// NOTE(review): the log statement is guarded by isTraceEnabled() but emitted with
// debug(); the levels should probably agree.
// NOTE(review): the end of the signature, the end of the log message and the
// closing braces are not visible in this excerpt.
441 @SuppressWarnings({ "rawtypes" })
442 public synchronized void bind(Object service
, Map properties
)
444 if (service
instanceof ExecutionFlowDescriptorConverter
) {
445 ExecutionFlowDescriptorConverter executionFlowDescriptorConverter
= (ExecutionFlowDescriptorConverter
) service
;
446 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
447 executionFlowDescriptorConverters
.put(osgiBundle
,
448 executionFlowDescriptorConverter
);
449 if (log
.isTraceEnabled())
450 log
.debug("Registered execution flow descriptor converter from "
// Callback taking a service and its properties (the class implements
// OsgiServiceLifecycleListener). Only ExecutionFlowDescriptorConverter services
// are handled by the visible code: the converter stored for the bundle derived
// from the service properties is removed, if any.
// NOTE(review): the log statement is guarded by isTraceEnabled() but emitted with
// debug(); the levels should probably agree.
// NOTE(review): the end of the signature, the end of the log message and the
// closing braces are not visible in this excerpt.
457 @SuppressWarnings("rawtypes")
458 public synchronized void unbind(Object service
, Map properties
)
460 if (service
instanceof ExecutionFlowDescriptorConverter
) {
461 OsgiBundle osgiBundle
= asOsgiBundle(properties
);
462 if (executionFlowDescriptorConverters
.containsKey(osgiBundle
)) {
463 executionFlowDescriptorConverters
.remove(osgiBundle
);
464 if (log
.isTraceEnabled())
465 log
.debug("Removed execution flow descriptor converter from "
476 protected MBeanServer
getMBeanServer() {
477 return ManagementFactory
.getPlatformMBeanServer();
// Exposes an execution flow over JMX: the flow is wrapped in a StandardMBean
// using ExecutionFlow as management interface and registered on the MBean server
// under the name computed by flowMBeanName().
// Any failure is rethrown as an SlcException that keeps the original cause.
// NOTE(review): the opening try and the end of the error message are not visible
// in this excerpt.
480 public void registerMBean(Module module
, ExecutionFlow executionFlow
) {
482 StandardMBean mbean
= new StandardMBean(executionFlow
,
483 ExecutionFlow
.class);
484 getMBeanServer().registerMBean(mbean
,
485 flowMBeanName(module
, executionFlow
));
486 } catch (Exception e
) {
487 String msg
= "Cannot register execution flow " + executionFlow
489 throw new SlcException(msg
, e
);
// Removes the MBean of an execution flow from the MBean server, using the same
// name computation (flowMBeanName()) as registerMBean().
// Any failure is rethrown as an SlcException that keeps the original cause.
// NOTE(review): the opening try and the end of the error message are not visible
// in this excerpt.
493 public void unregisterMBean(Module module
, ExecutionFlow executionFlow
) {
495 getMBeanServer().unregisterMBean(
496 flowMBeanName(module
, executionFlow
));
497 } catch (Exception e
) {
498 String msg
= "Cannot unregister execution flow " + executionFlow
500 throw new SlcException(msg
, e
);
// Computes the JMX object name under which an execution flow is exposed.
// The name starts with the domain "SLCExecutionModules" and a "module" key built
// from the module name and version, followed by one "path<depth>" key per
// non-empty path segment and a final "name" key.
// Any failure while building the ObjectName is rethrown as an SlcException.
// NOTE(review): several lines are not visible in this excerpt (end of the module
// key, declaration/increment of "depth", part of the path value, the try opener
// and the end of the error message).
504 @SuppressWarnings("deprecation")
505 protected ObjectName
flowMBeanName(Module module
,
506 ExecutionFlow executionFlow
) {
507 String executionModulesPrefix
= "SLCExecutionModules";
508 String path
= executionFlow
.getPath();
509 String name
= executionFlow
.getName();
// When the flow has no explicit path but its name contains '/', the name is
// split at its last '/': the part before becomes the path.
// NOTE(review): substring(lastIndexOf('/')) keeps the '/' as first character of
// the remaining name - confirm this is intended.
510 if (path
== null && name
.indexOf('/') >= 0) {
511 path
= name
.substring(0, name
.lastIndexOf('/'));
512 name
= name
.substring(name
.lastIndexOf('/'));
515 StringBuffer buf
= new StringBuffer(executionModulesPrefix
+ ":"
516 + "module=" + module
.getName() + " [" + module
.getVersion()
519 if (path
!= null && !path
.equals("")) {
// Empty segments (leading or doubled '/') are skipped.
521 for (String token
: path
.split("/")) {
522 if (!token
.equals("")) {
523 buf
.append("path").append(depth
).append('=');
524 // in order to have directories first
526 buf
.append(token
).append(',');
531 buf
.append("name=").append(name
);
533 return new ObjectName(buf
.toString());
534 } catch (Exception e
) {
535 throw new SlcException("Cannot generate object name based on "
543 @SuppressWarnings("rawtypes")
544 private OsgiBundle
asOsgiBundle(Map properties
) {
545 String bundleSymbolicName
= checkAndGet(Constants
.BUNDLE_SYMBOLICNAME
,
547 String bundleVersion
= checkAndGet(Constants
.BUNDLE_VERSION
, properties
);
548 return new OsgiBundle(bundleSymbolicName
, bundleVersion
);
551 @SuppressWarnings("rawtypes")
552 private String
checkAndGet(Object key
, Map properties
) {
553 if (!properties
.containsKey(key
) || properties
.get(key
) == null)
554 throw new SlcException(key
+ " not set in " + properties
);
556 return properties
.get(key
).toString();
559 public void setBundlesManager(BundlesManager bundlesManager
) {
560 this.bundlesManager
= bundlesManager
;
563 public void setDefaultDescriptorConverter(
564 ExecutionFlowDescriptorConverter defaultDescriptorConverter
) {
565 this.defaultDescriptorConverter
= defaultDescriptorConverter
;
568 public void setRegisterFlowsToJmx(Boolean registerFlowsToJmx
) {
569 this.registerFlowsToJmx
= registerFlowsToJmx
;