]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/drivers/FileDriver.java
Bug 299 - improve Detached Design (DetachServer added)
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / drivers / FileDriver.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.detached.drivers;
18
19 import java.io.File;
20 import java.io.FileFilter;
21 import java.io.FileInputStream;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.io.ObjectOutputStream;
26 import java.text.MessageFormat;
27 import java.text.SimpleDateFormat;
28 import java.util.Date;
29
30 import javax.xml.transform.stream.StreamResult;
31 import javax.xml.transform.stream.StreamSource;
32
33 import org.apache.commons.io.FileUtils;
34 import org.apache.commons.io.FilenameUtils;
35 import org.apache.commons.io.IOUtils;
36 import org.apache.commons.io.filefilter.NotFileFilter;
37 import org.apache.commons.io.filefilter.SuffixFileFilter;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.argeo.slc.detached.DetachedAnswer;
41 import org.argeo.slc.detached.DetachedClient;
42 import org.argeo.slc.detached.DetachedCommunication;
43 import org.argeo.slc.detached.DetachedDriver;
44 import org.argeo.slc.detached.DetachedException;
45 import org.argeo.slc.detached.DetachedRequest;
46 import org.argeo.slc.detached.DetachedXmlConverter;
47 import org.springframework.beans.factory.InitializingBean;
48
49 /**
50 * Implements both <code>DetachedClient</code> and <code>DetachedDriver</code>
51 * using File protocol
52 */
53 public class FileDriver implements DetachedClient, DetachedDriver,
54 InitializingBean {
55 private final static Log log = LogFactory.getLog(FileDriver.class);
56 private final static SimpleDateFormat sdf = new SimpleDateFormat(
57 "yyMMdd_HHmmss_SSS");
58 private final static MessageFormat mf = new MessageFormat("{0,number,000}");
59
60 private File baseDir;
61 private File requestsDir;
62 private File answersDir;
63 private File processedRequestsDir;
64 private File processedAnswersDir;
65 private File cleanedRequestsDir;
66 private File cleanedAnswersDir;
67
68 private String lockFileExt = "lck";
69 private FileFilter notLockFileFilter = new NotFileFilter(
70 new SuffixFileFilter("." + lockFileExt));
71
72 // Counters to avoid naming files with same prefix
73 private long lastSentTime = 0;
74 private int counter = 0;
75
76 private DetachedXmlConverter xmlConverter = null;
77
78 private long receiveAnswerTimeout = 10000l;
79
80 private boolean active = true;
81
82 public synchronized DetachedRequest receiveRequest() throws Exception {
83 DetachedRequest request = (DetachedRequest) receiveFile(requestsDir,
84 processedRequestsDir, 0);
85 if (request != null)
86 if (log.isTraceEnabled())
87 log.trace("Received detached request #" + request.getUuid()
88 + " for ref '" + request.getRef() + "', path="
89 + request.getPath());
90 return request;
91 }
92
93 public synchronized void sendAnswer(DetachedAnswer answer) throws Exception {
94 sendFile(answersDir, answer);
95 if (log.isTraceEnabled())
96 log.trace("Sent detached answer #" + answer.getUuid());
97 }
98
99 public synchronized DetachedAnswer receiveAnswer() throws Exception {
100 DetachedAnswer answer = (DetachedAnswer) receiveFile(answersDir,
101 processedAnswersDir, getReceiveAnswerTimeout());
102 if (answer != null)
103 if (log.isTraceEnabled())
104 log.trace("Received detached answer #" + answer.getUuid());
105 return answer;
106 }
107
108 public synchronized void sendRequest(DetachedRequest request)
109 throws Exception {
110 sendFile(requestsDir, request);
111 if (log.isTraceEnabled())
112 log.trace("Sent detached request #" + request.getUuid()
113 + " for ref '" + request.getRef() + "', path="
114 + request.getPath());
115 }
116
117 protected synchronized void sendFile(File dir, DetachedCommunication detCom)
118 throws Exception {
119 final String ext;
120 if (getXmlConverter() != null)
121 ext = ".xml";
122 else
123 ext = "";
124
125 // Check counters
126 Date nowDate = new Date();
127 long nowMs = nowDate.getTime();
128 if (nowMs == lastSentTime) {
129 counter++;
130 } else {
131 counter = 0;
132 }
133
134 // Create file path
135 StringBuffer filePath = new StringBuffer(dir.getPath());
136 filePath.append(File.separatorChar).append(sdf.format(nowDate)).append(
137 '-');
138 filePath.append(mf.format(new Object[] { new Long(counter) })).append(
139 '-');
140 filePath.append(detCom.getUuid()).append(ext);
141 File file = new File(filePath.toString());
142
143 File lockFile = createLockFile(file);
144 if (getXmlConverter() != null) {// xml
145 FileOutputStream outFile = new FileOutputStream(file);
146 try {
147 StreamResult result = new StreamResult(outFile);
148 getXmlConverter().marshallCommunication(detCom, result);
149 } finally {
150 IOUtils.closeQuietly(outFile);
151 }
152 } else {// serialize
153 ObjectOutputStream out = new ObjectOutputStream(
154 new FileOutputStream(file));
155 try {
156 out.writeObject(detCom);
157 } finally {
158 IOUtils.closeQuietly(out);
159 }
160 }
161 lockFile.delete();
162 }
163
164 /**
165 * @param timeout
166 * in ms, 0 is no timeout
167 */
168 protected synchronized DetachedCommunication receiveFile(File dir,
169 File processedDir, long timeout) throws Exception {
170 long begin = System.currentTimeMillis();
171 File file = null;
172 while (file == null && isActive()) {
173 if (!dir.exists())
174 throw new DetachedException("Dir " + dir + " does not exist.");
175
176 File[] files = dir.listFiles(notLockFileFilter);
177 if (files.length > 0)
178 file = files[0];
179 else {
180 try {
181 wait(100);
182 } catch (InterruptedException e) {
183 // silent
184 }
185 }
186
187 long duration = System.currentTimeMillis() - begin;
188 if (timeout != 0 && duration > timeout) {
189 throw new DetachedException("Receive file timed out after "
190 + duration + "ms.");
191 }
192 }
193
194 if (!isActive()) {
195 log.debug("DetachedDriver is not active. Leaving receiveFile");
196 return null;
197 }
198
199 File lockFile = nameLockFile(file);
200 while (lockFile.exists())
201 // FIXME: implements time out
202 Thread.sleep(100);
203
204 // Read the file
205 final DetachedCommunication detCom;
206 if (FilenameUtils.getExtension(file.getName()).equals("xml")) {
207 if (getXmlConverter() == null)
208 throw new DetachedException("No XML converter defined.");
209 FileInputStream in = new FileInputStream(file);
210 try {
211 StreamSource source = new StreamSource(in);
212 detCom = getXmlConverter().unmarshallCommunication(source);
213 } finally {
214 IOUtils.closeQuietly(in);
215 }
216 } else {
217 ObjectInputStream in = new ObjectInputStream(new FileInputStream(
218 file));
219 try {
220 detCom = (DetachedCommunication) in.readObject();
221 } finally {
222 IOUtils.closeQuietly(in);
223 }
224 }
225 // Move to processed dir
226 FileUtils.moveFileToDirectory(file, processedDir, false);
227 return detCom;
228 }
229
230 public synchronized void stop() {
231 log.debug("Stopping Detached Driver");
232 active = false;
233 notifyAll();
234 }
235
236 private synchronized boolean isActive() {
237 return active;
238 }
239
240 protected File createLockFile(File file) {
241 File lockFile = nameLockFile(file);
242 try {
243 lockFile.createNewFile();
244 } catch (IOException e) {
245 throw new DetachedException("Cannot create lock file " + lockFile);
246 }
247 return lockFile;
248 }
249
250 protected File nameLockFile(File file) {
251 return new File(file.getAbsolutePath() + "." + lockFileExt);
252 }
253
254 public void setBaseDir(File baseDir) {
255 this.baseDir = baseDir;
256 }
257
258 private void createIfNotExist(File dir) {
259 if (!dir.exists()) {
260 log.warn("Dir " + dir.getAbsolutePath()
261 + " does not exist. Creating it...");
262 dir.mkdirs();
263 }
264 }
265
266 public void afterPropertiesSet() throws Exception {
267 this.requestsDir = new File(baseDir.getAbsolutePath() + File.separator
268 + "requests");
269 this.answersDir = new File(baseDir.getAbsolutePath() + File.separator
270 + "answers");
271 this.processedRequestsDir = new File(baseDir.getAbsolutePath()
272 + File.separator + "processed" + File.separator + "requests");
273 this.processedAnswersDir = new File(baseDir.getAbsolutePath()
274 + File.separator + "processed" + File.separator + "answers");
275 this.cleanedRequestsDir = new File(baseDir.getAbsolutePath()
276 + File.separator + "cleaned" + File.separator + "requests");
277 this.cleanedAnswersDir = new File(baseDir.getAbsolutePath()
278 + File.separator + "cleaned" + File.separator + "answers");
279
280 createIfNotExist(requestsDir);
281 createIfNotExist(answersDir);
282 createIfNotExist(processedRequestsDir);
283 createIfNotExist(processedAnswersDir);
284 createIfNotExist(cleanedRequestsDir);
285 createIfNotExist(cleanedAnswersDir);
286 if (log.isDebugEnabled())
287 log.debug("Detached File Driver initialized on " + baseDir);
288 }
289
290 public void cleanPreviousRuns() throws Exception {
291
292 // Clean requests and answers from previous builds
293 File[] remainingRequests = requestsDir.listFiles();
294 for (int i = 0; i < remainingRequests.length; i++) {
295 FileUtils.moveFileToDirectory(remainingRequests[i],
296 cleanedRequestsDir, false);
297 }
298
299 File[] remainingAnswers = answersDir.listFiles();
300 for (int i = 0; i < remainingAnswers.length; i++) {
301 FileUtils.moveFileToDirectory(remainingAnswers[i],
302 cleanedAnswersDir, false);
303 }
304 log.info("Clean previous runs of File Driver on " + baseDir);
305
306 }
307
308 public long getReceiveAnswerTimeout() {
309 return receiveAnswerTimeout;
310 }
311
312 public void setReceiveAnswerTimeout(long receiveAnswerTimeout) {
313 this.receiveAnswerTimeout = receiveAnswerTimeout;
314 }
315
316 public DetachedXmlConverter getXmlConverter() {
317 return xmlConverter;
318 }
319
320 public void setXmlConverter(DetachedXmlConverter xmlConverter) {
321 this.xmlConverter = xmlConverter;
322 }
323 }