2 * Copyright (C) 2007-2012 Mathieu Baudier
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org
.argeo
.slc
.detached
.drivers
;
19 import java
.io
.FileFilter
;
20 import java
.io
.FileInputStream
;
21 import java
.io
.FileOutputStream
;
22 import java
.io
.IOException
;
23 import java
.io
.ObjectInputStream
;
24 import java
.io
.ObjectOutputStream
;
25 import java
.text
.MessageFormat
;
26 import java
.text
.SimpleDateFormat
;
27 import java
.util
.Date
;
29 import javax
.xml
.transform
.stream
.StreamResult
;
30 import javax
.xml
.transform
.stream
.StreamSource
;
32 import org
.apache
.commons
.io
.FileUtils
;
33 import org
.apache
.commons
.io
.FilenameUtils
;
34 import org
.apache
.commons
.io
.IOUtils
;
35 import org
.apache
.commons
.io
.filefilter
.NotFileFilter
;
36 import org
.apache
.commons
.io
.filefilter
.SuffixFileFilter
;
37 import org
.apache
.commons
.logging
.Log
;
38 import org
.apache
.commons
.logging
.LogFactory
;
39 import org
.argeo
.slc
.detached
.DetachedAnswer
;
40 import org
.argeo
.slc
.detached
.DetachedClient
;
41 import org
.argeo
.slc
.detached
.DetachedCommunication
;
42 import org
.argeo
.slc
.detached
.DetachedDriver
;
43 import org
.argeo
.slc
.detached
.DetachedException
;
44 import org
.argeo
.slc
.detached
.DetachedRequest
;
45 import org
.argeo
.slc
.detached
.DetachedXmlConverter
;
46 import org
.springframework
.beans
.factory
.InitializingBean
;
49 * Implements both <code>DetachedClient</code> and <code>DetachedDriver</code>
52 public class FileDriver
implements DetachedClient
, DetachedDriver
,
54 private final static Log log
= LogFactory
.getLog(FileDriver
.class);
55 private final static SimpleDateFormat sdf
= new SimpleDateFormat(
57 private final static MessageFormat mf
= new MessageFormat("{0,number,000}");
60 private File requestsDir
;
61 private File answersDir
;
62 private File processedRequestsDir
;
63 private File processedAnswersDir
;
64 private File cleanedRequestsDir
;
65 private File cleanedAnswersDir
;
67 private String lockFileExt
= "lck";
68 private FileFilter notLockFileFilter
= new NotFileFilter(
69 new SuffixFileFilter("." + lockFileExt
));
71 // Counters to avoid naming files with same prefix
72 private long lastSentTime
= 0;
73 private int counter
= 0;
75 private DetachedXmlConverter xmlConverter
= null;
77 private long receiveAnswerTimeout
= 10000l;
79 private boolean active
= true;
81 public synchronized DetachedRequest
receiveRequest() throws Exception
{
82 DetachedRequest request
= (DetachedRequest
) receiveFile(requestsDir
,
83 processedRequestsDir
, 0);
85 if (log
.isTraceEnabled())
86 log
.trace("Received detached request #" + request
.getUuid()
87 + " for ref '" + request
.getRef() + "', path="
92 public synchronized void sendAnswer(DetachedAnswer answer
) throws Exception
{
93 sendFile(answersDir
, answer
);
94 if (log
.isTraceEnabled())
95 log
.trace("Sent detached answer #" + answer
.getUuid());
98 public synchronized DetachedAnswer
receiveAnswer() throws Exception
{
99 DetachedAnswer answer
= (DetachedAnswer
) receiveFile(answersDir
,
100 processedAnswersDir
, getReceiveAnswerTimeout());
102 if (log
.isTraceEnabled())
103 log
.trace("Received detached answer #" + answer
.getUuid());
107 public synchronized void sendRequest(DetachedRequest request
)
109 sendFile(requestsDir
, request
);
110 if (log
.isTraceEnabled())
111 log
.trace("Sent detached request #" + request
.getUuid()
112 + " for ref '" + request
.getRef() + "', path="
113 + request
.getPath());
116 protected synchronized void sendFile(File dir
, DetachedCommunication detCom
)
119 if (getXmlConverter() != null)
125 Date nowDate
= new Date();
126 long nowMs
= nowDate
.getTime();
127 if (nowMs
== lastSentTime
) {
134 StringBuffer filePath
= new StringBuffer(dir
.getPath());
135 filePath
.append(File
.separatorChar
).append(sdf
.format(nowDate
)).append(
137 filePath
.append(mf
.format(new Object
[] { new Long(counter
) })).append(
139 filePath
.append(detCom
.getUuid()).append(ext
);
140 File file
= new File(filePath
.toString());
142 File lockFile
= createLockFile(file
);
143 if (getXmlConverter() != null) {// xml
144 FileOutputStream outFile
= new FileOutputStream(file
);
146 StreamResult result
= new StreamResult(outFile
);
147 getXmlConverter().marshallCommunication(detCom
, result
);
149 IOUtils
.closeQuietly(outFile
);
152 ObjectOutputStream out
= new ObjectOutputStream(
153 new FileOutputStream(file
));
155 out
.writeObject(detCom
);
157 IOUtils
.closeQuietly(out
);
165 * in ms, 0 is no timeout
167 protected synchronized DetachedCommunication
receiveFile(File dir
,
168 File processedDir
, long timeout
) throws Exception
{
169 long begin
= System
.currentTimeMillis();
171 while (file
== null && isActive()) {
173 throw new DetachedException("Dir " + dir
+ " does not exist.");
175 File
[] files
= dir
.listFiles(notLockFileFilter
);
176 if (files
.length
> 0)
181 } catch (InterruptedException e
) {
186 long duration
= System
.currentTimeMillis() - begin
;
187 if (timeout
!= 0 && duration
> timeout
) {
188 throw new DetachedException("Receive file timed out after "
194 log
.debug("DetachedDriver is not active. Leaving receiveFile");
198 File lockFile
= nameLockFile(file
);
199 while (lockFile
.exists())
200 // FIXME: implements time out
204 final DetachedCommunication detCom
;
205 if (FilenameUtils
.getExtension(file
.getName()).equals("xml")) {
206 if (getXmlConverter() == null)
207 throw new DetachedException("No XML converter defined.");
208 FileInputStream in
= new FileInputStream(file
);
210 StreamSource source
= new StreamSource(in
);
211 detCom
= getXmlConverter().unmarshallCommunication(source
);
213 IOUtils
.closeQuietly(in
);
216 ObjectInputStream in
= new ObjectInputStream(new FileInputStream(
219 detCom
= (DetachedCommunication
) in
.readObject();
221 IOUtils
.closeQuietly(in
);
224 // Move to processed dir
225 FileUtils
.moveFileToDirectory(file
, processedDir
, false);
229 public synchronized void stop() {
230 log
.debug("Stopping Detached Driver");
235 private synchronized boolean isActive() {
239 protected File
createLockFile(File file
) {
240 File lockFile
= nameLockFile(file
);
242 lockFile
.createNewFile();
243 } catch (IOException e
) {
244 throw new DetachedException("Cannot create lock file " + lockFile
);
249 protected File
nameLockFile(File file
) {
250 return new File(file
.getAbsolutePath() + "." + lockFileExt
);
253 public void setBaseDir(File baseDir
) {
254 this.baseDir
= baseDir
;
257 private void createIfNotExist(File dir
) {
259 log
.warn("Dir " + dir
.getAbsolutePath()
260 + " does not exist. Creating it...");
265 public void afterPropertiesSet() throws Exception
{
266 this.requestsDir
= new File(baseDir
.getAbsolutePath() + File
.separator
268 this.answersDir
= new File(baseDir
.getAbsolutePath() + File
.separator
270 this.processedRequestsDir
= new File(baseDir
.getAbsolutePath()
271 + File
.separator
+ "processed" + File
.separator
+ "requests");
272 this.processedAnswersDir
= new File(baseDir
.getAbsolutePath()
273 + File
.separator
+ "processed" + File
.separator
+ "answers");
274 this.cleanedRequestsDir
= new File(baseDir
.getAbsolutePath()
275 + File
.separator
+ "cleaned" + File
.separator
+ "requests");
276 this.cleanedAnswersDir
= new File(baseDir
.getAbsolutePath()
277 + File
.separator
+ "cleaned" + File
.separator
+ "answers");
279 createIfNotExist(requestsDir
);
280 createIfNotExist(answersDir
);
281 createIfNotExist(processedRequestsDir
);
282 createIfNotExist(processedAnswersDir
);
283 createIfNotExist(cleanedRequestsDir
);
284 createIfNotExist(cleanedAnswersDir
);
285 if (log
.isDebugEnabled())
286 log
.debug("Detached File Driver initialized on " + baseDir
);
289 public void cleanPreviousRuns() throws Exception
{
291 // Clean requests and answers from previous builds
292 File
[] remainingRequests
= requestsDir
.listFiles();
293 for (int i
= 0; i
< remainingRequests
.length
; i
++) {
294 FileUtils
.moveFileToDirectory(remainingRequests
[i
],
295 cleanedRequestsDir
, false);
298 File
[] remainingAnswers
= answersDir
.listFiles();
299 for (int i
= 0; i
< remainingAnswers
.length
; i
++) {
300 FileUtils
.moveFileToDirectory(remainingAnswers
[i
],
301 cleanedAnswersDir
, false);
303 log
.info("Clean previous runs of File Driver on " + baseDir
);
307 public long getReceiveAnswerTimeout() {
308 return receiveAnswerTimeout
;
311 public void setReceiveAnswerTimeout(long receiveAnswerTimeout
) {
312 this.receiveAnswerTimeout
= receiveAnswerTimeout
;
315 public DetachedXmlConverter
getXmlConverter() {
319 public void setXmlConverter(DetachedXmlConverter xmlConverter
) {
320 this.xmlConverter
= xmlConverter
;