private int skipCount = 0;
private BundleContext bundleContext;
- //private DetachedDriver driver;
-
- //private boolean active = false;
-
-// public void setDriver(DetachedDriver driver) {
-// this.driver = driver;
-// }
public DetachedExecutionServerImpl() {
detachedContext = new DetachedContextImpl();
sessions = new Vector();
}
- public synchronized DetachedAnswer executeStep(DetachedRequest request) {
+ public synchronized DetachedAnswer executeRequest(DetachedRequest request) {
DetachedAnswer answer = null;
try {
- DetachedStep step = null;
+ // DetachedStep step = null;
// Find action
ServiceReference[] refs = bundleContext.getAllServiceReferences(
// Execute actions
if (obj instanceof DetachedStep) {
- if (getCurrentSession() == null)
- throw new DetachedException("No open session.");
-
- StringBuffer skippedLog = new StringBuffer();
- boolean execute = true;
- if (getPreviousSession() != null
- && !getPreviousSession().isClosed()) {
- if (getCurrentSession().getDoItAgainPolicy().equals(
- DetachedSession.SKIP_UNTIL_ERROR)) {
- // Skip execution of already successful steps
- if (getPreviousSession().getAnswers().size() > skipCount) {
- DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession()
- .getAnswers().get(skipCount);
- DetachedRequest previousRequest = (DetachedRequest) getPreviousSession()
- .getRequests().get(skipCount);
- // Check paths
- if (!previousRequest.getPath().equals(
- request.getPath())) {
- String msg = "New request is not consistent with previous path. previousPath="
- + previousRequest.getPath()
- + ", newPath="
- + request.getPath()
- + "\n";
- skippedLog.append(msg);
- log.warn(msg);
- }
-
- if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
- execute = false;
- }
- } else {
- // went further as skip count, doing nothing.
- }
- }
- }
-
- if (execute) {
- step = (DetachedStep) obj;
- answer = step.execute(detachedContext, request);
- } else {
- skippedLog.append("Skipped path " + request.getPath()
- + " (skipCount=" + skipCount + ")");
- answer = new DetachedAnswer(request);
- answer.setStatus(DetachedAnswer.SKIPPED);
- answer.setLog(skippedLog.toString());
- }
+ answer = processStep((DetachedStep) obj, request);
} else if (obj instanceof DetachedAdminCommand) {
- if (obj instanceof OpenSession) {
- if (getCurrentSession() != null)
- throw new DetachedException(
- "There is already an open session #"
- + getCurrentSession().getUuid());
- sessions.add(((OpenSession) obj).execute(request,
- bundleContext));
- answer = new DetachedAnswer(request, "Session #"
- + getCurrentSession().getUuid() + " open.");
- } else if (obj instanceof CloseSession) {
- if (getCurrentSession() == null)
- throw new DetachedException(
- "There is no open session to close");
- answer = new DetachedAnswer(request, "Session #"
- + getCurrentSession().getUuid() + " closed.");
- answer.setStatus(DetachedAnswer.CLOSED_SESSION);
- }
+ answer = processAdminCommand((DetachedAdminCommand) obj,
+ request);
}
- if (answer == null)
+ if (answer == null) {
throw new DetachedException("Unknown action type "
+ obj.getClass() + " for action with ref "
+ request.getRef());
+ } else {
+ log.info("Processed '" + request.getRef() + "' (path="
+ + request.getPath() + ")");
+ }
} catch (DetachedException e) {
answer = new DetachedAnswer(request);
return answer;
}
+ protected DetachedAnswer processStep(DetachedStep obj,
+ DetachedRequest request) {
+ DetachedAnswer answer;
+ if (getCurrentSession() == null)
+ throw new DetachedException("No open session.");
+
+ StringBuffer skippedLog = new StringBuffer();
+ boolean execute = true;
+ if (getPreviousSession() != null && !getPreviousSession().isClosed()) {
+ if (getCurrentSession().getDoItAgainPolicy().equals(
+ DetachedSession.SKIP_UNTIL_ERROR)) {
+ // Skip execution of already successful steps
+ if (getPreviousSession().getAnswers().size() > skipCount) {
+ DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession()
+ .getAnswers().get(skipCount);
+ DetachedRequest previousRequest = (DetachedRequest) getPreviousSession()
+ .getRequests().get(skipCount);
+ // Check paths
+ if (!previousRequest.getPath().equals(request.getPath())) {
+ String msg = "New request is not consistent with previous path. previousPath="
+ + previousRequest.getPath()
+ + ", newPath="
+ + request.getPath() + "\n";
+ skippedLog.append(msg);
+ log.warn(msg);
+ }
+
+ if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
+ execute = false;
+ }
+ } else {
+ // went further as skip count, doing nothing.
+ }
+ }
+ }
+
+ if (execute) {
+ DetachedStep step = (DetachedStep) obj;
+ answer = step.execute(detachedContext, request);
+ } else {
+ skippedLog.append("Skipped path " + request.getPath()
+ + " (skipCount=" + skipCount + ")");
+ answer = new DetachedAnswer(request);
+ answer.setStatus(DetachedAnswer.SKIPPED);
+ answer.setLog(skippedLog.toString());
+ }
+ return answer;
+ }
+
+ protected DetachedAnswer processAdminCommand(DetachedAdminCommand obj,
+ DetachedRequest request) {
+ DetachedAnswer answer;
+ if (obj instanceof OpenSession) {
+ if (getCurrentSession() != null)
+ throw new DetachedException(
+ "There is already an open session #"
+ + getCurrentSession().getUuid());
+ sessions.add(((OpenSession) obj).execute(request, bundleContext));
+ answer = new DetachedAnswer(request, "Session #"
+ + getCurrentSession().getUuid() + " open.");
+ } else if (obj instanceof CloseSession) {
+ if (getCurrentSession() == null)
+ throw new DetachedException("There is no open session to close");
+ answer = new DetachedAnswer(request, "Session #"
+ + getCurrentSession().getUuid() + " closed.");
+ answer.setStatus(DetachedAnswer.CLOSED_SESSION);
+ } else {
+ answer = null;
+ }
+ return answer;
+ }
+
protected final DetachedSession getCurrentSession() {
if (sessions.size() == 0) {
return null;
public void init(BundleContext bundleContext) {
this.bundleContext = bundleContext;
-// Thread driverThread = new Thread(new Runnable() {
-//
-// public void run() {
-// while (active) {
-// try {
-// DetachedRequest request = driver.receiveRequest();
-// DetachedAnswer answer = executeStep(request);
-// driver.sendAnswer(answer);
-// } catch (Exception e) {
-// if (e instanceof RuntimeException)
-// throw (RuntimeException) e;
-// else
-// e.printStackTrace();
-// }
-// }
-//
-// }
-// }, "driverThread");
-//
-// active = true;
-//
-// driverThread.start();
}
}
public synchronized DetachedRequest receiveRequest() throws Exception {
DetachedRequest request = (DetachedRequest) receiveFile(requestsDir,
- processedRequestsDir);
+ processedRequestsDir, 0);
if (request != null)
- log.debug("Received detached request #" + request.getUuid()
- + " for ref '" + request.getRef() + "', path="
- + request.getPath());
+ if (log.isTraceEnabled())
+ log.trace("Received detached request #" + request.getUuid()
+ + " for ref '" + request.getRef() + "', path="
+ + request.getPath());
return request;
}
public void sendAnswer(DetachedAnswer answer) throws Exception {
sendFile(answersDir, answer);
- log.debug("Sent detached answer #" + answer.getUuid());
+ if (log.isTraceEnabled())
+ log.trace("Sent detached answer #" + answer.getUuid());
}
public DetachedAnswer receiveAnswer() throws Exception {
DetachedAnswer answer = (DetachedAnswer) receiveFile(answersDir,
- processedAnswersDir);
+ processedAnswersDir, getReceiveAnswerTimeout());
if (answer != null)
- log.debug("Received detached answer #" + answer.getUuid());
+ if (log.isTraceEnabled())
+ log.trace("Received detached answer #" + answer.getUuid());
return answer;
}
public void sendRequest(DetachedRequest request) throws Exception {
sendFile(requestsDir, request);
- log.debug("Sent detached request #" + request.getUuid()
- + " for ref '" + request.getRef() + "', path="
- + request.getPath());
+ if (log.isTraceEnabled())
+ log.trace("Sent detached request #" + request.getUuid()
+ + " for ref '" + request.getRef() + "', path="
+ + request.getPath());
}
protected void sendFile(File dir, DetachedCommunication detCom)
lockFile.delete();
}
+ /**
+ * @param timeout
+ * in ms, 0 is no timeout
+ */
protected synchronized DetachedCommunication receiveFile(File dir,
- File processedDir) throws Exception {
+ File processedDir, long timeout) throws Exception {
+ long begin = System.currentTimeMillis();
File file = null;
while (file == null && isActive()) {
if (!dir.exists())
// silent
}
}
+
+ long duration = System.currentTimeMillis() - begin;
+ if (timeout != 0 && duration > timeout) {
+ throw new DetachedException("Receive file timed out after "
+ + duration + "ms.");
+ }
}
if (!isActive())
}
// Move to processed dir
FileUtils.moveFileToDirectory(file, processedDir, false);
- // file.renameTo(new File(processedDir.getAbsolutePath() +
- // File.separator
- // + file.getName()));
return detCom;
}