2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.detached
.drivers
;
20 import java
.io
.FileFilter
;
21 import java
.io
.FileInputStream
;
22 import java
.io
.FileOutputStream
;
23 import java
.io
.IOException
;
24 import java
.io
.ObjectInputStream
;
25 import java
.io
.ObjectOutputStream
;
26 import java
.text
.MessageFormat
;
27 import java
.text
.SimpleDateFormat
;
28 import java
.util
.Date
;
30 import javax
.xml
.transform
.stream
.StreamResult
;
31 import javax
.xml
.transform
.stream
.StreamSource
;
33 import org
.apache
.commons
.io
.FileUtils
;
34 import org
.apache
.commons
.io
.FilenameUtils
;
35 import org
.apache
.commons
.io
.IOUtils
;
36 import org
.apache
.commons
.io
.filefilter
.NotFileFilter
;
37 import org
.apache
.commons
.io
.filefilter
.SuffixFileFilter
;
38 import org
.apache
.commons
.logging
.Log
;
39 import org
.apache
.commons
.logging
.LogFactory
;
40 import org
.argeo
.slc
.detached
.DetachedAnswer
;
41 import org
.argeo
.slc
.detached
.DetachedClient
;
42 import org
.argeo
.slc
.detached
.DetachedCommunication
;
43 import org
.argeo
.slc
.detached
.DetachedDriver
;
44 import org
.argeo
.slc
.detached
.DetachedException
;
45 import org
.argeo
.slc
.detached
.DetachedRequest
;
46 import org
.argeo
.slc
.detached
.DetachedXmlConverter
;
47 import org
.springframework
.beans
.factory
.InitializingBean
;
50 * Implements both <code>DetachedClient</code> and <code>DetachedDriver</code>
53 public class FileDriver
implements DetachedClient
, DetachedDriver
,
55 private final static Log log
= LogFactory
.getLog(FileDriver
.class);
56 private final static SimpleDateFormat sdf
= new SimpleDateFormat(
58 private final static MessageFormat mf
= new MessageFormat("{0,number,000}");
61 private File requestsDir
;
62 private File answersDir
;
63 private File processedRequestsDir
;
64 private File processedAnswersDir
;
65 private File cleanedRequestsDir
;
66 private File cleanedAnswersDir
;
68 private String lockFileExt
= "lck";
69 private FileFilter notLockFileFilter
= new NotFileFilter(
70 new SuffixFileFilter("." + lockFileExt
));
72 // Counters to avoid naming files with same prefix
73 private long lastSentTime
= 0;
74 private int counter
= 0;
76 private DetachedXmlConverter xmlConverter
= null;
78 private long receiveAnswerTimeout
= 10000l;
80 private boolean active
= true;
82 public synchronized DetachedRequest
receiveRequest() throws Exception
{
83 DetachedRequest request
= (DetachedRequest
) receiveFile(requestsDir
,
84 processedRequestsDir
, 0);
86 if (log
.isTraceEnabled())
87 log
.trace("Received detached request #" + request
.getUuid()
88 + " for ref '" + request
.getRef() + "', path="
93 public synchronized void sendAnswer(DetachedAnswer answer
) throws Exception
{
94 sendFile(answersDir
, answer
);
95 if (log
.isTraceEnabled())
96 log
.trace("Sent detached answer #" + answer
.getUuid());
99 public synchronized DetachedAnswer
receiveAnswer() throws Exception
{
100 DetachedAnswer answer
= (DetachedAnswer
) receiveFile(answersDir
,
101 processedAnswersDir
, getReceiveAnswerTimeout());
103 if (log
.isTraceEnabled())
104 log
.trace("Received detached answer #" + answer
.getUuid());
108 public synchronized void sendRequest(DetachedRequest request
)
110 sendFile(requestsDir
, request
);
111 if (log
.isTraceEnabled())
112 log
.trace("Sent detached request #" + request
.getUuid()
113 + " for ref '" + request
.getRef() + "', path="
114 + request
.getPath());
117 protected synchronized void sendFile(File dir
, DetachedCommunication detCom
)
120 if (getXmlConverter() != null)
126 Date nowDate
= new Date();
127 long nowMs
= nowDate
.getTime();
128 if (nowMs
== lastSentTime
) {
135 StringBuffer filePath
= new StringBuffer(dir
.getPath());
136 filePath
.append(File
.separatorChar
).append(sdf
.format(nowDate
)).append(
138 filePath
.append(mf
.format(new Object
[] { new Long(counter
) })).append(
140 filePath
.append(detCom
.getUuid()).append(ext
);
141 File file
= new File(filePath
.toString());
143 File lockFile
= createLockFile(file
);
144 if (getXmlConverter() != null) {// xml
145 FileOutputStream outFile
= new FileOutputStream(file
);
147 StreamResult result
= new StreamResult(outFile
);
148 getXmlConverter().marshallCommunication(detCom
, result
);
150 IOUtils
.closeQuietly(outFile
);
153 ObjectOutputStream out
= new ObjectOutputStream(
154 new FileOutputStream(file
));
156 out
.writeObject(detCom
);
158 IOUtils
.closeQuietly(out
);
166 * in ms, 0 is no timeout
168 protected synchronized DetachedCommunication
receiveFile(File dir
,
169 File processedDir
, long timeout
) throws Exception
{
170 long begin
= System
.currentTimeMillis();
172 while (file
== null && isActive()) {
174 throw new DetachedException("Dir " + dir
+ " does not exist.");
176 File
[] files
= dir
.listFiles(notLockFileFilter
);
177 if (files
.length
> 0)
182 } catch (InterruptedException e
) {
187 long duration
= System
.currentTimeMillis() - begin
;
188 if (timeout
!= 0 && duration
> timeout
) {
189 throw new DetachedException("Receive file timed out after "
195 log
.debug("DetachedDriver is not active. Leaving receiveFile");
199 File lockFile
= nameLockFile(file
);
200 while (lockFile
.exists())
201 // FIXME: implements time out
205 final DetachedCommunication detCom
;
206 if (FilenameUtils
.getExtension(file
.getName()).equals("xml")) {
207 if (getXmlConverter() == null)
208 throw new DetachedException("No XML converter defined.");
209 FileInputStream in
= new FileInputStream(file
);
211 StreamSource source
= new StreamSource(in
);
212 detCom
= getXmlConverter().unmarshallCommunication(source
);
214 IOUtils
.closeQuietly(in
);
217 ObjectInputStream in
= new ObjectInputStream(new FileInputStream(
220 detCom
= (DetachedCommunication
) in
.readObject();
222 IOUtils
.closeQuietly(in
);
225 // Move to processed dir
226 FileUtils
.moveFileToDirectory(file
, processedDir
, false);
230 public synchronized void stop() {
231 log
.debug("Stopping Detached Driver");
236 private synchronized boolean isActive() {
240 protected File
createLockFile(File file
) {
241 File lockFile
= nameLockFile(file
);
243 lockFile
.createNewFile();
244 } catch (IOException e
) {
245 throw new DetachedException("Cannot create lock file " + lockFile
);
250 protected File
nameLockFile(File file
) {
251 return new File(file
.getAbsolutePath() + "." + lockFileExt
);
254 public void setBaseDir(File baseDir
) {
255 this.baseDir
= baseDir
;
258 private void createIfNotExist(File dir
) {
260 log
.warn("Dir " + dir
.getAbsolutePath()
261 + " does not exist. Creating it...");
266 public void afterPropertiesSet() throws Exception
{
267 this.requestsDir
= new File(baseDir
.getAbsolutePath() + File
.separator
269 this.answersDir
= new File(baseDir
.getAbsolutePath() + File
.separator
271 this.processedRequestsDir
= new File(baseDir
.getAbsolutePath()
272 + File
.separator
+ "processed" + File
.separator
+ "requests");
273 this.processedAnswersDir
= new File(baseDir
.getAbsolutePath()
274 + File
.separator
+ "processed" + File
.separator
+ "answers");
275 this.cleanedRequestsDir
= new File(baseDir
.getAbsolutePath()
276 + File
.separator
+ "cleaned" + File
.separator
+ "requests");
277 this.cleanedAnswersDir
= new File(baseDir
.getAbsolutePath()
278 + File
.separator
+ "cleaned" + File
.separator
+ "answers");
280 createIfNotExist(requestsDir
);
281 createIfNotExist(answersDir
);
282 createIfNotExist(processedRequestsDir
);
283 createIfNotExist(processedAnswersDir
);
284 createIfNotExist(cleanedRequestsDir
);
285 createIfNotExist(cleanedAnswersDir
);
286 if (log
.isDebugEnabled())
287 log
.debug("Detached File Driver initialized on " + baseDir
);
290 public void cleanPreviousRuns() throws Exception
{
292 // Clean requests and answers from previous builds
293 File
[] remainingRequests
= requestsDir
.listFiles();
294 for (int i
= 0; i
< remainingRequests
.length
; i
++) {
295 FileUtils
.moveFileToDirectory(remainingRequests
[i
],
296 cleanedRequestsDir
, false);
299 File
[] remainingAnswers
= answersDir
.listFiles();
300 for (int i
= 0; i
< remainingAnswers
.length
; i
++) {
301 FileUtils
.moveFileToDirectory(remainingAnswers
[i
],
302 cleanedAnswersDir
, false);
304 log
.info("Clean previous runs of File Driver on " + baseDir
);
308 public long getReceiveAnswerTimeout() {
309 return receiveAnswerTimeout
;
312 public void setReceiveAnswerTimeout(long receiveAnswerTimeout
) {
313 this.receiveAnswerTimeout
= receiveAnswerTimeout
;
316 public DetachedXmlConverter
getXmlConverter() {
320 public void setXmlConverter(DetachedXmlConverter xmlConverter
) {
321 this.xmlConverter
= xmlConverter
;