]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java
Improve detached
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / drivers / FileDriver.java
1 package org.argeo.slc.detached.drivers;
2
3 import java.io.File;
4 import java.io.FileFilter;
5 import java.io.FileInputStream;
6 import java.io.FileOutputStream;
7 import java.io.IOException;
8 import java.io.ObjectInputStream;
9 import java.io.ObjectOutputStream;
10 import java.text.MessageFormat;
11 import java.text.SimpleDateFormat;
12 import java.util.Date;
13
14 import javax.xml.transform.stream.StreamResult;
15 import javax.xml.transform.stream.StreamSource;
16
17 import org.apache.commons.io.FileUtils;
18 import org.apache.commons.io.FilenameUtils;
19 import org.apache.commons.io.IOUtils;
20 import org.apache.commons.io.filefilter.NotFileFilter;
21 import org.apache.commons.io.filefilter.SuffixFileFilter;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.argeo.slc.detached.DetachedAnswer;
25 import org.argeo.slc.detached.DetachedClient;
26 import org.argeo.slc.detached.DetachedCommunication;
27 import org.argeo.slc.detached.DetachedException;
28 import org.argeo.slc.detached.DetachedRequest;
29 import org.springframework.beans.factory.InitializingBean;
30
31 public class FileDriver extends AbstractDriver implements DetachedClient,
32 InitializingBean {
33 private final static Log log = LogFactory.getLog(FileDriver.class);
34 private final static SimpleDateFormat sdf = new SimpleDateFormat(
35 "yyMMdd_HHmmss_SSS");
36 private final static MessageFormat mf = new MessageFormat("{0,number,000}");
37
38 private File baseDir;
39 private File requestsDir;
40 private File answersDir;
41 private File processedRequestsDir;
42 private File processedAnswersDir;
43 private File cleanedRequestsDir;
44 private File cleanedAnswersDir;
45
46 private String lockFileExt = "lck";
47 private FileFilter notLockFileFilter = new NotFileFilter(
48 new SuffixFileFilter("." + lockFileExt));
49
50 // Counters to avoid naming files with same prefix
51 private long lastSentTime = 0;
52 private int counter = 0;
53
54 public synchronized DetachedRequest receiveRequest() throws Exception {
55 DetachedRequest request = (DetachedRequest) receiveFile(requestsDir,
56 processedRequestsDir, 0);
57 if (request != null)
58 if (log.isTraceEnabled())
59 log.trace("Received detached request #" + request.getUuid()
60 + " for ref '" + request.getRef() + "', path="
61 + request.getPath());
62 return request;
63 }
64
65 public synchronized void sendAnswer(DetachedAnswer answer) throws Exception {
66 sendFile(answersDir, answer);
67 if (log.isTraceEnabled())
68 log.trace("Sent detached answer #" + answer.getUuid());
69 }
70
71 public synchronized DetachedAnswer receiveAnswer() throws Exception {
72 DetachedAnswer answer = (DetachedAnswer) receiveFile(answersDir,
73 processedAnswersDir, getReceiveAnswerTimeout());
74 if (answer != null)
75 if (log.isTraceEnabled())
76 log.trace("Received detached answer #" + answer.getUuid());
77 return answer;
78 }
79
80 public synchronized void sendRequest(DetachedRequest request) throws Exception {
81 sendFile(requestsDir, request);
82 if (log.isTraceEnabled())
83 log.trace("Sent detached request #" + request.getUuid()
84 + " for ref '" + request.getRef() + "', path="
85 + request.getPath());
86 }
87
88 protected synchronized void sendFile(File dir, DetachedCommunication detCom)
89 throws Exception {
90 final String ext;
91 if (getXmlConverter() != null)
92 ext = ".xml";
93 else
94 ext = "";
95
96 // Check counters
97 Date nowDate = new Date();
98 long nowMs = nowDate.getTime();
99 if (nowMs == lastSentTime) {
100 counter++;
101 } else {
102 counter = 0;
103 }
104
105 // Create file path
106 StringBuffer filePath = new StringBuffer(dir.getPath());
107 filePath.append(File.separatorChar).append(sdf.format(nowDate)).append(
108 '-');
109 filePath.append(mf.format(new Object[] { new Long(counter) })).append(
110 '-');
111 filePath.append(detCom.getUuid()).append(ext);
112 File file = new File(filePath.toString());
113
114 File lockFile = createLockFile(file);
115 if (getXmlConverter() != null) {// xml
116 FileOutputStream outFile = new FileOutputStream(file);
117 try {
118 StreamResult result = new StreamResult(outFile);
119 getXmlConverter().marshallCommunication(detCom, result);
120 } finally {
121 IOUtils.closeQuietly(outFile);
122 }
123 } else {// serialize
124 ObjectOutputStream out = new ObjectOutputStream(
125 new FileOutputStream(file));
126 try {
127 out.writeObject(detCom);
128 } finally {
129 IOUtils.closeQuietly(out);
130 }
131 }
132 lockFile.delete();
133 }
134
135 /**
136 * @param timeout
137 * in ms, 0 is no timeout
138 */
139 protected synchronized DetachedCommunication receiveFile(File dir,
140 File processedDir, long timeout) throws Exception {
141 long begin = System.currentTimeMillis();
142 File file = null;
143 while (file == null && isActive()) {
144 if (!dir.exists())
145 throw new DetachedException("Dir " + dir + " does not exist.");
146
147 File[] files = dir.listFiles(notLockFileFilter);
148 if (files.length > 0)
149 file = files[0];
150 else {
151 try {
152 wait(100);
153 } catch (InterruptedException e) {
154 // silent
155 }
156 }
157
158 long duration = System.currentTimeMillis() - begin;
159 if (timeout != 0 && duration > timeout) {
160 throw new DetachedException("Receive file timed out after "
161 + duration + "ms.");
162 }
163 }
164
165 if (!isActive())
166 return null;
167
168 File lockFile = nameLockFile(file);
169 while (lockFile.exists())
170 // FIXME: implements time out
171 Thread.sleep(100);
172
173 // Read the file
174 final DetachedCommunication detCom;
175 if (FilenameUtils.getExtension(file.getName()).equals("xml")) {
176 if (getXmlConverter() == null)
177 throw new DetachedException("No XML converter defined.");
178 FileInputStream in = new FileInputStream(file);
179 try {
180 StreamSource source = new StreamSource(in);
181 detCom = getXmlConverter().unmarshallCommunication(source);
182 } finally {
183 IOUtils.closeQuietly(in);
184 }
185 } else {
186 ObjectInputStream in = new ObjectInputStream(new FileInputStream(
187 file));
188 try {
189 detCom = (DetachedCommunication) in.readObject();
190 } finally {
191 IOUtils.closeQuietly(in);
192 }
193 }
194 // Move to processed dir
195 FileUtils.moveFileToDirectory(file, processedDir, false);
196 return detCom;
197 }
198
199 protected File createLockFile(File file) {
200 File lockFile = nameLockFile(file);
201 try {
202 lockFile.createNewFile();
203 } catch (IOException e) {
204 throw new DetachedException("Cannot create lock file " + lockFile);
205 }
206 return lockFile;
207 }
208
209 protected File nameLockFile(File file) {
210 return new File(file.getAbsolutePath() + "." + lockFileExt);
211 }
212
213 public void setBaseDir(File baseDir) {
214 this.baseDir = baseDir;
215 }
216
217 private void createIfNotExist(File dir) {
218 if (!dir.exists()) {
219 log.warn("Dir " + dir.getAbsolutePath()
220 + " does not exist. Creating it...");
221 dir.mkdirs();
222 }
223 }
224
225 public void afterPropertiesSet() throws Exception {
226 this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator
227 + "requests");
228 this.answersDir = new File(baseDir.getAbsolutePath() + File.separator
229 + "answers");
230 this.processedRequestsDir = new File(baseDir.getAbsolutePath()
231 + File.separator + "processed" + File.separator + "requests");
232 this.processedAnswersDir = new File(baseDir.getAbsolutePath()
233 + File.separator + "processed" + File.separator + "answers");
234 this.cleanedRequestsDir = new File(baseDir.getAbsolutePath()
235 + File.separator + "cleaned" + File.separator + "requests");
236 this.cleanedAnswersDir = new File(baseDir.getAbsolutePath()
237 + File.separator + "cleaned" + File.separator + "answers");
238
239 createIfNotExist(requestsDir);
240 createIfNotExist(answersDir);
241 createIfNotExist(processedRequestsDir);
242 createIfNotExist(processedAnswersDir);
243 createIfNotExist(cleanedRequestsDir);
244 createIfNotExist(cleanedAnswersDir);
245 log.info("Detached File Driver initialized on " + baseDir);
246 }
247
248 public void cleanPreviousRuns() throws Exception {
249
250 // Clean requests and answers from previous builds
251 File[] remainingRequests = requestsDir.listFiles();
252 for (int i = 0; i < remainingRequests.length; i++) {
253 FileUtils.moveFileToDirectory(remainingRequests[i],
254 cleanedRequestsDir, false);
255 }
256
257 File[] remainingAnswers = answersDir.listFiles();
258 for (int i = 0; i < remainingAnswers.length; i++) {
259 FileUtils.moveFileToDirectory(remainingAnswers[i],
260 cleanedAnswersDir, false);
261 }
262 log.info("Clean previous runs of File Driver on " + baseDir);
263
264 }
265
266 }