]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.detached/src/main/java/org/argeo/slc/detached/DetachedExecutionServerImpl.java
Update licence headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.detached / src / main / java / org / argeo / slc / detached / DetachedExecutionServerImpl.java
1 /*
2 * Copyright (C) 2007-2012 Argeo GmbH
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.detached;
17
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.detached.admin.CloseSession;
27 import org.argeo.slc.detached.admin.OpenSession;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.Constants;
30 import org.osgi.util.tracker.ServiceTracker;
31 import org.springframework.beans.factory.DisposableBean;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.context.ApplicationContext;
34 import org.springframework.context.ApplicationContextAware;
35 import org.springframework.osgi.context.BundleContextAware;
36
37 /** Default implementation of a detached server. */
38 public class DetachedExecutionServerImpl implements DetachedExecutionServer,
39 BundleContextAware, InitializingBean, DisposableBean,
40 ApplicationContextAware {
41 private final static Log log = LogFactory
42 .getLog(DetachedExecutionServerImpl.class);
43
44 private final DetachedContextImpl detachedContext;
45
46 private DetachedSession currentSession;
47
48 /**
49 * Session being replayed, skipping the steps in the current session. If
50 * null, no session is replayed
51 */
52 private DetachedSession replayedSession = null;
53
54 private BundleContext bundleContext;
55 private ApplicationContext applicationContext;
56
57 private final static String ALL_APP_CONTEXTS_KEY = "__allApplicationContexts";
58
59 private Map/* <String,ServiceTracker> */appContextServiceTrackers = Collections
60 .synchronizedMap(new HashMap());
61
62 public DetachedExecutionServerImpl() {
63 detachedContext = new DetachedContextImpl();
64 currentSession = new DetachedSession();
65 currentSession.setUuid(Long.toString(System.currentTimeMillis()));
66 }
67
68 public synchronized DetachedAnswer executeRequest(DetachedRequest request) {
69 if(log.isDebugEnabled())
70 log.debug("Received " + request);
71
72 DetachedAnswer answer = null;
73 try {
74 Object obj = retrieveStep(request);
75
76 if (obj == null)
77 throw new DetachedException("Could not find action with ref "
78 + request.getRef());
79
80 // Execute actions
81 if (obj instanceof DetachedStep) {
82 answer = processStep((DetachedStep) obj, request);
83
84 } else if (obj instanceof DetachedAdminCommand) {
85 answer = processAdminCommand((DetachedAdminCommand) obj,
86 request);
87 }
88
89 if (answer == null) {
90 throw new DetachedException("Unknown action type "
91 + obj.getClass() + " for action with ref "
92 + request.getRef());
93 }
94 } catch (Exception e) {
95 answer = new DetachedAnswer(request);
96 answer.setStatus(DetachedAnswer.ERROR);
97 log.error("Error executing request " + request, e);
98 }
99
100 currentSession.getRequests().add(request);
101 currentSession.getAnswers().add(answer);
102 if(log.isDebugEnabled())
103 log.debug("Sent " + answer);
104 return answer;
105 }
106
107 protected synchronized Object retrieveStep(DetachedRequest request)
108 throws Exception {
109
110 // Check whether there is a cached object
111 if (request.getCachedObject() != null) {
112 Object cachedObj = request.getCachedObject();
113 if (log.isTraceEnabled())
114 log.trace("Use cached object " + cachedObj + " for request "
115 + request);
116 return cachedObj;
117 }
118
119 // Check its own app context (typically for admin steps)
120 if (applicationContext.containsBean(request.getRef())) {
121 try {
122 Object obj = applicationContext.getBean(request.getRef());
123 if (log.isTraceEnabled())
124 log.trace("Retrieve from server app context " + obj
125 + " for request " + request);
126 return obj;
127 } catch (Exception e) {
128 if (log.isTraceEnabled())
129 log.trace("Could not retrieve " + request.getRef()
130 + " from server app context: " + e);
131 }
132 }
133
134 // Check whether the source bundle is set
135 String bundleName = request.getProperties().getProperty(
136 Constants.BUNDLE_SYMBOLICNAME);
137
138 ApplicationContext sourceAppContext = null;
139 if (bundleName != null) {
140 if (!appContextServiceTrackers.containsKey(bundleName)) {
141 ServiceTracker nSt = new ServiceTracker(bundleContext,
142 bundleContext.createFilter("(Bundle-SymbolicName="
143 + bundleName + ")"), null);
144 nSt.open();
145 appContextServiceTrackers.put(bundleName, nSt);
146 }
147 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
148 .get(bundleName);
149 sourceAppContext = (ApplicationContext) st.getService();
150 if (log.isTraceEnabled())
151 log.trace("Use source application context from bundle "
152 + bundleName);
153
154 Object obj = null;
155 try {
156 obj = sourceAppContext.getBean(request.getRef());
157 } catch (Exception e) {
158 if (log.isTraceEnabled())
159 log.trace("Could not retrieve " + request.getRef()
160 + " from app context of " + bundleName + ": " + e);
161 }
162 return obj;
163 }
164
165 // no bundle name specified or it failed
166 if (!appContextServiceTrackers.containsKey(ALL_APP_CONTEXTS_KEY)) {
167 ServiceTracker nSt = new ServiceTracker(bundleContext,
168 ApplicationContext.class.getName(), null);
169 nSt.open();
170 appContextServiceTrackers.put(ALL_APP_CONTEXTS_KEY, nSt);
171 }
172 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
173 .get(ALL_APP_CONTEXTS_KEY);
174 Object[] arr = st.getServices();
175 for (int i = 0; i < arr.length; i++) {
176 ApplicationContext appC = (ApplicationContext) arr[i];
177 if (appC.containsBean(request.getRef())) {
178 sourceAppContext = appC;
179 if (log.isTraceEnabled())
180 log
181 .trace("Retrieved source application context "
182 + "by scanning all published application contexts.");
183 try {
184 Object obj = sourceAppContext.getBean(request.getRef());
185 return obj;
186 } catch (Exception e) {
187 if (log.isTraceEnabled())
188 log.trace("Could not retrieve " + request.getRef()
189 + " from app context " + appC + ": " + e);
190 }
191 }
192 }
193
194 throw new Exception(
195 "Cannot find any published application context containing bean "
196 + request.getRef());
197 }
198
199 protected synchronized DetachedAnswer processStep(DetachedStep obj,
200 DetachedRequest request) {
201 DetachedAnswer answer;
202
203 StringBuffer skippedLog = new StringBuffer();
204 boolean execute = true;
205
206 if (replayedSession != null) {
207 // Skip execution of already successful steps
208 int stepIndex = currentSession.getExecutedStepCount();
209
210 if (stepIndex < replayedSession.getExecutedStepCount()) {
211 DetachedAnswer previousAnswer = (DetachedAnswer) replayedSession
212 .getAnswers().get(stepIndex);
213 DetachedRequest previousRequest = (DetachedRequest) replayedSession
214 .getRequests().get(stepIndex);
215
216 // check step names
217 if (!previousRequest.getRef().equals(request.getRef())) {
218 String msg = "New request is not consistent with previous ref. previousRef="
219 + previousRequest.getRef()
220 + ", newRef="
221 + request.getRef() + "\n";
222 skippedLog.append(msg);
223 log.warn(msg);
224 }
225
226 if (previousAnswer.getStatus() != DetachedAnswer.ERROR) {
227 // if no error occurred in the replayedSession,
228 // skip the step
229 execute = false;
230 String msg = "Skipped Step " + request.getRef()
231 + " (stepIndex=" + stepIndex + ")";
232 skippedLog.append(msg);
233 log.info(msg);
234
235 } else {
236 // if an error occurred, execute the step and leave
237 // skipUntillError mode (even if replayedSession
238 // has more steps)
239 log.info("### End of SkipUntilError Mode ###");
240 log.info("Step " + request.getRef()
241 + " was previously in error, executing it again."
242 + " (stepIndex=" + stepIndex + ").");
243 replayedSession = null;
244 }
245 } else {
246 // went further as skip count, doing nothing.
247 }
248 }
249
250 if (execute) {
251 DetachedStep step = (DetachedStep) obj;
252 // Actually execute the step
253 answer = step.execute(detachedContext, request);
254 } else {
255 answer = new DetachedAnswer(request);
256 answer.setStatus(DetachedAnswer.SKIPPED);
257 answer.setLog(skippedLog.toString());
258 }
259 return answer;
260 }
261
262 protected synchronized DetachedAnswer processAdminCommand(
263 DetachedAdminCommand obj, DetachedRequest request) {
264 DetachedAnswer answer;
265 if (obj instanceof OpenSession) {
266 DetachedSession newSession = ((OpenSession) obj).execute(request,
267 bundleContext);
268
269 log.debug("Creating new DetachedSession : " + newSession);
270
271 if ((currentSession != null) && currentSession.lastActionIsError()
272 && DetachedSession.SKIP_UNTIL_ERROR.equals(newSession.getDoItAgainPolicy())) {
273 // switch to replay mode
274 log.info("### Start SkipUntilError Mode ###");
275 replayedSession = currentSession;
276 }
277
278 currentSession = newSession;
279
280 answer = new DetachedAnswer(request, "Session #"
281 + currentSession.getUuid() + " open.");
282 } else if (obj instanceof CloseSession) {
283 if (currentSession == null)
284 throw new DetachedException("There is no open session to close");
285 answer = new DetachedAnswer(request, "Session #"
286 + currentSession.getUuid() + " closed.");
287 answer.setStatus(DetachedAnswer.CLOSED_SESSION);
288 } else {
289 answer = null;
290 }
291 return answer;
292 }
293
294 protected synchronized String dumpSessionsHistory(
295 DetachedRequest requestCurrent, DetachedAnswer answerCurrent) {
296 StringBuffer buf = new StringBuffer(
297 "##\n## SESSIONS HISTORY DUMP\n##\n");
298 buf.append("# CURRENT\n");
299 buf.append("Current session: ").append(currentSession)
300 .append('\n');
301 buf.append("Current request: ").append(requestCurrent).append('\n');
302 buf.append("Current answer: ").append(answerCurrent).append('\n');
303
304 buf.append("# CURRENT SESSION\n");
305
306 List requests = currentSession.getRequests();
307 List answers = currentSession.getAnswers();
308 for (int j = 0; j < requests.size(); j++) {
309 DetachedRequest request = (DetachedRequest) requests.get(j);
310 buf.append('\t').append(j).append(". ").append(request)
311 .append('\n');
312 if (answers.size() > j) {
313 DetachedAnswer answer = (DetachedAnswer) answers.get(j);
314 buf.append('\t').append(j).append(". ").append(answer).append(
315 '\n');
316 }
317 }
318
319 buf.append("# DETACHED CONTEXT\n");
320 buf.append(detachedContext).append('\n');
321
322 buf.append("##\n## END OF SESSIONS HISTORY DUMP\n##\n");
323 return buf.toString();
324 }
325
326 public void setBundleContext(BundleContext bundleContext) {
327 this.bundleContext = bundleContext;
328 }
329
330 public void afterPropertiesSet() throws Exception {
331 log.debug("Detached execution server initialized.");
332 }
333
334 public synchronized void destroy() throws Exception {
335 Iterator/* <String> */keys = appContextServiceTrackers.keySet()
336 .iterator();
337 while (keys.hasNext()) {
338 ServiceTracker st = (ServiceTracker) appContextServiceTrackers
339 .get(keys.next());
340 st.close();
341 }
342 appContextServiceTrackers.clear();
343
344 log.debug("Detached execution server closed.");
345 }
346
347 public void setApplicationContext(ApplicationContext applicationContext) {
348 this.applicationContext = applicationContext;
349 }
350
351 }