X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=org.argeo.slc.detached%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fdetached%2FDetachedExecutionServerImpl.java;h=35b9d63d537ffb6d5ce15e792b04907a9588fa78;hb=f198cdf84f9c6b5312825672450f92bc8451ede3;hp=d21ec2dfc5e47a4016739d17baf665dd44269301;hpb=ca0badfce48057d058b0bf9f0dc2efe07affb46b;p=gpl%2Fargeo-slc.git diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java index d21ec2dfc..35b9d63d5 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java @@ -1,79 +1,199 @@ package org.argeo.slc.detached; +import java.util.List; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.detached.admin.CloseSession; +import org.argeo.slc.detached.admin.OpenSession; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.springframework.context.ApplicationContext; +import org.springframework.osgi.context.BundleContextAware; -public class DetachedExecutionServerImpl implements DetachedExecutionServer { - private final DetachedContextImpl detachedContext; +public class DetachedExecutionServerImpl implements DetachedExecutionServer, + BundleContextAware { + private final static Log log = LogFactory + .getLog(DetachedExecutionServerImpl.class); - private BundleContext bundleContext; - private DetachedDriver driver; + private final DetachedContextImpl detachedContext; + private final List sessions; - private boolean active = false; + private int skipCount = 1;// start skipCount at 1 since the first step is + // always an open session - public void setDriver(DetachedDriver driver) { - this.driver = driver; - } + private BundleContext bundleContext; public DetachedExecutionServerImpl() { detachedContext = new DetachedContextImpl(); + sessions = new Vector(); } - public DetachedStepAnswer executeStep(DetachedStepRequest request) { + public synchronized DetachedAnswer executeRequest(DetachedRequest request) { + DetachedAnswer answer = null; try { - DetachedStep step = null; - - // Find step + // Find action ServiceReference[] refs = bundleContext.getAllServiceReferences( - StaticRefProvider.class.getName(), null); + ApplicationContext.class.getName(), null); + Object obj = null; for (int i = 0; i < refs.length; i++) { - StaticRefProvider provider = (StaticRefProvider) bundleContext + ApplicationContext appContext = (ApplicationContext) bundleContext .getService(refs[i]); - Object obj = provider.getStaticRef(request.getStepRef()); + try { + obj = appContext.getBean(request.getRef()); + } catch (Exception e) { + // silent + if (log.isTraceEnabled()) + log.trace("Could not find ref " + request.getRef(), e); + } if (obj != null) { - step = (DetachedStep) obj; break; } } - if (step == null) - throw new DetachedException("Could not find step with ref " - + request.getStepRef()); + if (obj == null) + throw new DetachedException("Could not find action with ref " + + request.getRef()); + + // Execute actions + if (obj instanceof DetachedStep) { + answer = processStep((DetachedStep) obj, request); + + } else if (obj instanceof DetachedAdminCommand) { + answer = processAdminCommand((DetachedAdminCommand) obj, + request); + } - return step.execute(detachedContext, request); - } catch (DetachedException e) { - throw e; + if (answer == null) { + throw new DetachedException("Unknown action type " + + obj.getClass() + " for action with ref " + + request.getRef()); + } } catch (Exception e) { - e.printStackTrace(); - throw new DetachedException( - "Unexpected exception while executing request " + request, - e); + answer = new DetachedAnswer(request); + answer.setStatus(DetachedAnswer.ERROR); + answer.setLog(e.getMessage()); } + getCurrentSession().getRequests().add(request); + getCurrentSession().getAnswers().add(answer); + if (log.isDebugEnabled()) + log.debug("Processed '" + request.getRef() + "' (status=" + + answer.getStatusAsString() + ", path=" + + request.getPath() + ")"); + return answer; } - public void init(BundleContext bundleContext) { - this.bundleContext = bundleContext; - Thread driverThread = new Thread(new Runnable() { - - public void run() { - while (active) { - try { - DetachedStepRequest request = driver.receiveRequest(); - executeStep(request); - } catch (Exception e) { - if (e instanceof RuntimeException) - throw (RuntimeException) e; - else - e.printStackTrace(); + protected DetachedAnswer processStep(DetachedStep obj, + DetachedRequest request) { + DetachedAnswer answer; + if (getCurrentSession() == null) + throw new DetachedException("No open session."); + + StringBuffer skippedLog = new StringBuffer(); + boolean execute = true; + if (getPreviousSession() != null && !getPreviousSession().isClosed()) { + if (getCurrentSession().getDoItAgainPolicy().equals( + DetachedSession.SKIP_UNTIL_ERROR)) { + // Skip execution of already successful steps + if (getPreviousSession().getAnswers().size() > skipCount) { + DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession() + .getAnswers().get(skipCount); + DetachedRequest previousRequest = (DetachedRequest) getPreviousSession() + .getRequests().get(skipCount); + // Check paths + if (!previousRequest.getPath().equals(request.getPath())) { + String msg = "New request is not consistent with previous path. previousPath=" + + previousRequest.getPath() + + ", newPath=" + + request.getPath() + "\n"; + skippedLog.append(msg); + log.warn(msg); } + + if (previousAnswer.getStatus() != DetachedAnswer.ERROR) { + execute = false; + String msg = "Skipped path " + request.getPath() + + " (skipCount=" + skipCount + ")"; + skippedLog.append(msg); + log.info(msg); + skipCount++; + } else { + log + .info("Path " + + request.getPath() + + " was previously in error, executing it again." + + " (skipCount=" + skipCount + + "). Reset skip count to 1"); + skipCount = 1; + } + } else { + // went further as skip count, doing nothing. } + } + } + if (execute) { + DetachedStep step = (DetachedStep) obj; + answer = step.execute(detachedContext, request); + } else { + answer = new DetachedAnswer(request); + answer.setStatus(DetachedAnswer.SKIPPED); + answer.setLog(skippedLog.toString()); + } + return answer; + } + + protected DetachedAnswer processAdminCommand(DetachedAdminCommand obj, + DetachedRequest request) { + DetachedAnswer answer; + if (obj instanceof OpenSession) { + if (getCurrentSession() != null) + throw new DetachedException( + "There is already an open session #" + + getCurrentSession().getUuid()); + sessions.add(((OpenSession) obj).execute(request, bundleContext)); + answer = new DetachedAnswer(request, "Session #" + + getCurrentSession().getUuid() + " open."); + } else if (obj instanceof CloseSession) { + if (getCurrentSession() == null) + throw new DetachedException("There is no open session to close"); + answer = new DetachedAnswer(request, "Session #" + + getCurrentSession().getUuid() + " closed."); + answer.setStatus(DetachedAnswer.CLOSED_SESSION); + } else { + answer = null; + } + return answer; + } + + protected final DetachedSession getCurrentSession() { + if (sessions.size() == 0) { + return null; + } else { + DetachedSession session = (DetachedSession) sessions.get(sessions + .size() - 1); + List answers = session.getAnswers(); + if (answers.size() > 0) { + DetachedAnswer lastAnswer = (DetachedAnswer) answers + .get(answers.size() - 1); + if (lastAnswer.getStatus() == DetachedAnswer.ERROR + || lastAnswer.getStatus() == DetachedAnswer.CLOSED_SESSION) + return null; } - }, "driverThread"); + return session; + } + } - active = true; + protected final DetachedSession getPreviousSession() { + if (sessions.size() < 2) + return null; + else + return (DetachedSession) sessions.get(sessions.size() - 2); + } - driverThread.start(); + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; } }