]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java
Update headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / drivers / FileDriver.java
1 /*
2 * Copyright (C) 2007-2012 Mathieu Baudier
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.detached.drivers;
17
18 import java.io.File;
19 import java.io.FileFilter;
20 import java.io.FileInputStream;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.text.MessageFormat;
26 import java.text.SimpleDateFormat;
27 import java.util.Date;
28
29 import javax.xml.transform.stream.StreamResult;
30 import javax.xml.transform.stream.StreamSource;
31
32 import org.apache.commons.io.FileUtils;
33 import org.apache.commons.io.FilenameUtils;
34 import org.apache.commons.io.IOUtils;
35 import org.apache.commons.io.filefilter.NotFileFilter;
36 import org.apache.commons.io.filefilter.SuffixFileFilter;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.argeo.slc.detached.DetachedAnswer;
40 import org.argeo.slc.detached.DetachedClient;
41 import org.argeo.slc.detached.DetachedCommunication;
42 import org.argeo.slc.detached.DetachedDriver;
43 import org.argeo.slc.detached.DetachedException;
44 import org.argeo.slc.detached.DetachedRequest;
45 import org.argeo.slc.detached.DetachedXmlConverter;
46 import org.springframework.beans.factory.InitializingBean;
47
48 /**
49 * Implements both <code>DetachedClient</code> and <code>DetachedDriver</code>
50 * using File protocol
51 */
52 public class FileDriver implements DetachedClient, DetachedDriver,
53 InitializingBean {
54 private final static Log log = LogFactory.getLog(FileDriver.class);
55 private final static SimpleDateFormat sdf = new SimpleDateFormat(
56 "yyMMdd_HHmmss_SSS");
57 private final static MessageFormat mf = new MessageFormat("{0,number,000}");
58
59 private File baseDir;
60 private File requestsDir;
61 private File answersDir;
62 private File processedRequestsDir;
63 private File processedAnswersDir;
64 private File cleanedRequestsDir;
65 private File cleanedAnswersDir;
66
67 private String lockFileExt = "lck";
68 private FileFilter notLockFileFilter = new NotFileFilter(
69 new SuffixFileFilter("." + lockFileExt));
70
71 // Counters to avoid naming files with same prefix
72 private long lastSentTime = 0;
73 private int counter = 0;
74
75 private DetachedXmlConverter xmlConverter = null;
76
77 private long receiveAnswerTimeout = 10000l;
78
79 private boolean active = true;
80
81 public synchronized DetachedRequest receiveRequest() throws Exception {
82 DetachedRequest request = (DetachedRequest) receiveFile(requestsDir,
83 processedRequestsDir, 0);
84 if (request != null)
85 if (log.isTraceEnabled())
86 log.trace("Received detached request #" + request.getUuid()
87 + " for ref '" + request.getRef() + "', path="
88 + request.getPath());
89 return request;
90 }
91
92 public synchronized void sendAnswer(DetachedAnswer answer) throws Exception {
93 sendFile(answersDir, answer);
94 if (log.isTraceEnabled())
95 log.trace("Sent detached answer #" + answer.getUuid());
96 }
97
98 public synchronized DetachedAnswer receiveAnswer() throws Exception {
99 DetachedAnswer answer = (DetachedAnswer) receiveFile(answersDir,
100 processedAnswersDir, getReceiveAnswerTimeout());
101 if (answer != null)
102 if (log.isTraceEnabled())
103 log.trace("Received detached answer #" + answer.getUuid());
104 return answer;
105 }
106
107 public synchronized void sendRequest(DetachedRequest request)
108 throws Exception {
109 sendFile(requestsDir, request);
110 if (log.isTraceEnabled())
111 log.trace("Sent detached request #" + request.getUuid()
112 + " for ref '" + request.getRef() + "', path="
113 + request.getPath());
114 }
115
116 protected synchronized void sendFile(File dir, DetachedCommunication detCom)
117 throws Exception {
118 final String ext;
119 if (getXmlConverter() != null)
120 ext = ".xml";
121 else
122 ext = "";
123
124 // Check counters
125 Date nowDate = new Date();
126 long nowMs = nowDate.getTime();
127 if (nowMs == lastSentTime) {
128 counter++;
129 } else {
130 counter = 0;
131 }
132
133 // Create file path
134 StringBuffer filePath = new StringBuffer(dir.getPath());
135 filePath.append(File.separatorChar).append(sdf.format(nowDate)).append(
136 '-');
137 filePath.append(mf.format(new Object[] { new Long(counter) })).append(
138 '-');
139 filePath.append(detCom.getUuid()).append(ext);
140 File file = new File(filePath.toString());
141
142 File lockFile = createLockFile(file);
143 if (getXmlConverter() != null) {// xml
144 FileOutputStream outFile = new FileOutputStream(file);
145 try {
146 StreamResult result = new StreamResult(outFile);
147 getXmlConverter().marshallCommunication(detCom, result);
148 } finally {
149 IOUtils.closeQuietly(outFile);
150 }
151 } else {// serialize
152 ObjectOutputStream out = new ObjectOutputStream(
153 new FileOutputStream(file));
154 try {
155 out.writeObject(detCom);
156 } finally {
157 IOUtils.closeQuietly(out);
158 }
159 }
160 lockFile.delete();
161 }
162
163 /**
164 * @param timeout
165 * in ms, 0 is no timeout
166 */
167 protected synchronized DetachedCommunication receiveFile(File dir,
168 File processedDir, long timeout) throws Exception {
169 long begin = System.currentTimeMillis();
170 File file = null;
171 while (file == null && isActive()) {
172 if (!dir.exists())
173 throw new DetachedException("Dir " + dir + " does not exist.");
174
175 File[] files = dir.listFiles(notLockFileFilter);
176 if (files.length > 0)
177 file = files[0];
178 else {
179 try {
180 wait(100);
181 } catch (InterruptedException e) {
182 // silent
183 }
184 }
185
186 long duration = System.currentTimeMillis() - begin;
187 if (timeout != 0 && duration > timeout) {
188 throw new DetachedException("Receive file timed out after "
189 + duration + "ms.");
190 }
191 }
192
193 if (!isActive()) {
194 log.debug("DetachedDriver is not active. Leaving receiveFile");
195 return null;
196 }
197
198 File lockFile = nameLockFile(file);
199 while (lockFile.exists())
200 // FIXME: implements time out
201 Thread.sleep(100);
202
203 // Read the file
204 final DetachedCommunication detCom;
205 if (FilenameUtils.getExtension(file.getName()).equals("xml")) {
206 if (getXmlConverter() == null)
207 throw new DetachedException("No XML converter defined.");
208 FileInputStream in = new FileInputStream(file);
209 try {
210 StreamSource source = new StreamSource(in);
211 detCom = getXmlConverter().unmarshallCommunication(source);
212 } finally {
213 IOUtils.closeQuietly(in);
214 }
215 } else {
216 ObjectInputStream in = new ObjectInputStream(new FileInputStream(
217 file));
218 try {
219 detCom = (DetachedCommunication) in.readObject();
220 } finally {
221 IOUtils.closeQuietly(in);
222 }
223 }
224 // Move to processed dir
225 FileUtils.moveFileToDirectory(file, processedDir, false);
226 return detCom;
227 }
228
229 public synchronized void stop() {
230 log.debug("Stopping Detached Driver");
231 active = false;
232 notifyAll();
233 }
234
235 private synchronized boolean isActive() {
236 return active;
237 }
238
239 protected File createLockFile(File file) {
240 File lockFile = nameLockFile(file);
241 try {
242 lockFile.createNewFile();
243 } catch (IOException e) {
244 throw new DetachedException("Cannot create lock file " + lockFile);
245 }
246 return lockFile;
247 }
248
249 protected File nameLockFile(File file) {
250 return new File(file.getAbsolutePath() + "." + lockFileExt);
251 }
252
253 public void setBaseDir(File baseDir) {
254 this.baseDir = baseDir;
255 }
256
257 private void createIfNotExist(File dir) {
258 if (!dir.exists()) {
259 log.warn("Dir " + dir.getAbsolutePath()
260 + " does not exist. Creating it...");
261 dir.mkdirs();
262 }
263 }
264
265 public void afterPropertiesSet() throws Exception {
266 this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator
267 + "requests");
268 this.answersDir = new File(baseDir.getAbsolutePath() + File.separator
269 + "answers");
270 this.processedRequestsDir = new File(baseDir.getAbsolutePath()
271 + File.separator + "processed" + File.separator + "requests");
272 this.processedAnswersDir = new File(baseDir.getAbsolutePath()
273 + File.separator + "processed" + File.separator + "answers");
274 this.cleanedRequestsDir = new File(baseDir.getAbsolutePath()
275 + File.separator + "cleaned" + File.separator + "requests");
276 this.cleanedAnswersDir = new File(baseDir.getAbsolutePath()
277 + File.separator + "cleaned" + File.separator + "answers");
278
279 createIfNotExist(requestsDir);
280 createIfNotExist(answersDir);
281 createIfNotExist(processedRequestsDir);
282 createIfNotExist(processedAnswersDir);
283 createIfNotExist(cleanedRequestsDir);
284 createIfNotExist(cleanedAnswersDir);
285 if (log.isDebugEnabled())
286 log.debug("Detached File Driver initialized on " + baseDir);
287 }
288
289 public void cleanPreviousRuns() throws Exception {
290
291 // Clean requests and answers from previous builds
292 File[] remainingRequests = requestsDir.listFiles();
293 for (int i = 0; i < remainingRequests.length; i++) {
294 FileUtils.moveFileToDirectory(remainingRequests[i],
295 cleanedRequestsDir, false);
296 }
297
298 File[] remainingAnswers = answersDir.listFiles();
299 for (int i = 0; i < remainingAnswers.length; i++) {
300 FileUtils.moveFileToDirectory(remainingAnswers[i],
301 cleanedAnswersDir, false);
302 }
303 log.info("Clean previous runs of File Driver on " + baseDir);
304
305 }
306
307 public long getReceiveAnswerTimeout() {
308 return receiveAnswerTimeout;
309 }
310
311 public void setReceiveAnswerTimeout(long receiveAnswerTimeout) {
312 this.receiveAnswerTimeout = receiveAnswerTimeout;
313 }
314
315 public DetachedXmlConverter getXmlConverter() {
316 return xmlConverter;
317 }
318
319 public void setXmlConverter(DetachedXmlConverter xmlConverter) {
320 this.xmlConverter = xmlConverter;
321 }
322 }