]> git.argeo.org Git - gpl/argeo-slc.git/blobdiff - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java
Clean up directories
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / DetachedExecutionServerImpl.java
index 35b9d63d537ffb6d5ce15e792b04907a9588fa78..7eaa347aa37e6ad1aa64b34cea08abf2473334ff 100644 (file)
@@ -1,56 +1,77 @@
+/*
+ * Copyright (C) 2007-2012 Argeo GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.argeo.slc.detached;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Vector;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.argeo.slc.detached.admin.CloseSession;
 import org.argeo.slc.detached.admin.OpenSession;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
+import org.osgi.framework.Constants;
+import org.osgi.util.tracker.ServiceTracker;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.osgi.context.BundleContextAware;
 
+/** Default implementation of a detached server. */
 public class DetachedExecutionServerImpl implements DetachedExecutionServer,
-               BundleContextAware {
+               BundleContextAware, InitializingBean, DisposableBean,
+               ApplicationContextAware {
        private final static Log log = LogFactory
                        .getLog(DetachedExecutionServerImpl.class);
 
        private final DetachedContextImpl detachedContext;
-       private final List sessions;
 
-       private int skipCount = 1;// start skipCount at 1 since the first step is
-       // always an open session
+       private DetachedSession currentSession;
+
+       /**
+        * Session being replayed, skipping the steps in the current session. If
+        * null, no session is replayed
+        */
+       private DetachedSession replayedSession = null;
 
        private BundleContext bundleContext;
+       private ApplicationContext applicationContext;
+
+       private final static String ALL_APP_CONTEXTS_KEY = "__allApplicationContexts";
+
+       private Map/* <String,ServiceTracker> */appContextServiceTrackers = Collections
+                       .synchronizedMap(new HashMap());
 
        public DetachedExecutionServerImpl() {
                detachedContext = new DetachedContextImpl();
-               sessions = new Vector();
+               currentSession = new DetachedSession();
+               currentSession.setUuid(Long.toString(System.currentTimeMillis()));              
        }
 
        public synchronized DetachedAnswer executeRequest(DetachedRequest request) {
+               if(log.isDebugEnabled())
+                       log.debug("Received " + request);
+
                DetachedAnswer answer = null;
                try {
-                       // Find action
-                       ServiceReference[] refs = bundleContext.getAllServiceReferences(
-                                       ApplicationContext.class.getName(), null);
-                       Object obj = null;
-                       for (int i = 0; i < refs.length; i++) {
-                               ApplicationContext appContext = (ApplicationContext) bundleContext
-                                               .getService(refs[i]);
-                               try {
-                                       obj = appContext.getBean(request.getRef());
-                               } catch (Exception e) {
-                                       // silent
-                                       if (log.isTraceEnabled())
-                                               log.trace("Could not find ref " + request.getRef(), e);
-                               }
-                               if (obj != null) {
-                                       break;
-                               }
-                       }
+                       Object obj = retrieveStep(request);
 
                        if (obj == null)
                                throw new DetachedException("Could not find action with ref "
@@ -73,68 +94,162 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer,
                } catch (Exception e) {
                        answer = new DetachedAnswer(request);
                        answer.setStatus(DetachedAnswer.ERROR);
-                       answer.setLog(e.getMessage());
+                       log.error("Error executing request " + request, e);
                }
-               getCurrentSession().getRequests().add(request);
-               getCurrentSession().getAnswers().add(answer);
-               if (log.isDebugEnabled())
-                       log.debug("Processed '" + request.getRef() + "' (status="
-                                       + answer.getStatusAsString() + ", path="
-                                       + request.getPath() + ")");
+
+               currentSession.getRequests().add(request);
+               currentSession.getAnswers().add(answer);
+               if(log.isDebugEnabled())
+                       log.debug("Sent " + answer);
                return answer;
        }
 
-       protected DetachedAnswer processStep(DetachedStep obj,
+       protected synchronized Object retrieveStep(DetachedRequest request)
+                       throws Exception {
+
+               // Check whether there is a cached object
+               if (request.getCachedObject() != null) {
+                       Object cachedObj = request.getCachedObject();
+                       if (log.isTraceEnabled())
+                               log.trace("Use cached object " + cachedObj + " for request "
+                                               + request);
+                       return cachedObj;
+               }
+
+               // Check its own app context (typically for admin steps)
+               if (applicationContext.containsBean(request.getRef())) {
+                       try {
+                               Object obj = applicationContext.getBean(request.getRef());
+                               if (log.isTraceEnabled())
+                                       log.trace("Retrieve from server app context " + obj
+                                                       + " for request " + request);
+                               return obj;
+                       } catch (Exception e) {
+                               if (log.isTraceEnabled())
+                                       log.trace("Could not retrieve " + request.getRef()
+                                                       + " from server app context: " + e);
+                       }
+               }
+
+               // Check whether the source bundle is set
+               String bundleName = request.getProperties().getProperty(
+                               Constants.BUNDLE_SYMBOLICNAME);
+
+               ApplicationContext sourceAppContext = null;
+               if (bundleName != null) {
+                       if (!appContextServiceTrackers.containsKey(bundleName)) {
+                               ServiceTracker nSt = new ServiceTracker(bundleContext,
+                                               bundleContext.createFilter("(Bundle-SymbolicName="
+                                                               + bundleName + ")"), null);
+                               nSt.open();
+                               appContextServiceTrackers.put(bundleName, nSt);
+                       }
+                       ServiceTracker st = (ServiceTracker) appContextServiceTrackers
+                                       .get(bundleName);
+                       sourceAppContext = (ApplicationContext) st.getService();
+                       if (log.isTraceEnabled())
+                               log.trace("Use source application context from bundle "
+                                               + bundleName);
+
+                       Object obj = null;
+                       try {
+                               obj = sourceAppContext.getBean(request.getRef());
+                       } catch (Exception e) {
+                               if (log.isTraceEnabled())
+                                       log.trace("Could not retrieve " + request.getRef()
+                                                       + " from app context of " + bundleName + ": " + e);
+                       }
+                       return obj;
+               }
+
+               // no bundle name specified or it failed
+               if (!appContextServiceTrackers.containsKey(ALL_APP_CONTEXTS_KEY)) {
+                       ServiceTracker nSt = new ServiceTracker(bundleContext,
+                                       ApplicationContext.class.getName(), null);
+                       nSt.open();
+                       appContextServiceTrackers.put(ALL_APP_CONTEXTS_KEY, nSt);
+               }
+               ServiceTracker st = (ServiceTracker) appContextServiceTrackers
+                               .get(ALL_APP_CONTEXTS_KEY);
+               Object[] arr = st.getServices();
+               for (int i = 0; i < arr.length; i++) {
+                       ApplicationContext appC = (ApplicationContext) arr[i];
+                       if (appC.containsBean(request.getRef())) {
+                               sourceAppContext = appC;
+                               if (log.isTraceEnabled())
+                                       log
+                                                       .trace("Retrieved source application context "
+                                                                       + "by scanning all published application contexts.");
+                               try {
+                                       Object obj = sourceAppContext.getBean(request.getRef());
+                                       return obj;
+                               } catch (Exception e) {
+                                       if (log.isTraceEnabled())
+                                               log.trace("Could not retrieve " + request.getRef()
+                                                               + " from app context " + appC + ": " + e);
+                               }
+                       }
+               }
+
+               throw new Exception(
+                               "Cannot find any published application context containing bean "
+                                               + request.getRef());
+       }
+
+       protected synchronized DetachedAnswer processStep(DetachedStep obj,
                        DetachedRequest request) {
                DetachedAnswer answer;
-               if (getCurrentSession() == null)
-                       throw new DetachedException("No open session.");
-
+               
                StringBuffer skippedLog = new StringBuffer();
                boolean execute = true;
-               if (getPreviousSession() != null && !getPreviousSession().isClosed()) {
-                       if (getCurrentSession().getDoItAgainPolicy().equals(
-                                       DetachedSession.SKIP_UNTIL_ERROR)) {
-                               // Skip execution of already successful steps
-                               if (getPreviousSession().getAnswers().size() > skipCount) {
-                                       DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession()
-                                                       .getAnswers().get(skipCount);
-                                       DetachedRequest previousRequest = (DetachedRequest) getPreviousSession()
-                                                       .getRequests().get(skipCount);
-                                       // Check paths
-                                       if (!previousRequest.getPath().equals(request.getPath())) {
-                                               String msg = "New request is not consistent with previous path. previousPath="
-                                                               + previousRequest.getPath()
-                                                               + ", newPath="
-                                                               + request.getPath() + "\n";
-                                               skippedLog.append(msg);
-                                               log.warn(msg);
-                                       }
-
-                                       if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
-                                               execute = false;
-                                               String msg = "Skipped path " + request.getPath()
-                                                               + " (skipCount=" + skipCount + ")";
-                                               skippedLog.append(msg);
-                                               log.info(msg);
-                                               skipCount++;
-                                       } else {
-                                               log
-                                                               .info("Path "
-                                                                               + request.getPath()
-                                                                               + " was previously in error, executing it again."
-                                                                               + " (skipCount=" + skipCount
-                                                                               + "). Reset skip count to 1");
-                                               skipCount = 1;
-                                       }
+
+               if (replayedSession != null) {
+                       // Skip execution of already successful steps
+                       int stepIndex = currentSession.getExecutedStepCount();
+
+                       if (stepIndex < replayedSession.getExecutedStepCount()) {
+                               DetachedAnswer previousAnswer = (DetachedAnswer) replayedSession
+                                               .getAnswers().get(stepIndex);
+                               DetachedRequest previousRequest = (DetachedRequest) replayedSession
+                                               .getRequests().get(stepIndex);
+
+                               // check step names                             
+                               if (!previousRequest.getRef().equals(request.getRef())) {
+                                       String msg = "New request is not consistent with previous ref. previousRef="
+                                                       + previousRequest.getRef()
+                                                       + ", newRef="
+                                                       + request.getRef() + "\n";
+                                       skippedLog.append(msg);
+                                       log.warn(msg);
+                               }                               
+                               
+                               if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
+                                       // if no error occurred in the replayedSession,
+                                       // skip the step
+                                       execute = false;
+                                       String msg = "Skipped Step " + request.getRef()
+                                                       + " (stepIndex=" + stepIndex + ")";
+                                       skippedLog.append(msg);
+                                       log.info(msg);
+
                                } else {
-                                       // went further as skip count, doing nothing.
+                                       // if an error occurred, execute the step and leave
+                                       // skipUntillError mode (even if replayedSession
+                                       // has more steps)
+                                       log.info("### End of SkipUntilError Mode ###");
+                                       log.info("Step " + request.getRef()
+                                                       + " was previously in error, executing it again."
+                                                       + " (stepIndex=" + stepIndex + ").");
+                                       replayedSession = null;
                                }
+                       } else {
+                               // went further as skip count, doing nothing.
                        }
                }
 
                if (execute) {
                        DetachedStep step = (DetachedStep) obj;
+                       // Actually execute the step
                        answer = step.execute(detachedContext, request);
                } else {
                        answer = new DetachedAnswer(request);
@@ -144,22 +259,31 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer,
                return answer;
        }
 
-       protected DetachedAnswer processAdminCommand(DetachedAdminCommand obj,
-                       DetachedRequest request) {
+       protected synchronized DetachedAnswer processAdminCommand(
+                       DetachedAdminCommand obj, DetachedRequest request) {
                DetachedAnswer answer;
                if (obj instanceof OpenSession) {
-                       if (getCurrentSession() != null)
-                               throw new DetachedException(
-                                               "There is already an open session #"
-                                                               + getCurrentSession().getUuid());
-                       sessions.add(((OpenSession) obj).execute(request, bundleContext));
+                       DetachedSession newSession = ((OpenSession) obj).execute(request,
+                                       bundleContext);
+
+                       log.debug("Creating new DetachedSession : " + newSession);
+
+                       if ((currentSession != null) && currentSession.lastActionIsError()
+                                       && DetachedSession.SKIP_UNTIL_ERROR.equals(newSession.getDoItAgainPolicy())) {
+                               // switch to replay mode
+                               log.info("### Start SkipUntilError Mode ###");
+                               replayedSession = currentSession;
+                       }
+
+                       currentSession = newSession;
+
                        answer = new DetachedAnswer(request, "Session #"
-                                       + getCurrentSession().getUuid() + " open.");
+                                       + currentSession.getUuid() + " open.");
                } else if (obj instanceof CloseSession) {
-                       if (getCurrentSession() == null)
+                       if (currentSession == null)
                                throw new DetachedException("There is no open session to close");
                        answer = new DetachedAnswer(request, "Session #"
-                                       + getCurrentSession().getUuid() + " closed.");
+                                       + currentSession.getUuid() + " closed.");
                        answer.setStatus(DetachedAnswer.CLOSED_SESSION);
                } else {
                        answer = null;
@@ -167,33 +291,61 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer,
                return answer;
        }
 
-       protected final DetachedSession getCurrentSession() {
-               if (sessions.size() == 0) {
-                       return null;
-               } else {
-                       DetachedSession session = (DetachedSession) sessions.get(sessions
-                                       .size() - 1);
-                       List answers = session.getAnswers();
-                       if (answers.size() > 0) {
-                               DetachedAnswer lastAnswer = (DetachedAnswer) answers
-                                               .get(answers.size() - 1);
-                               if (lastAnswer.getStatus() == DetachedAnswer.ERROR
-                                               || lastAnswer.getStatus() == DetachedAnswer.CLOSED_SESSION)
-                                       return null;
+       protected synchronized String dumpSessionsHistory(
+                       DetachedRequest requestCurrent, DetachedAnswer answerCurrent) {
+               StringBuffer buf = new StringBuffer(
+                               "##\n## SESSIONS HISTORY DUMP\n##\n");
+               buf.append("# CURRENT\n");
+               buf.append("Current session: ").append(currentSession)
+                               .append('\n');
+               buf.append("Current request: ").append(requestCurrent).append('\n');
+               buf.append("Current answer: ").append(answerCurrent).append('\n');
+
+               buf.append("# CURRENT SESSION\n");
+
+               List requests = currentSession.getRequests();
+               List answers = currentSession.getAnswers();
+               for (int j = 0; j < requests.size(); j++) {
+                       DetachedRequest request = (DetachedRequest) requests.get(j);
+                       buf.append('\t').append(j).append(". ").append(request)
+                                       .append('\n');
+                       if (answers.size() > j) {
+                               DetachedAnswer answer = (DetachedAnswer) answers.get(j);
+                               buf.append('\t').append(j).append(". ").append(answer).append(
+                                               '\n');
                        }
-                       return session;
                }
-       }
 
-       protected final DetachedSession getPreviousSession() {
-               if (sessions.size() < 2)
-                       return null;
-               else
-                       return (DetachedSession) sessions.get(sessions.size() - 2);
+               buf.append("# DETACHED CONTEXT\n");
+               buf.append(detachedContext).append('\n');
+
+               buf.append("##\n## END OF SESSIONS HISTORY DUMP\n##\n");
+               return buf.toString();
        }
 
        public void setBundleContext(BundleContext bundleContext) {
                this.bundleContext = bundleContext;
        }
 
+       public void afterPropertiesSet() throws Exception {
+               log.debug("Detached execution server initialized.");
+       }
+
+       public synchronized void destroy() throws Exception {
+               Iterator/* <String> */keys = appContextServiceTrackers.keySet()
+                               .iterator();
+               while (keys.hasNext()) {
+                       ServiceTracker st = (ServiceTracker) appContextServiceTrackers
+                                       .get(keys.next());
+                       st.close();
+               }
+               appContextServiceTrackers.clear();
+
+               log.debug("Detached execution server closed.");
+       }
+
+       public void setApplicationContext(ApplicationContext applicationContext) {
+               this.applicationContext = applicationContext;
+       }
+
 }