X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=runtime%2Forg.argeo.slc.detached%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fdetached%2FDetachedExecutionServerImpl.java;h=7eaa347aa37e6ad1aa64b34cea08abf2473334ff;hb=bd07be7603d234ac496652aaa07ded77d4a2a292;hp=35b9d63d537ffb6d5ce15e792b04907a9588fa78;hpb=8279efc2b92ba454b24ad4d06e7878b00836c07d;p=gpl%2Fargeo-slc.git diff --git a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java index 35b9d63d5..7eaa347aa 100644 --- a/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java +++ b/runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java @@ -1,56 +1,77 @@ +/* + * Copyright (C) 2007-2012 Argeo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.argeo.slc.detached; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Vector; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.argeo.slc.detached.admin.CloseSession; import org.argeo.slc.detached.admin.OpenSession; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; +import org.osgi.framework.Constants; +import org.osgi.util.tracker.ServiceTracker; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.osgi.context.BundleContextAware; +/** Default implementation of a detached server. */ public class DetachedExecutionServerImpl implements DetachedExecutionServer, - BundleContextAware { + BundleContextAware, InitializingBean, DisposableBean, + ApplicationContextAware { private final static Log log = LogFactory .getLog(DetachedExecutionServerImpl.class); private final DetachedContextImpl detachedContext; - private final List sessions; - private int skipCount = 1;// start skipCount at 1 since the first step is - // always an open session + private DetachedSession currentSession; + + /** + * Session being replayed, skipping the steps in the current session. If + * null, no session is replayed + */ + private DetachedSession replayedSession = null; private BundleContext bundleContext; + private ApplicationContext applicationContext; + + private final static String ALL_APP_CONTEXTS_KEY = "__allApplicationContexts"; + + private Map/* */appContextServiceTrackers = Collections + .synchronizedMap(new HashMap()); public DetachedExecutionServerImpl() { detachedContext = new DetachedContextImpl(); - sessions = new Vector(); + currentSession = new DetachedSession(); + currentSession.setUuid(Long.toString(System.currentTimeMillis())); } public synchronized DetachedAnswer executeRequest(DetachedRequest request) { + if(log.isDebugEnabled()) + log.debug("Received " + request); + DetachedAnswer answer = null; try { - // Find action - ServiceReference[] refs = bundleContext.getAllServiceReferences( - ApplicationContext.class.getName(), null); - Object obj = null; - for (int i = 0; i < refs.length; i++) { - ApplicationContext appContext = (ApplicationContext) bundleContext - .getService(refs[i]); - try { - obj = appContext.getBean(request.getRef()); - } catch (Exception e) { - // silent - if (log.isTraceEnabled()) - log.trace("Could not find ref " + request.getRef(), e); - } - if (obj != null) { - break; - } - } + Object obj = retrieveStep(request); if (obj == null) throw new DetachedException("Could not find action with ref " @@ -73,68 +94,162 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, } catch (Exception e) { answer = new DetachedAnswer(request); answer.setStatus(DetachedAnswer.ERROR); - answer.setLog(e.getMessage()); + log.error("Error executing request " + request, e); } - getCurrentSession().getRequests().add(request); - getCurrentSession().getAnswers().add(answer); - if (log.isDebugEnabled()) - log.debug("Processed '" + request.getRef() + "' (status=" - + answer.getStatusAsString() + ", path=" - + request.getPath() + ")"); + + currentSession.getRequests().add(request); + currentSession.getAnswers().add(answer); + if(log.isDebugEnabled()) + log.debug("Sent " + answer); return answer; } - protected DetachedAnswer processStep(DetachedStep obj, + protected synchronized Object retrieveStep(DetachedRequest request) + throws Exception { + + // Check whether there is a cached object + if (request.getCachedObject() != null) { + Object cachedObj = request.getCachedObject(); + if (log.isTraceEnabled()) + log.trace("Use cached object " + cachedObj + " for request " + + request); + return cachedObj; + } + + // Check its own app context (typically for admin steps) + if (applicationContext.containsBean(request.getRef())) { + try { + Object obj = applicationContext.getBean(request.getRef()); + if (log.isTraceEnabled()) + log.trace("Retrieve from server app context " + obj + + " for request " + request); + return obj; + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from server app context: " + e); + } + } + + // Check whether the source bundle is set + String bundleName = request.getProperties().getProperty( + Constants.BUNDLE_SYMBOLICNAME); + + ApplicationContext sourceAppContext = null; + if (bundleName != null) { + if (!appContextServiceTrackers.containsKey(bundleName)) { + ServiceTracker nSt = new ServiceTracker(bundleContext, + bundleContext.createFilter("(Bundle-SymbolicName=" + + bundleName + ")"), null); + nSt.open(); + appContextServiceTrackers.put(bundleName, nSt); + } + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(bundleName); + sourceAppContext = (ApplicationContext) st.getService(); + if (log.isTraceEnabled()) + log.trace("Use source application context from bundle " + + bundleName); + + Object obj = null; + try { + obj = sourceAppContext.getBean(request.getRef()); + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from app context of " + bundleName + ": " + e); + } + return obj; + } + + // no bundle name specified or it failed + if (!appContextServiceTrackers.containsKey(ALL_APP_CONTEXTS_KEY)) { + ServiceTracker nSt = new ServiceTracker(bundleContext, + ApplicationContext.class.getName(), null); + nSt.open(); + appContextServiceTrackers.put(ALL_APP_CONTEXTS_KEY, nSt); + } + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(ALL_APP_CONTEXTS_KEY); + Object[] arr = st.getServices(); + for (int i = 0; i < arr.length; i++) { + ApplicationContext appC = (ApplicationContext) arr[i]; + if (appC.containsBean(request.getRef())) { + sourceAppContext = appC; + if (log.isTraceEnabled()) + log + .trace("Retrieved source application context " + + "by scanning all published application contexts."); + try { + Object obj = sourceAppContext.getBean(request.getRef()); + return obj; + } catch (Exception e) { + if (log.isTraceEnabled()) + log.trace("Could not retrieve " + request.getRef() + + " from app context " + appC + ": " + e); + } + } + } + + throw new Exception( + "Cannot find any published application context containing bean " + + request.getRef()); + } + + protected synchronized DetachedAnswer processStep(DetachedStep obj, DetachedRequest request) { DetachedAnswer answer; - if (getCurrentSession() == null) - throw new DetachedException("No open session."); - + StringBuffer skippedLog = new StringBuffer(); boolean execute = true; - if (getPreviousSession() != null && !getPreviousSession().isClosed()) { - if (getCurrentSession().getDoItAgainPolicy().equals( - DetachedSession.SKIP_UNTIL_ERROR)) { - // Skip execution of already successful steps - if (getPreviousSession().getAnswers().size() > skipCount) { - DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession() - .getAnswers().get(skipCount); - DetachedRequest previousRequest = (DetachedRequest) getPreviousSession() - .getRequests().get(skipCount); - // Check paths - if (!previousRequest.getPath().equals(request.getPath())) { - String msg = "New request is not consistent with previous path. previousPath=" - + previousRequest.getPath() - + ", newPath=" - + request.getPath() + "\n"; - skippedLog.append(msg); - log.warn(msg); - } - - if (previousAnswer.getStatus() != DetachedAnswer.ERROR) { - execute = false; - String msg = "Skipped path " + request.getPath() - + " (skipCount=" + skipCount + ")"; - skippedLog.append(msg); - log.info(msg); - skipCount++; - } else { - log - .info("Path " - + request.getPath() - + " was previously in error, executing it again." - + " (skipCount=" + skipCount - + "). Reset skip count to 1"); - skipCount = 1; - } + + if (replayedSession != null) { + // Skip execution of already successful steps + int stepIndex = currentSession.getExecutedStepCount(); + + if (stepIndex < replayedSession.getExecutedStepCount()) { + DetachedAnswer previousAnswer = (DetachedAnswer) replayedSession + .getAnswers().get(stepIndex); + DetachedRequest previousRequest = (DetachedRequest) replayedSession + .getRequests().get(stepIndex); + + // check step names + if (!previousRequest.getRef().equals(request.getRef())) { + String msg = "New request is not consistent with previous ref. previousRef=" + + previousRequest.getRef() + + ", newRef=" + + request.getRef() + "\n"; + skippedLog.append(msg); + log.warn(msg); + } + + if (previousAnswer.getStatus() != DetachedAnswer.ERROR) { + // if no error occurred in the replayedSession, + // skip the step + execute = false; + String msg = "Skipped Step " + request.getRef() + + " (stepIndex=" + stepIndex + ")"; + skippedLog.append(msg); + log.info(msg); + } else { - // went further as skip count, doing nothing. + // if an error occurred, execute the step and leave + // skipUntillError mode (even if replayedSession + // has more steps) + log.info("### End of SkipUntilError Mode ###"); + log.info("Step " + request.getRef() + + " was previously in error, executing it again." + + " (stepIndex=" + stepIndex + ")."); + replayedSession = null; } + } else { + // went further as skip count, doing nothing. } } if (execute) { DetachedStep step = (DetachedStep) obj; + // Actually execute the step answer = step.execute(detachedContext, request); } else { answer = new DetachedAnswer(request); @@ -144,22 +259,31 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, return answer; } - protected DetachedAnswer processAdminCommand(DetachedAdminCommand obj, - DetachedRequest request) { + protected synchronized DetachedAnswer processAdminCommand( + DetachedAdminCommand obj, DetachedRequest request) { DetachedAnswer answer; if (obj instanceof OpenSession) { - if (getCurrentSession() != null) - throw new DetachedException( - "There is already an open session #" - + getCurrentSession().getUuid()); - sessions.add(((OpenSession) obj).execute(request, bundleContext)); + DetachedSession newSession = ((OpenSession) obj).execute(request, + bundleContext); + + log.debug("Creating new DetachedSession : " + newSession); + + if ((currentSession != null) && currentSession.lastActionIsError() + && DetachedSession.SKIP_UNTIL_ERROR.equals(newSession.getDoItAgainPolicy())) { + // switch to replay mode + log.info("### Start SkipUntilError Mode ###"); + replayedSession = currentSession; + } + + currentSession = newSession; + answer = new DetachedAnswer(request, "Session #" - + getCurrentSession().getUuid() + " open."); + + currentSession.getUuid() + " open."); } else if (obj instanceof CloseSession) { - if (getCurrentSession() == null) + if (currentSession == null) throw new DetachedException("There is no open session to close"); answer = new DetachedAnswer(request, "Session #" - + getCurrentSession().getUuid() + " closed."); + + currentSession.getUuid() + " closed."); answer.setStatus(DetachedAnswer.CLOSED_SESSION); } else { answer = null; @@ -167,33 +291,61 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer, return answer; } - protected final DetachedSession getCurrentSession() { - if (sessions.size() == 0) { - return null; - } else { - DetachedSession session = (DetachedSession) sessions.get(sessions - .size() - 1); - List answers = session.getAnswers(); - if (answers.size() > 0) { - DetachedAnswer lastAnswer = (DetachedAnswer) answers - .get(answers.size() - 1); - if (lastAnswer.getStatus() == DetachedAnswer.ERROR - || lastAnswer.getStatus() == DetachedAnswer.CLOSED_SESSION) - return null; + protected synchronized String dumpSessionsHistory( + DetachedRequest requestCurrent, DetachedAnswer answerCurrent) { + StringBuffer buf = new StringBuffer( + "##\n## SESSIONS HISTORY DUMP\n##\n"); + buf.append("# CURRENT\n"); + buf.append("Current session: ").append(currentSession) + .append('\n'); + buf.append("Current request: ").append(requestCurrent).append('\n'); + buf.append("Current answer: ").append(answerCurrent).append('\n'); + + buf.append("# CURRENT SESSION\n"); + + List requests = currentSession.getRequests(); + List answers = currentSession.getAnswers(); + for (int j = 0; j < requests.size(); j++) { + DetachedRequest request = (DetachedRequest) requests.get(j); + buf.append('\t').append(j).append(". ").append(request) + .append('\n'); + if (answers.size() > j) { + DetachedAnswer answer = (DetachedAnswer) answers.get(j); + buf.append('\t').append(j).append(". ").append(answer).append( + '\n'); } - return session; } - } - protected final DetachedSession getPreviousSession() { - if (sessions.size() < 2) - return null; - else - return (DetachedSession) sessions.get(sessions.size() - 2); + buf.append("# DETACHED CONTEXT\n"); + buf.append(detachedContext).append('\n'); + + buf.append("##\n## END OF SESSIONS HISTORY DUMP\n##\n"); + return buf.toString(); } public void setBundleContext(BundleContext bundleContext) { this.bundleContext = bundleContext; } + public void afterPropertiesSet() throws Exception { + log.debug("Detached execution server initialized."); + } + + public synchronized void destroy() throws Exception { + Iterator/* */keys = appContextServiceTrackers.keySet() + .iterator(); + while (keys.hasNext()) { + ServiceTracker st = (ServiceTracker) appContextServiceTrackers + .get(keys.next()); + st.close(); + } + appContextServiceTrackers.clear(); + + log.debug("Detached execution server closed."); + } + + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + }