]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java
Ignore log files
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / DetachedExecutionServerImpl.java
1 package org.argeo.slc.detached;
2
3 import java.io.PrintWriter;
4 import java.io.StringWriter;
5 import java.util.Collections;
6 import java.util.HashMap;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Vector;
11
12 import org.apache.commons.io.IOUtils;
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.argeo.slc.detached.admin.CloseSession;
16 import org.argeo.slc.detached.admin.OpenSession;
17 import org.osgi.framework.BundleContext;
18 import org.osgi.framework.Constants;
19 import org.osgi.util.tracker.ServiceTracker;
20 import org.springframework.beans.factory.DisposableBean;
21 import org.springframework.beans.factory.InitializingBean;
22 import org.springframework.context.ApplicationContext;
23 import org.springframework.context.ApplicationContextAware;
24 import org.springframework.osgi.context.BundleContextAware;
25
26 /** Default implementation of a detached server. */
27 public class DetachedExecutionServerImpl implements DetachedExecutionServer,
28 BundleContextAware, InitializingBean, DisposableBean,
29 ApplicationContextAware {
30 private final static Log log = LogFactory
31 .getLog(DetachedExecutionServerImpl.class);
32
33 private final DetachedContextImpl detachedContext;
34 private final List sessions;
35
36 private int skipCount = 1;// start skipCount at 1 since the first step is
37 // always an open session
38
39 private BundleContext bundleContext;
40 private ApplicationContext applicationContext;
41
42 private final static String ALL_APP_CONTEXTS_KEY = "__allApplicationContexts";
43
44 private Map/* <String,ServiceTracker> */appContextServiceTrackers = Collections
45 .synchronizedMap(new HashMap());
46
/** Creates a server with a new detached context and an empty session list. */
public DetachedExecutionServerImpl() {
    this.sessions = new Vector();
    this.detachedContext = new DetachedContextImpl();
}
51
52 public synchronized DetachedAnswer executeRequest(DetachedRequest request) {
53 log.info("Received " + request);
54
55 DetachedAnswer answer = null;
56 try {
57 Object obj = retrieveStep(request);
58
59 if (obj == null)
60 throw new DetachedException("Could not find action with ref "
61 + request.getRef());
62
63 // Execute actions
64 if (obj instanceof DetachedStep) {
65 answer = processStep((DetachedStep) obj, request);
66
67 } else if (obj instanceof DetachedAdminCommand) {
68 answer = processAdminCommand((DetachedAdminCommand) obj,
69 request);
70 }
71
72 if (answer == null) {
73 throw new DetachedException("Unknown action type "
74 + obj.getClass() + " for action with ref "
75 + request.getRef());
76 }
77 } catch (Exception e) {
78 answer = new DetachedAnswer(request);
79 answer.setStatus(DetachedAnswer.ERROR);
80 StringWriter writer = new StringWriter();
81 e.printStackTrace(new PrintWriter(writer));
82 answer.setLog(writer.toString());
83 IOUtils.closeQuietly(writer);
84 }
85
86 // Case where current session is unexpectedly null
87 if (getCurrentSession() == null) {
88 log
89 .error("CURRENT SESSION IS NULL."
90 + " Detached status is inconsistent dumping sessions history:");
91 log.error(dumpSessionsHistory(request, answer));
92 if (answer != null) {
93 answer.setStatus(DetachedAnswer.ERROR);
94 answer
95 .addToLog("CURRENT SESSION IS NULL."
96 + " Detached status is inconsistent, see detached log for more details.");
97 return answer;
98 } else {
99 throw new DetachedException(
100 "Answer is null. Cannot return it. See log for more details.");
101 }
102
103 }
104
105 getCurrentSession().getRequests().add(request);
106 getCurrentSession().getAnswers().add(answer);
107 log.info("Sent " + answer);
108 return answer;
109 }
110
111 protected synchronized Object retrieveStep(DetachedRequest request)
112 throws Exception {
113
114 // Check whether there is a cached object
115 if (request.getCachedObject() != null) {
116 Object cachedObj = request.getCachedObject();
117 if (log.isTraceEnabled())
118 log.trace("Use cached object " + cachedObj + " for request "
119 + request);
120 return cachedObj;
121 }
122
123 // Check its own app context (typically for admin steps)
124 if (applicationContext.containsBean(request.getRef())) {
125 try {
126 Object obj = applicationContext.getBean(request.getRef());
127 if (log.isTraceEnabled())
128 log.trace("Retrieve from server app context " + obj
129 + " for request " + request);
130 return obj;
131 } catch (Exception e) {
132 if (log.isTraceEnabled())
133 log.trace("Could not retrieve " + request.getRef()
134 + " from server app context: " + e);
135 }
136 }
137
138 // Check whether the source bundle is set
139 String bundleName = request.getProperties().getProperty(
140 Constants.BUNDLE_SYMBOLICNAME);
141
142 ApplicationContext sourceAppContext = null;
143 if (bundleName != null) {
144 if (!appContextServiceTrackers.containsKey(bundleName)) {
145 ServiceTracker nSt = new ServiceTracker(bundleContext,
146 bundleContext.createFilter("(Bundle-SymbolicName="
147 + bundleName + ")"), null);
148 nSt.open();
149 appContextServiceTrackers.put(bundleName, nSt);
150 }
151 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
152 .get(bundleName);
153 sourceAppContext = (ApplicationContext) st.getService();
154 if (log.isTraceEnabled())
155 log.trace("Use source application context from bundle "
156 + bundleName);
157
158 Object obj = null;
159 try {
160 obj = sourceAppContext.getBean(request.getRef());
161 } catch (Exception e) {
162 if (log.isTraceEnabled())
163 log.trace("Could not retrieve " + request.getRef()
164 + " from app context of " + bundleName + ": " + e);
165 }
166 return obj;
167 }
168
169 // no bundle name specified or it failed
170 if (!appContextServiceTrackers.containsKey(ALL_APP_CONTEXTS_KEY)) {
171 ServiceTracker nSt = new ServiceTracker(bundleContext,
172 ApplicationContext.class.getName(), null);
173 nSt.open();
174 appContextServiceTrackers.put(ALL_APP_CONTEXTS_KEY, nSt);
175 }
176 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
177 .get(ALL_APP_CONTEXTS_KEY);
178 Object[] arr = st.getServices();
179 for (int i = 0; i < arr.length; i++) {
180 ApplicationContext appC = (ApplicationContext) arr[i];
181 if (appC.containsBean(request.getRef())) {
182 sourceAppContext = appC;
183 if (log.isTraceEnabled())
184 log
185 .trace("Retrieved source application context "
186 + "by scanning all published application contexts.");
187 try {
188 Object obj = sourceAppContext.getBean(request.getRef());
189 return obj;
190 } catch (Exception e) {
191 if (log.isTraceEnabled())
192 log.trace("Could not retrieve " + request.getRef()
193 + " from app context " + appC + ": " + e);
194 }
195 }
196 }
197
198 // ServiceReference[] refs = bundleContext.getAllServiceReferences(
199 // ApplicationContext.class.getName(), null);
200 // Object obj = null;
201 // for (int i = 0; i < refs.length; i++) {
202 // ApplicationContext appContext = (ApplicationContext)
203 // bundleContext
204 // .getService(refs[i]);
205 // try {
206 // obj = appContext.getBean(request.getRef());
207 // } catch (Exception e) {
208 // // silent
209 // if (log.isTraceEnabled())
210 // log.trace("Could not find ref " + request.getRef(), e);
211 // }
212 // if (obj != null) {
213 // break;
214 // }
215 // }
216 // return obj;
217
218 throw new Exception(
219 "Cannot find any published application context containing bean "
220 + request.getRef());
221 }
222
223 protected synchronized DetachedAnswer processStep(DetachedStep obj,
224 DetachedRequest request) {
225 DetachedAnswer answer;
226 if (getCurrentSession() == null)
227 throw new DetachedException("No open session.");
228
229 StringBuffer skippedLog = new StringBuffer();
230 boolean execute = true;
231 if (getPreviousSession() != null && !getPreviousSession().isClosed()) {
232 if (getCurrentSession().getDoItAgainPolicy().equals(
233 DetachedSession.SKIP_UNTIL_ERROR)) {
234 // Skip execution of already successful steps
235 if (getPreviousSession().getAnswers().size() > skipCount) {
236 DetachedAnswer previousAnswer = (DetachedAnswer) getPreviousSession()
237 .getAnswers().get(skipCount);
238 DetachedRequest previousRequest = (DetachedRequest) getPreviousSession()
239 .getRequests().get(skipCount);
240 // Check paths
241 if (!previousRequest.getPath().equals(request.getPath())) {
242 String msg = "New request is not consistent with previous path. previousPath="
243 + previousRequest.getPath()
244 + ", newPath="
245 + request.getPath() + "\n";
246 skippedLog.append(msg);
247 log.warn(msg);
248 }
249
250 if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
251 execute = false;
252 String msg = "Skipped path " + request.getPath()
253 + " (skipCount=" + skipCount + ")";
254 skippedLog.append(msg);
255 log.info(msg);
256 skipCount++;
257 } else {
258 log
259 .info("Path "
260 + request.getPath()
261 + " was previously in error, executing it again."
262 + " (skipCount=" + skipCount
263 + "). Reset skip count to 1");
264 skipCount = 1;
265 }
266 } else {
267 // went further as skip count, doing nothing.
268 }
269 }
270 }
271
272 if (execute) {
273 DetachedStep step = (DetachedStep) obj;
274 // Actually execute the step
275 answer = step.execute(detachedContext, request);
276 } else {
277 answer = new DetachedAnswer(request);
278 answer.setStatus(DetachedAnswer.SKIPPED);
279 answer.setLog(skippedLog.toString());
280 }
281 return answer;
282 }
283
284 protected synchronized DetachedAnswer processAdminCommand(
285 DetachedAdminCommand obj, DetachedRequest request) {
286 DetachedAnswer answer;
287 if (obj instanceof OpenSession) {
288 if (getCurrentSession() != null) {
289 // TODO: better understand why there is sometimes two open
290 // sessions sent.
291 log.warn("There is already an open session #"
292 + getCurrentSession().getUuid() + ". Closing it...");
293 DetachedAnswer answerT = new DetachedAnswer(
294 request,
295 "Session #"
296 + getCurrentSession().getUuid()
297 + " forcibly closed. THIS ANSWER WAS NOT SENT BACK.");
298 answerT.setStatus(DetachedAnswer.CLOSED_SESSION);
299 getCurrentSession().getAnswers().add(answerT);
300 }
301 sessions.add(((OpenSession) obj).execute(request, bundleContext));
302 answer = new DetachedAnswer(request, "Session #"
303 + getCurrentSession().getUuid() + " open.");
304 } else if (obj instanceof CloseSession) {
305 if (getCurrentSession() == null)
306 throw new DetachedException("There is no open session to close");
307 answer = new DetachedAnswer(request, "Session #"
308 + getCurrentSession().getUuid() + " closed.");
309 answer.setStatus(DetachedAnswer.CLOSED_SESSION);
310 } else {
311 answer = null;
312 }
313 return answer;
314 }
315
316 /**
317 * Returns the current session based on the list of previous sessions.
318 *
319 * @return the current session or null if there is no session yet defined or
320 * if the last registered session is null or in error.
321 */
322 protected synchronized final DetachedSession getCurrentSession() {
323 if (sessions.size() == 0) {
324 return null;
325 } else {
326 DetachedSession session = (DetachedSession) sessions.get(sessions
327 .size() - 1);
328 List answers = session.getAnswers();
329 if (answers.size() > 0) {
330 DetachedAnswer lastAnswer = (DetachedAnswer) answers
331 .get(answers.size() - 1);
332 if (lastAnswer.getStatus() == DetachedAnswer.ERROR
333 || lastAnswer.getStatus() == DetachedAnswer.CLOSED_SESSION)
334 return null;
335 }
336 return session;
337 }
338 }
339
340 protected synchronized String dumpSessionsHistory(
341 DetachedRequest requestCurrent, DetachedAnswer answerCurrent) {
342 StringBuffer buf = new StringBuffer(
343 "##\n## SESSIONS HISTORY DUMP\n##\n");
344 buf.append("# CURRENT\n");
345 buf.append("Current session: ").append(getCurrentSession())
346 .append('\n');
347 buf.append("Current request: ").append(requestCurrent).append('\n');
348 buf.append("Current answer: ").append(answerCurrent).append('\n');
349 buf.append("Skip count: ").append(skipCount).append('\n');
350
351 buf.append("# SESSIONS\n");
352 for (int i = 0; i < sessions.size(); i++) {
353 DetachedSession session = (DetachedSession) sessions.get(i);
354 buf.append(i).append(". ").append(session).append('\n');
355 List requests = session.getRequests();
356 List answers = session.getAnswers();
357 for (int j = 0; j < requests.size(); j++) {
358 DetachedRequest request = (DetachedRequest) requests.get(j);
359 buf.append('\t').append(j).append(". ").append(request).append(
360 '\n');
361 if (answers.size() > j) {
362 DetachedAnswer answer = (DetachedAnswer) answers.get(j);
363 buf.append('\t').append(j).append(". ").append(answer)
364 .append('\n');
365 }
366 }
367 }
368
369 buf.append("# DETACHED CONTEXT\n");
370 buf.append(detachedContext).append('\n');
371
372 buf.append("##\n## END OF SESSIONS HISTORY DUMP\n##\n");
373 return buf.toString();
374 }
375
376 protected synchronized final DetachedSession getPreviousSession() {
377 if (sessions.size() < 2)
378 return null;
379 else
380 return (DetachedSession) sessions.get(sessions.size() - 2);
381 }
382
383 public void setBundleContext(BundleContext bundleContext) {
384 this.bundleContext = bundleContext;
385 }
386
387 public void afterPropertiesSet() throws Exception {
388 log.info("Detached execution server initialized.");
389 }
390
391 public synchronized void destroy() throws Exception {
392 Iterator/* <String> */keys = appContextServiceTrackers.keySet()
393 .iterator();
394 while (keys.hasNext()) {
395 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
396 .get(keys.next());
397 st.close();
398 }
399 appContextServiceTrackers.clear();
400
401 log.info("Detached execution server closed.");
402 }
403
404 public void setApplicationContext(ApplicationContext applicationContext) {
405 this.applicationContext = applicationContext;
406 }
407
408 }