X-Git-Url: http://git.argeo.org/?a=blobdiff_plain;f=org.argeo.slc.detached%2Fsrc%2Fmain%2Fjava%2Forg%2Fargeo%2Fslc%2Fdetached%2Fdrivers%2FFileDriver.java;h=ce88f5b12f4184e29b49524b7c3fde05259bc92d;hb=f198cdf84f9c6b5312825672450f92bc8451ede3;hp=89522ed0aaacf7ba07ff43a23b3b70334787c9f0;hpb=ca0badfce48057d058b0bf9f0dc2efe07affb46b;p=gpl%2Fargeo-slc.git diff --git a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java index 89522ed0a..ce88f5b12 100644 --- a/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java +++ b/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java @@ -1,68 +1,243 @@ package org.argeo.slc.detached.drivers; import java.io.File; +import java.io.FileFilter; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.stream.StreamSource; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.NotFileFilter; +import org.apache.commons.io.filefilter.SuffixFileFilter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.detached.DetachedAnswer; import org.argeo.slc.detached.DetachedClient; -import org.argeo.slc.detached.DetachedDriver; +import org.argeo.slc.detached.DetachedCommunication; import org.argeo.slc.detached.DetachedException; -import org.argeo.slc.detached.DetachedStepAnswer; -import org.argeo.slc.detached.DetachedStepRequest; +import org.argeo.slc.detached.DetachedRequest; +import org.springframework.beans.factory.InitializingBean; + +public class FileDriver extends AbstractDriver implements DetachedClient, + InitializingBean { + private final static Log log = LogFactory.getLog(FileDriver.class); + private final static SimpleDateFormat sdf = new SimpleDateFormat( + "yyMMdd_HHmmss_SSS"); + + private File baseDir; + private File requestsDir; + private File answersDir; + private File processedRequestsDir; + private File processedAnswersDir; + private File cleanedRequestsDir; + private File cleanedAnswersDir; + + private String lockFileExt = "lck"; + private FileFilter notLockFileFilter = new NotFileFilter( + new SuffixFileFilter("." + lockFileExt)); -public class FileDriver implements DetachedDriver, DetachedClient { - private File requestDir; - private File answerDir; + public synchronized DetachedRequest receiveRequest() throws Exception { + DetachedRequest request = (DetachedRequest) receiveFile(requestsDir, + processedRequestsDir, 0); + if (request != null) + if (log.isTraceEnabled()) + log.trace("Received detached request #" + request.getUuid() + + " for ref '" + request.getRef() + "', path=" + + request.getPath()); + return request; + } + + public void sendAnswer(DetachedAnswer answer) throws Exception { + sendFile(answersDir, answer); + if (log.isTraceEnabled()) + log.trace("Sent detached answer #" + answer.getUuid()); + } - public synchronized DetachedStepRequest receiveRequest() throws Exception { - if (!requestDir.exists()) - throw new DetachedException("Request dir " - + requestDir.getCanonicalPath() + " does not exist."); + public DetachedAnswer receiveAnswer() throws Exception { + DetachedAnswer answer = (DetachedAnswer) receiveFile(answersDir, + processedAnswersDir, getReceiveAnswerTimeout()); + if (answer != null) + if (log.isTraceEnabled()) + log.trace("Received detached answer #" + answer.getUuid()); + return answer; + } + + public void sendRequest(DetachedRequest request) throws Exception { + sendFile(requestsDir, request); + if (log.isTraceEnabled()) + log.trace("Sent detached request #" + request.getUuid() + + " for ref '" + request.getRef() + "', path=" + + request.getPath()); + } + + protected void sendFile(File dir, DetachedCommunication detCom) + throws Exception { + final File file; + if (getXmlConverter() != null) + file = new File(dir.getPath() + File.separator + + sdf.format(new Date()) + '-' + detCom.getUuid() + ".xml"); + else + file = new File(dir.getPath() + File.separator + detCom.getUuid()); + + File lockFile = createLockFile(file); + if (getXmlConverter() != null) { + FileOutputStream outFile = new FileOutputStream(file); + try { + StreamResult result = new StreamResult(outFile); + getXmlConverter().marshallCommunication(detCom, result); + } finally { + IOUtils.closeQuietly(outFile); + } + } else { + ObjectOutputStream out = new ObjectOutputStream( + new FileOutputStream(file)); + try { + out.writeObject(detCom); + } finally { + IOUtils.closeQuietly(out); + } + } + lockFile.delete(); + } + /** + * @param timeout + * in ms, 0 is no timeout + */ + protected synchronized DetachedCommunication receiveFile(File dir, + File processedDir, long timeout) throws Exception { + long begin = System.currentTimeMillis(); File file = null; - while (file == null) { - File[] files = requestDir.listFiles(); + while (file == null && isActive()) { + if (!dir.exists()) + throw new DetachedException("Dir " + dir + " does not exist."); + + File[] files = dir.listFiles(notLockFileFilter); if (files.length > 0) file = files[0]; - else - Thread.sleep(1000); + else { + try { + wait(100); + } catch (InterruptedException e) { + // silent + } + } + + long duration = System.currentTimeMillis() - begin; + if (timeout != 0 && duration > timeout) { + throw new DetachedException("Receive file timed out after " + + duration + "ms."); + } } - ObjectInputStream in = new ObjectInputStream(new FileInputStream(file)); - DetachedStepRequest request = (DetachedStepRequest) in.readObject(); - in.close(); + if (!isActive()) + return null; - file.delete();// move it to a processed dir instead? - return request; + File lockFile = nameLockFile(file); + while (lockFile.exists()) + // FIXME: implements time out + Thread.sleep(100); + + // Read the file + final DetachedCommunication detCom; + if (FilenameUtils.getExtension(file.getName()).equals("xml")) { + if (getXmlConverter() == null) + throw new DetachedException("No XML converter defined."); + FileInputStream in = new FileInputStream(file); + try { + StreamSource source = new StreamSource(in); + detCom = getXmlConverter().unmarshallCommunication(source); + } finally { + IOUtils.closeQuietly(in); + } + } else { + ObjectInputStream in = new ObjectInputStream(new FileInputStream( + file)); + try { + detCom = (DetachedCommunication) in.readObject(); + } finally { + IOUtils.closeQuietly(in); + } + } + // Move to processed dir + FileUtils.moveFileToDirectory(file, processedDir, false); + return detCom; } - public void sendAnswer(DetachedStepAnswer answer) throws Exception { - // TODO Auto-generated method stub + protected File createLockFile(File file) { + File lockFile = nameLockFile(file); + try { + lockFile.createNewFile(); + } catch (IOException e) { + throw new DetachedException("Cannot create lock file " + lockFile); + } + return lockFile; + } + protected File nameLockFile(File file) { + return new File(file.getAbsolutePath() + "." + lockFileExt); } - public DetachedStepAnswer receiveAnswer() throws Exception { - // TODO Auto-generated method stub - return null; + public void setBaseDir(File baseDir) { + this.baseDir = baseDir; } - public void sendRequest(DetachedStepRequest request) throws Exception { - File file = new File(requestDir.getPath() + File.separator - + request.getUuid()); - ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream( - file)); - out.writeObject(request); - out.close(); + private void createIfNotExist(File dir) { + if (!dir.exists()) { + log.warn("Dir " + dir.getAbsolutePath() + + " does not exist. Creating it..."); + dir.mkdirs(); + } } - public void setRequestDir(File requestDir) { - this.requestDir = requestDir; + public void afterPropertiesSet() throws Exception { + this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator + + "requests"); + this.answersDir = new File(baseDir.getAbsolutePath() + File.separator + + "answers"); + this.processedRequestsDir = new File(baseDir.getAbsolutePath() + + File.separator + "processed" + File.separator + "requests"); + this.processedAnswersDir = new File(baseDir.getAbsolutePath() + + File.separator + "processed" + File.separator + "answers"); + this.cleanedRequestsDir = new File(baseDir.getAbsolutePath() + + File.separator + "cleaned" + File.separator + "requests"); + this.cleanedAnswersDir = new File(baseDir.getAbsolutePath() + + File.separator + "cleaned" + File.separator + "answers"); + + createIfNotExist(requestsDir); + createIfNotExist(answersDir); + createIfNotExist(processedRequestsDir); + createIfNotExist(processedAnswersDir); + createIfNotExist(cleanedRequestsDir); + createIfNotExist(cleanedAnswersDir); + log.info("Detached File Driver initialized on " + baseDir); } - public void setAnswerDir(File answerDir) { - this.answerDir = answerDir; + public void cleanPreviousRuns() throws Exception { + + // Clean requests and answers from previous builds + File[] remainingRequests = requestsDir.listFiles(); + for (int i = 0; i < remainingRequests.length; i++) { + FileUtils.moveFileToDirectory(remainingRequests[i], + cleanedRequestsDir, false); + } + + File[] remainingAnswers = answersDir.listFiles(); + for (int i = 0; i < remainingAnswers.length; i++) { + FileUtils.moveFileToDirectory(remainingAnswers[i], + cleanedAnswersDir, false); + } + log.info("Clean previous runs of File Driver on " + baseDir); + } }