<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
--- /dev/null
+package org.argeo.slc.detached;
+
+import java.util.Properties;
+
+public class DetachedAnswer implements DetachedCommunication {
+ static final long serialVersionUID = 1l;
+
+ public static int PROCESSED = 0;
+ public static int ERROR = 1;
+ public static int SKIPPED = 2;
+
+ private Properties properties = new Properties();
+ private int status;
+ private String log;
+ private String uuid;
+
+ public DetachedAnswer() {
+
+ }
+
+ public DetachedAnswer(DetachedRequest request, String message) {
+ uuid = request.getUuid();
+ log = message;
+ status = PROCESSED;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties outputParameters) {
+ this.properties = outputParameters;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int outputStatus) {
+ this.status = outputStatus;
+ }
+
+ public String getLog() {
+ return log;
+ }
+
+ public void setLog(String log) {
+ this.log = log;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+}
package org.argeo.slc.detached;
public interface DetachedClient {
- public void sendRequest(DetachedStepRequest request) throws Exception;
+ public void sendRequest(DetachedRequest request) throws Exception;
/** Blocks until next answer. */
- public DetachedStepAnswer receiveAnswer() throws Exception;
+ public DetachedAnswer receiveAnswer() throws Exception;
}
--- /dev/null
+package org.argeo.slc.detached;
+
+import java.io.Serializable;
+
+public interface DetachedCommunication extends Serializable {
+ public String getUuid();
+}
public interface DetachedDriver {
/** Blocks until it receives a request. */
- public DetachedStepRequest receiveRequest() throws Exception;
- public void sendAnswer(DetachedStepAnswer answer) throws Exception;
+ public DetachedRequest receiveRequest() throws Exception;
+ public void sendAnswer(DetachedAnswer answer) throws Exception;
}
package org.argeo.slc.detached;
public interface DetachedExecutionServer {
- public DetachedStepAnswer executeStep(DetachedStepRequest request);
+ public DetachedAnswer executeStep(DetachedRequest request);
}
detachedContext = new DetachedContextImpl();
}
- public DetachedStepAnswer executeStep(DetachedStepRequest request) {
+ public DetachedAnswer executeStep(DetachedRequest request) {
try {
DetachedStep step = null;
for (int i = 0; i < refs.length; i++) {
StaticRefProvider provider = (StaticRefProvider) bundleContext
.getService(refs[i]);
- Object obj = provider.getStaticRef(request.getStepRef());
+ Object obj = provider.getStaticRef(request.getRef());
if (obj != null) {
step = (DetachedStep) obj;
break;
if (step == null)
throw new DetachedException("Could not find step with ref "
- + request.getStepRef());
+ + request.getRef());
return step.execute(detachedContext, request);
} catch (DetachedException e) {
public void run() {
while (active) {
try {
- DetachedStepRequest request = driver.receiveRequest();
- executeStep(request);
+ DetachedRequest request = driver.receiveRequest();
+ DetachedAnswer answer = executeStep(request);
+ driver.sendAnswer(answer);
} catch (Exception e) {
if (e instanceof RuntimeException)
throw (RuntimeException) e;
--- /dev/null
+package org.argeo.slc.detached;
+
+import java.util.Properties;
+
+public class DetachedRequest implements DetachedCommunication {
+ static final long serialVersionUID = 1l;
+
+ private String uuid;
+ private Properties properties = new Properties();
+ private String ref;
+ private String path;
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties inputParameters) {
+ this.properties = inputParameters;
+ }
+
+ public String getRef() {
+ return ref;
+ }
+
+ public void setRef(String stepRef) {
+ this.ref = stepRef;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+}
public interface DetachedStep {
- public DetachedStepAnswer execute(DetachedContext detachedContext,
- DetachedStepRequest detachedStepRequest);
+ public DetachedAnswer execute(DetachedContext detachedContext,
+ DetachedRequest detachedStepRequest);
}
+++ /dev/null
-package org.argeo.slc.detached;
-
-import java.util.Properties;
-
-public class DetachedStepAnswer {
- public static int PROCESSED = 0;
- public static int ERROR = 1;
- public static int SKIPPED = 2;
-
- private Properties outputParameters;
- private int outputStatus;
- private String log;
-
- public Properties getOutputParameters() {
- return outputParameters;
- }
-
- public void setOutputParameters(Properties outputParameters) {
- this.outputParameters = outputParameters;
- }
-
- public int getOutputStatus() {
- return outputStatus;
- }
-
- public void setOutputStatus(int outputStatus) {
- this.outputStatus = outputStatus;
- }
-
- public String getLog() {
- return log;
- }
-
- public void setLog(String log) {
- this.log = log;
- }
-
-}
+++ /dev/null
-package org.argeo.slc.detached;
-
-import java.io.Serializable;
-import java.util.Properties;
-
-public class DetachedStepRequest implements Serializable {
- private String uuid;
- private Properties inputParameters;
- private String stepRef;
- private String path;
-
- public Properties getInputParameters() {
- return inputParameters;
- }
-
- public void setInputParameters(Properties inputParameters) {
- this.inputParameters = inputParameters;
- }
-
- public String getStepRef() {
- return stepRef;
- }
-
- public void setStepRef(String stepRef) {
- this.stepRef = stepRef;
- }
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public String getUuid() {
- return uuid;
- }
-
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-}
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.argeo.slc.detached.DetachedClient;
+import org.argeo.slc.detached.DetachedCommunication;
import org.argeo.slc.detached.DetachedDriver;
import org.argeo.slc.detached.DetachedException;
-import org.argeo.slc.detached.DetachedStepAnswer;
-import org.argeo.slc.detached.DetachedStepRequest;
+import org.argeo.slc.detached.DetachedAnswer;
+import org.argeo.slc.detached.DetachedRequest;
+import org.springframework.beans.factory.InitializingBean;
-public class FileDriver implements DetachedDriver, DetachedClient {
- private File requestDir;
- private File answerDir;
+public class FileDriver implements DetachedDriver, DetachedClient,
+ InitializingBean {
+ private final static Log log = LogFactory.getLog(FileDriver.class);
- public synchronized DetachedStepRequest receiveRequest() throws Exception {
- if (!requestDir.exists())
- throw new DetachedException("Request dir "
- + requestDir.getCanonicalPath() + " does not exist.");
+ private File baseDir;
+ private File requestsDir;
+ private File answersDir;
+ private File processedRequestsDir;
+ private File processedAnswersDir;
+ public synchronized DetachedRequest receiveRequest() throws Exception {
+ return (DetachedRequest) receiveFile(requestsDir,
+ processedRequestsDir);
+ }
+
+ public void sendAnswer(DetachedAnswer answer) throws Exception {
+ sendFile(answersDir, answer);
+ }
+
+ public DetachedAnswer receiveAnswer() throws Exception {
+ return (DetachedAnswer) receiveFile(answersDir, processedAnswersDir);
+ }
+
+ public void sendRequest(DetachedRequest request) throws Exception {
+ sendFile(requestsDir, request);
+ }
+
+ protected void sendFile(File dir, DetachedCommunication detCom)
+ throws Exception {
+ File file = new File(dir.getPath() + File.separator + detCom.getUuid());
+ File lockFile = createLockFile(file);
+ ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(
+ file));
+ out.writeObject(detCom);
+ out.close();
+ lockFile.delete();
+ }
+
+ protected DetachedCommunication receiveFile(File dir, File processedDir)
+ throws Exception {
File file = null;
while (file == null) {
- File[] files = requestDir.listFiles();
+ File[] files = dir.listFiles();
if (files.length > 0)
file = files[0];
else
Thread.sleep(1000);
}
+ File lockFile = nameLockFile(file);
+ while (lockFile.exists())
+ // TODO: implements time out
+ Thread.sleep(100);
+
ObjectInputStream in = new ObjectInputStream(new FileInputStream(file));
- DetachedStepRequest request = (DetachedStepRequest) in.readObject();
+ DetachedCommunication detCom = (DetachedCommunication) in.readObject();
in.close();
- file.delete();// move it to a processed dir instead?
- return request;
+ // Move to processed dir
+ file.renameTo(new File(processedDir.getAbsolutePath() + File.separator
+ + file.getName()));
+ return detCom;
+
}
- public void sendAnswer(DetachedStepAnswer answer) throws Exception {
- // TODO Auto-generated method stub
+ protected File createLockFile(File file) {
+ File lockFile = nameLockFile(file);
+ try {
+ lockFile.createNewFile();
+ } catch (IOException e) {
+ throw new DetachedException("Cannot create lock file " + lockFile);
+ }
+ return lockFile;
+ }
+ protected File nameLockFile(File file) {
+ return new File(file.getAbsolutePath() + ".lck");
}
- public DetachedStepAnswer receiveAnswer() throws Exception {
- // TODO Auto-generated method stub
- return null;
+ public void setBaseDir(File baseDir) {
+ this.baseDir = baseDir;
}
- public void sendRequest(DetachedStepRequest request) throws Exception {
- File file = new File(requestDir.getPath() + File.separator
- + request.getUuid());
- ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(
- file));
- out.writeObject(request);
- out.close();
+ public void init() {
+ this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator
+ + "requests");
+ this.answersDir = new File(baseDir.getAbsolutePath() + File.separator
+ + "answers");
+ this.processedRequestsDir = new File(baseDir.getAbsolutePath()
+ + File.separator + "processed" + File.separator + "requests");
+ this.processedAnswersDir = new File(baseDir.getAbsolutePath()
+ + File.separator + "processed" + File.separator + "answers");
+
+ createIfNotExist(requestsDir);
+ createIfNotExist(answersDir);
+ createIfNotExist(processedRequestsDir);
+ createIfNotExist(processedAnswersDir);
}
- public void setRequestDir(File requestDir) {
- this.requestDir = requestDir;
+ private void createIfNotExist(File dir) {
+ if (!dir.exists()) {
+ log.warn("Dir " + requestsDir.getAbsolutePath()
+ + " does not exist. Creating it...");
+ dir.mkdirs();
+ }
}
- public void setAnswerDir(File answerDir) {
- this.answerDir = answerDir;
+ public void afterPropertiesSet() throws Exception {
+ init();
}
}
Bundle-Vendor: Argeo\r
Build-Jdk: 1.6.0\r
Bundle-Version: 0.11.1.SNAPSHOT\r
-Bnd-LastModified: 1222616555343\r
+Bnd-LastModified: 1222787012918\r
Bundle-ManifestVersion: 2\r
Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt\r
Bundle-Description: SLC Distribution\r