From b523980e22f7712f178df91ecad0e7f390c8ca2e Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Tue, 28 Jul 2009 10:50:48 +0000 Subject: [PATCH] Completely refactor the way the execution server looks for steps git-svn-id: https://svn.argeo.org/slc/trunk@2815 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- .../argeo/slc/detached/DetachedAnswer.java | 3 +- .../detached/DetachedExecutionServerImpl.java | 181 +++++++++++++++--- .../argeo/slc/detached/DetachedRequest.java | 23 ++- .../slc/detached/drivers/AbstractDriver.java | 86 ++++++++- 4 files changed, 258 insertions(+), 35 deletions(-) diff --git a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java index 51162c44f..1604be762 100644 --- a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java +++ b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java @@ -90,12 +90,11 @@ public class DetachedAnswer implements DetachedCommunication { } public String toString() { - StringBuffer buf = new StringBuffer(getClass().getName()); + StringBuffer buf = new StringBuffer("detached answer "); buf.append('#').append(uuid); buf.append(" status=").append(convertStatus(status)); buf.append(" properties=").append(properties); buf.append(" log=").append(log); return buf.toString(); } - } diff --git a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java index 0cfd3113e..cff93562d 100644 --- a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java +++ b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java @@ -2,7 +2,11 @@ package org.argeo.slc.detached; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Vector; import org.apache.commons.io.IOUtils; @@ -11,13 +15,18 @@ import org.apache.commons.logging.LogFactory; import org.argeo.slc.detached.admin.CloseSession; import org.argeo.slc.detached.admin.OpenSession; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; +import org.osgi.framework.Constants; +import org.osgi.util.tracker.ServiceTracker; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.osgi.context.BundleContextAware; /** Default implementation of a detached server. */ public class DetachedExecutionServerImpl implements DetachedExecutionServer, - BundleContextAware { + BundleContextAware, InitializingBean, DisposableBean, + ApplicationContextAware { private final static Log log = LogFactory .getLog(DetachedExecutionServerImpl.class); @@ -28,6 +37,12 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, // always an open session private BundleContext bundleContext; + private ApplicationContext applicationContext; + + private final static String ALL_APP_CONTEXTS_KEY = "__allApplicationContexts"; + + private Map/* */appContextServiceTrackers = Collections + .synchronizedMap(new HashMap()); public DetachedExecutionServerImpl() { detachedContext = new DetachedContextImpl(); @@ -35,28 +50,11 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, } public synchronized DetachedAnswer executeRequest(DetachedRequest request) { - log.info("Received request " + request); + log.info("Received " + request); DetachedAnswer answer = null; try { - // Find action - ServiceReference[] refs = bundleContext.getAllServiceReferences( - ApplicationContext.class.getName(), null); - Object obj = null; - for (int i = 0; i < refs.length; i++) { - ApplicationContext appContext = (ApplicationContext) bundleContext - .getService(refs[i]); - try { - obj = appContext.getBean(request.getRef()); - } catch (Exception e) { - // silent - if (log.isTraceEnabled()) - log.trace("Could not find ref " + request.getRef(), e); - } - if (obj != null) { - break; - } - } + Object obj = retrieveStep(request); if (obj == null) throw new DetachedException("Could not find action with ref " @@ -85,7 +83,7 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, IOUtils.closeQuietly(writer); } - // Case where current session is unexpectly null + // Case where current session is unexpectedly null if (getCurrentSession() == null) { log .error("CURRENT SESSION IS NULL." @@ -106,10 +104,122 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, getCurrentSession().getRequests().add(request); getCurrentSession().getAnswers().add(answer); - log.info("Sent answer " + answer); + log.info("Sent " + answer); return answer; } + protected synchronized Object retrieveStep(DetachedRequest request) + throws Exception { + + // Check whether there is a cached object + if (request.getCachedObject() != null) { + Object cachedObj = request.getCachedObject(); + if (log.isTraceEnabled()) + log.trace("Use cached object " + cachedObj + " for request " + + request); + return cachedObj; + } + + // Check its own app context (typically for admin steps) + if (applicationContext.containsBean(request.getRef())) { + try { + Object obj = applicationContext.getBean(request.getRef()); + if (log.isTraceEnabled()) + log.trace("Retrieve from server app context " + obj + + " for request " + request); + return obj; + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from server app context: " + e); + } + } + + // Check whether the source bundle is set + String bundleName = request.getProperties().getProperty( + Constants.BUNDLE_SYMBOLICNAME); + + ApplicationContext sourceAppContext = null; + if (bundleName != null) { + if (!appContextServiceTrackers.containsKey(bundleName)) { + ServiceTracker nSt = new ServiceTracker(bundleContext, + bundleContext.createFilter("(Bundle-SymbolicName=" + + bundleName + ")"), null); + nSt.open(); + appContextServiceTrackers.put(bundleName, nSt); + } + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(bundleName); + sourceAppContext = (ApplicationContext) st.getService(); + if (log.isTraceEnabled()) + log.trace("Use source application context from bundle " + + bundleName); + + Object obj = null; + try { + obj = sourceAppContext.getBean(request.getRef()); + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from app context of " + bundleName + ": " + e); + } + return obj; + } + + // no bundle name specified or it failed + if (!appContextServiceTrackers.containsKey(ALL_APP_CONTEXTS_KEY)) { + ServiceTracker nSt = new ServiceTracker(bundleContext, + ApplicationContext.class.getName(), null); + nSt.open(); + appContextServiceTrackers.put(ALL_APP_CONTEXTS_KEY, nSt); + } + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(ALL_APP_CONTEXTS_KEY); + Object[] arr = st.getServices(); + for (int i = 0; i < arr.length; i++) { + ApplicationContext appC = (ApplicationContext) arr[i]; + if (appC.containsBean(request.getRef())) { + sourceAppContext = appC; + if (log.isTraceEnabled()) + log + .trace("Retrieved source application context " + + "by scanning all published application contexts."); + try { + Object obj = sourceAppContext.getBean(request.getRef()); + return obj; + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from app context " + appC + ": " + e); + } + } + } + + // ServiceReference[] refs = bundleContext.getAllServiceReferences( + // ApplicationContext.class.getName(), null); + // Object obj = null; + // for (int i = 0; i < refs.length; i++) { + // ApplicationContext appContext = (ApplicationContext) + // bundleContext + // .getService(refs[i]); + // try { + // obj = appContext.getBean(request.getRef()); + // } catch (Exception e) { + // // silent + // if (log.isTraceEnabled()) + // log.trace("Could not find ref " + request.getRef(), e); + // } + // if (obj != null) { + // break; + // } + // } + // return obj; + + throw new Exception( + "Сannot find any published application context containing bean " + + request.getRef()); + } + protected synchronized DetachedAnswer processStep(DetachedStep obj, DetachedRequest request) { DetachedAnswer answer; @@ -229,7 +339,8 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, protected synchronized String dumpSessionsHistory( DetachedRequest requestCurrent, DetachedAnswer answerCurrent) { - StringBuffer buf = new StringBuffer("## SESSIONS HISTORY DUMP\n"); + StringBuffer buf = new StringBuffer( + "##\n## SESSIONS HISTORY DUMP\n##\n"); buf.append("# CURRENT\n"); buf.append("Current session: ").append(getCurrentSession()) .append('\n'); @@ -258,6 +369,7 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, buf.append("# DETACHED CONTEXT\n"); buf.append(detachedContext).append('\n'); + buf.append("##\n## END OF SESSIONS HISTORY DUMP\n##\n"); return buf.toString(); } @@ -272,4 +384,25 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, this.bundleContext = bundleContext; } + public void afterPropertiesSet() throws Exception { + log.info("Detached execution server initialized."); + } + + public synchronized void destroy() throws Exception { + Iterator/* */keys = appContextServiceTrackers.keySet() + .iterator(); + while (keys.hasNext()) { + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(keys.next()); + st.close(); + } + appContextServiceTrackers.clear(); + + log.info("Detached execution server closed."); + } + + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + } diff --git a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java index 8a1976817..af625acd3 100644 --- a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java +++ b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java @@ -11,6 +11,8 @@ public class DetachedRequest implements DetachedCommunication { private String ref; private String path = ""; + private Object cachedObject = null; + public DetachedRequest() { } @@ -59,11 +61,26 @@ public class DetachedRequest implements DetachedCommunication { } public String toString() { - StringBuffer buf = new StringBuffer(getClass().getName()); - buf.append('#').append(uuid); - buf.append(" ref=").append(ref); + StringBuffer buf = new StringBuffer("detached request for ref "); + buf.append(ref); + buf.append(" #").append(uuid); + buf.append(" cachedObject=").append((cachedObject != null)); buf.append(" path=").append(path); buf.append(" properties=").append(properties); return buf.toString(); } + + /** + * Optimization. Allows the driver to eagerly cache the object in the + * request, in order to relieve the detached server of the task to look for + * it. No implementation should rely on this to be set. + */ + public Object getCachedObject() { + return cachedObject; + } + + public void setCachedObject(Object cachedObject) { + this.cachedObject = cachedObject; + } + } diff --git a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/AbstractDriver.java b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/AbstractDriver.java index 78e82df05..a8b3dffa6 100644 --- a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/AbstractDriver.java +++ b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/AbstractDriver.java @@ -1,18 +1,36 @@ package org.argeo.slc.detached.drivers; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.argeo.slc.detached.DetachedAnswer; import org.argeo.slc.detached.DetachedDriver; import org.argeo.slc.detached.DetachedExecutionServer; import org.argeo.slc.detached.DetachedRequest; import org.argeo.slc.detached.DetachedXmlConverter; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.osgi.context.BundleContextAware; + +public abstract class AbstractDriver implements DetachedDriver, + BundleContextAware, ApplicationContextAware { + private final static Log log = LogFactory.getLog(AbstractDriver.class); -public abstract class AbstractDriver implements DetachedDriver { private boolean active = true; private DetachedExecutionServer executionServer = null; private long receiveAnswerTimeout = 10000l; private DetachedXmlConverter xmlConverter = null; + private boolean cacheObjects = true; + + /** May be null */ + private ApplicationContext applicationContext; + /** May be null */ + private BundleContext bundleContext; + public synchronized void start() { Thread driverThread = new Thread(new Runnable() { @@ -23,19 +41,62 @@ public abstract class AbstractDriver implements DetachedDriver { DetachedRequest request = receiveRequest(); if (!active) break; + + String driverBundleName = null; + if (bundleContext != null) + driverBundleName = bundleContext.getBundle() + .getSymbolicName(); + + if (applicationContext != null && cacheObjects) { + try { + String ref = request.getRef(); + if (applicationContext.containsBean(ref)) { + Object obj = applicationContext + .getBean(request.getRef()); + request.setCachedObject(obj); + if (log.isTraceEnabled()) + log.trace("Cached bean '" + ref + + "' in request " + request); + } else { + log + .warn("Cannot cache object in request because no bean '" + + ref + + "' was found in application context" + + (driverBundleName != null ? " (driver bundle " + + driverBundleName + + ")" + : "")); + } + } catch (Exception e) { + if (log.isTraceEnabled()) + log + .trace("Could not retrieve " + + request.getRef() + + " from driver application context because of " + + e); + driverBundleName = null;// do not publish bundle + // name + } + } + + if (driverBundleName != null) + request.getProperties().put( + Constants.BUNDLE_SYMBOLICNAME, + driverBundleName); + DetachedAnswer answer = executionServer .executeRequest(request); sendAnswer(answer); } catch (Exception e) { -// if (e instanceof RuntimeException) -// throw (RuntimeException) e; -// else - e.printStackTrace(); + // if (e instanceof RuntimeException) + // throw (RuntimeException) e; + // else + e.printStackTrace(); } } } - }, "driverThread ("+getClass()+")"); + }, "driverThread (" + getClass() + ")"); driverThread.start(); } @@ -73,4 +134,17 @@ public abstract class AbstractDriver implements DetachedDriver { this.receiveAnswerTimeout = reveiveTimeout; } + public void setApplicationContext(ApplicationContext applicationContext) + throws BeansException { + this.applicationContext = applicationContext; + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void setCacheObjects(boolean cacheObjects) { + this.cacheObjects = cacheObjects; + } + } -- 2.39.2