From: Mathieu Baudier Date: Thu, 2 Oct 2008 07:46:23 +0000 (+0000) Subject: Integrate detached with standard SLC X-Git-Tag: argeo-slc-2.1.7~2517 X-Git-Url: http://git.argeo.org/?a=commitdiff_plain;h=cf1adb6cbf016fe861e7b6952776081a0339c4d3;p=gpl%2Fargeo-slc.git Integrate detached with standard SLC git-svn-id: https://svn.argeo.org/slc/trunk@1674 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- diff --git a/org.argeo.slc.detached/pom.xml b/org.argeo.slc.detached/pom.xml index f276f7bc6..79cb5b4c9 100644 --- a/org.argeo.slc.detached/pom.xml +++ b/org.argeo.slc.detached/pom.xml @@ -69,5 +69,9 @@ junit test + + log4j + log4j + \ No newline at end of file diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java new file mode 100644 index 000000000..b4279e082 --- /dev/null +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedAnswer.java @@ -0,0 +1,55 @@ +package org.argeo.slc.detached; + +import java.util.Properties; + +public class DetachedAnswer implements DetachedCommunication { + static final long serialVersionUID = 1l; + + public static int PROCESSED = 0; + public static int ERROR = 1; + public static int SKIPPED = 2; + + private Properties properties = new Properties(); + private int status; + private String log; + private String uuid; + + public DetachedAnswer() { + + } + + public DetachedAnswer(DetachedRequest request, String message) { + uuid = request.getUuid(); + log = message; + status = PROCESSED; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties outputParameters) { + this.properties = outputParameters; + } + + public int getStatus() { + return status; + } + + public void setStatus(int outputStatus) { + this.status = outputStatus; + } + + public String getLog() { + return log; + } + + public void setLog(String log) { + this.log = log; + } + + public String getUuid() { + return uuid; + } + +} diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedClient.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedClient.java index 101d1efa7..3c7503253 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedClient.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedClient.java @@ -1,8 +1,8 @@ package org.argeo.slc.detached; public interface DetachedClient { - public void sendRequest(DetachedStepRequest request) throws Exception; + public void sendRequest(DetachedRequest request) throws Exception; /** Blocks until next answer. */ - public DetachedStepAnswer receiveAnswer() throws Exception; + public DetachedAnswer receiveAnswer() throws Exception; } diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedCommunication.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedCommunication.java new file mode 100644 index 000000000..8bdba36e2 --- /dev/null +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedCommunication.java @@ -0,0 +1,7 @@ +package org.argeo.slc.detached; + +import java.io.Serializable; + +public interface DetachedCommunication extends Serializable { + public String getUuid(); +} diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedDriver.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedDriver.java index 1e7f7bc7c..08791a8a2 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedDriver.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedDriver.java @@ -2,6 +2,6 @@ package org.argeo.slc.detached; public interface DetachedDriver { /** Blocks until it receives a request. */ - public DetachedStepRequest receiveRequest() throws Exception; - public void sendAnswer(DetachedStepAnswer answer) throws Exception; + public DetachedRequest receiveRequest() throws Exception; + public void sendAnswer(DetachedAnswer answer) throws Exception; } diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServer.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServer.java index 37bb2c877..76828b5bc 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServer.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServer.java @@ -1,5 +1,5 @@ package org.argeo.slc.detached; public interface DetachedExecutionServer { - public DetachedStepAnswer executeStep(DetachedStepRequest request); + public DetachedAnswer executeStep(DetachedRequest request); } diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java index d21ec2dfc..13d67947f 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java @@ -19,7 +19,7 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer { detachedContext = new DetachedContextImpl(); } - public DetachedStepAnswer executeStep(DetachedStepRequest request) { + public DetachedAnswer executeStep(DetachedRequest request) { try { DetachedStep step = null; @@ -29,7 +29,7 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer { for (int i = 0; i < refs.length; i++) { StaticRefProvider provider = (StaticRefProvider) bundleContext .getService(refs[i]); - Object obj = provider.getStaticRef(request.getStepRef()); + Object obj = provider.getStaticRef(request.getRef()); if (obj != null) { step = (DetachedStep) obj; break; @@ -38,7 +38,7 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer { if (step == null) throw new DetachedException("Could not find step with ref " - + request.getStepRef()); + + request.getRef()); return step.execute(detachedContext, request); } catch (DetachedException e) { @@ -58,8 +58,9 @@ public class DetachedExecutionServerImpl implements DetachedExecutionServer { public void run() { while (active) { try { - DetachedStepRequest request = driver.receiveRequest(); - executeStep(request); + DetachedRequest request = driver.receiveRequest(); + DetachedAnswer answer = executeStep(request); + driver.sendAnswer(answer); } catch (Exception e) { if (e instanceof RuntimeException) throw (RuntimeException) e; diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java new file mode 100644 index 000000000..0a3484267 --- /dev/null +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedRequest.java @@ -0,0 +1,44 @@ +package org.argeo.slc.detached; + +import java.util.Properties; + +public class DetachedRequest implements DetachedCommunication { + static final long serialVersionUID = 1l; + + private String uuid; + private Properties properties = new Properties(); + private String ref; + private String path; + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties inputParameters) { + this.properties = inputParameters; + } + + public String getRef() { + return ref; + } + + public void setRef(String stepRef) { + this.ref = stepRef; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } +} diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStep.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStep.java index fd25d5260..67dba674c 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStep.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStep.java @@ -2,6 +2,6 @@ package org.argeo.slc.detached; public interface DetachedStep { - public DetachedStepAnswer execute(DetachedContext detachedContext, - DetachedStepRequest detachedStepRequest); + public DetachedAnswer execute(DetachedContext detachedContext, + DetachedRequest detachedStepRequest); } diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepAnswer.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepAnswer.java deleted file mode 100644 index ebc6fe64a..000000000 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepAnswer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.argeo.slc.detached; - -import java.util.Properties; - -public class DetachedStepAnswer { - public static int PROCESSED = 0; - public static int ERROR = 1; - public static int SKIPPED = 2; - - private Properties outputParameters; - private int outputStatus; - private String log; - - public Properties getOutputParameters() { - return outputParameters; - } - - public void setOutputParameters(Properties outputParameters) { - this.outputParameters = outputParameters; - } - - public int getOutputStatus() { - return outputStatus; - } - - public void setOutputStatus(int outputStatus) { - this.outputStatus = outputStatus; - } - - public String getLog() { - return log; - } - - public void setLog(String log) { - this.log = log; - } - -} diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepRequest.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepRequest.java deleted file mode 100644 index 1c772ade3..000000000 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedStepRequest.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.argeo.slc.detached; - -import java.io.Serializable; -import java.util.Properties; - -public class DetachedStepRequest implements Serializable { - private String uuid; - private Properties inputParameters; - private String stepRef; - private String path; - - public Properties getInputParameters() { - return inputParameters; - } - - public void setInputParameters(Properties inputParameters) { - this.inputParameters = inputParameters; - } - - public String getStepRef() { - return stepRef; - } - - public void setStepRef(String stepRef) { - this.stepRef = stepRef; - } - - public String getPath() { - return path; - } - - public void setPath(String path) { - this.path = path; - } - - public String getUuid() { - return uuid; - } - - public void setUuid(String uuid) { - this.uuid = uuid; - } -} diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java index 89522ed0a..cc9d13922 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java @@ -3,66 +3,129 @@ package org.argeo.slc.detached.drivers; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.argeo.slc.detached.DetachedClient; +import org.argeo.slc.detached.DetachedCommunication; import org.argeo.slc.detached.DetachedDriver; import org.argeo.slc.detached.DetachedException; -import org.argeo.slc.detached.DetachedStepAnswer; -import org.argeo.slc.detached.DetachedStepRequest; +import org.argeo.slc.detached.DetachedAnswer; +import org.argeo.slc.detached.DetachedRequest; +import org.springframework.beans.factory.InitializingBean; -public class FileDriver implements DetachedDriver, DetachedClient { - private File requestDir; - private File answerDir; +public class FileDriver implements DetachedDriver, DetachedClient, + InitializingBean { + private final static Log log = LogFactory.getLog(FileDriver.class); - public synchronized DetachedStepRequest receiveRequest() throws Exception { - if (!requestDir.exists()) - throw new DetachedException("Request dir " - + requestDir.getCanonicalPath() + " does not exist."); + private File baseDir; + private File requestsDir; + private File answersDir; + private File processedRequestsDir; + private File processedAnswersDir; + public synchronized DetachedRequest receiveRequest() throws Exception { + return (DetachedRequest) receiveFile(requestsDir, + processedRequestsDir); + } + + public void sendAnswer(DetachedAnswer answer) throws Exception { + sendFile(answersDir, answer); + } + + public DetachedAnswer receiveAnswer() throws Exception { + return (DetachedAnswer) receiveFile(answersDir, processedAnswersDir); + } + + public void sendRequest(DetachedRequest request) throws Exception { + sendFile(requestsDir, request); + } + + protected void sendFile(File dir, DetachedCommunication detCom) + throws Exception { + File file = new File(dir.getPath() + File.separator + detCom.getUuid()); + File lockFile = createLockFile(file); + ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream( + file)); + out.writeObject(detCom); + out.close(); + lockFile.delete(); + } + + protected DetachedCommunication receiveFile(File dir, File processedDir) + throws Exception { File file = null; while (file == null) { - File[] files = requestDir.listFiles(); + File[] files = dir.listFiles(); if (files.length > 0) file = files[0]; else Thread.sleep(1000); } + File lockFile = nameLockFile(file); + while (lockFile.exists()) + // TODO: implements time out + Thread.sleep(100); + ObjectInputStream in = new ObjectInputStream(new FileInputStream(file)); - DetachedStepRequest request = (DetachedStepRequest) in.readObject(); + DetachedCommunication detCom = (DetachedCommunication) in.readObject(); in.close(); - file.delete();// move it to a processed dir instead? - return request; + // Move to processed dir + file.renameTo(new File(processedDir.getAbsolutePath() + File.separator + + file.getName())); + return detCom; + } - public void sendAnswer(DetachedStepAnswer answer) throws Exception { - // TODO Auto-generated method stub + protected File createLockFile(File file) { + File lockFile = nameLockFile(file); + try { + lockFile.createNewFile(); + } catch (IOException e) { + throw new DetachedException("Cannot create lock file " + lockFile); + } + return lockFile; + } + protected File nameLockFile(File file) { + return new File(file.getAbsolutePath() + ".lck"); } - public DetachedStepAnswer receiveAnswer() throws Exception { - // TODO Auto-generated method stub - return null; + public void setBaseDir(File baseDir) { + this.baseDir = baseDir; } - public void sendRequest(DetachedStepRequest request) throws Exception { - File file = new File(requestDir.getPath() + File.separator - + request.getUuid()); - ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream( - file)); - out.writeObject(request); - out.close(); + public void init() { + this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator + + "requests"); + this.answersDir = new File(baseDir.getAbsolutePath() + File.separator + + "answers"); + this.processedRequestsDir = new File(baseDir.getAbsolutePath() + + File.separator + "processed" + File.separator + "requests"); + this.processedAnswersDir = new File(baseDir.getAbsolutePath() + + File.separator + "processed" + File.separator + "answers"); + + createIfNotExist(requestsDir); + createIfNotExist(answersDir); + createIfNotExist(processedRequestsDir); + createIfNotExist(processedAnswersDir); } - public void setRequestDir(File requestDir) { - this.requestDir = requestDir; + private void createIfNotExist(File dir) { + if (!dir.exists()) { + log.warn("Dir " + requestsDir.getAbsolutePath() + + " does not exist. Creating it..."); + dir.mkdirs(); + } } - public void setAnswerDir(File answerDir) { - this.answerDir = answerDir; + public void afterPropertiesSet() throws Exception { + init(); } } diff --git a/org.argeo.slc.detached/src/main/resources/META-INF/MANIFEST.MF b/org.argeo.slc.detached/src/main/resources/META-INF/MANIFEST.MF index 2c158afef..92b00a88f 100644 --- a/org.argeo.slc.detached/src/main/resources/META-INF/MANIFEST.MF +++ b/org.argeo.slc.detached/src/main/resources/META-INF/MANIFEST.MF @@ -12,7 +12,7 @@ Created-By: Apache Maven Bundle Plugin Bundle-Vendor: Argeo Build-Jdk: 1.6.0 Bundle-Version: 0.11.1.SNAPSHOT -Bnd-LastModified: 1222616555343 +Bnd-LastModified: 1222787012918 Bundle-ManifestVersion: 2 Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt Bundle-Description: SLC Distribution