]> git.argeo.org Git - gpl/argeo-slc.git/blob - org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java
Make processes persistence switchable
[gpl/argeo-slc.git] / org.argeo.slc.core / src / org / argeo / slc / core / execution / http / RunnerServlet.java
1 package org.argeo.slc.core.execution.http;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.nio.channels.Channels;
8 import java.nio.charset.StandardCharsets;
9 import java.nio.file.Files;
10 import java.nio.file.Path;
11 import java.security.AccessControlContext;
12 import java.security.PrivilegedAction;
13 import java.util.Collection;
14 import java.util.GregorianCalendar;
15 import java.util.Map;
16 import java.util.UUID;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Future;
20
21 import javax.jcr.Node;
22 import javax.jcr.Property;
23 import javax.jcr.Repository;
24 import javax.jcr.RepositoryException;
25 import javax.jcr.Session;
26 import javax.jcr.nodetype.NodeType;
27 import javax.security.auth.Subject;
28 import javax.servlet.ServletException;
29 import javax.servlet.http.HttpServlet;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32
33 import org.apache.commons.io.FilenameUtils;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.argeo.cms.auth.CmsSession;
37 import org.argeo.jcr.JcrUtils;
38 import org.argeo.node.NodeUtils;
39 import org.argeo.slc.SlcException;
40 import org.argeo.slc.SlcNames;
41 import org.argeo.slc.SlcTypes;
42 import org.argeo.slc.execution.ExecutionProcess;
43 import org.osgi.framework.BundleContext;
44 import org.osgi.framework.InvalidSyntaxException;
45 import org.osgi.framework.ServiceReference;
46 import org.osgi.service.http.HttpContext;
47 import org.osgi.service.http.HttpService;
48 import org.osgi.util.tracker.ServiceTracker;
49
50 public class RunnerServlet extends HttpServlet {
51 private final static Log log = LogFactory.getLog(RunnerServlet.class);
52
53 private static final long serialVersionUID = -317016687309065291L;
54
55 private Path baseDir;
56 private BundleContext bc;
57 private ExecutorService executor;
58 private boolean persistProcesses = true;
59
60 public RunnerServlet(BundleContext bc, Path baseDir, ExecutorService executor) {
61 this.bc = bc;
62 this.baseDir = baseDir;
63 this.executor = executor;
64 }
65
66 protected void setPersistProcesses(boolean persistProcesses) {
67 this.persistProcesses = persistProcesses;
68 }
69
70 @Override
71 protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
72 super.service(req, resp);
73 }
74
75 @Override
76 protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
77 InputStream in;
78 // Deal with x-www-form-urlencoded
79 // FIXME make it more robust an generic
80 Map<String, String[]> params = req.getParameterMap();
81 if (params.size() != 0) {
82 String json = params.keySet().iterator().next();
83 in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
84 } else {
85 in = req.getInputStream();
86 }
87
88 // InputStream in = req.getInputStream();
89 // Gson gson = new Gson();
90 // JsonParser jsonParser = new JsonParser();
91 // BufferedReader reader = new BufferedReader(new InputStreamReader(in,
92 // Charset.forName("UTF-8")));
93 // JsonElement payload = jsonParser.parse(reader);
94 // String payloadStr = gson.toJson(payload);
95 //
96 // log.debug(payloadStr);
97 // if (true)
98 // return;
99
100 String path = req.getPathInfo();
101 // InputStream in = req.getInputStream();
102 OutputStream out = resp.getOutputStream();
103
104 String tokens[] = path.split("/");
105 // first token alway empty
106 String workgroup = tokens[1];
107
108 CmsSession cmsSession = getByLocalId(req.getSession().getId());
109 // FIXME make it more robust
110 if (cmsSession != null) {// multiuser
111 boolean authorized = false;
112 for (String role : cmsSession.getAuthorization().getRoles()) {
113 if (role.startsWith("cn=" + workgroup) || role.startsWith("uid=" + workgroup)) {
114 authorized = true;
115 break;
116 }
117 }
118 if (!authorized) {
119 resp.setStatus(403);
120 return;
121 }
122 }
123 // LdapName userDn = cmsSession.getUserDn();
124 AccessControlContext acc = (AccessControlContext) req.getAttribute(HttpContext.REMOTE_USER);
125 Subject subject = Subject.getSubject(acc);
126 // flow path
127 StringBuilder sb = new StringBuilder("");
128 for (int i = 2; i < tokens.length; i++) {
129 if (i != 2)
130 sb.append('/');
131 sb.append(tokens[i]);
132 }
133 String flowName = sb.toString();
134 String ext = FilenameUtils.getExtension(flowName.toString());
135
136 Session session = null;
137 Node realizedFlowNode = null;
138 if (persistProcesses) {
139 // JCR
140 Repository repository;
141 try {
142 ServiceReference<Repository> sr = bc.getServiceReferences(Repository.class, "(cn=home)").iterator()
143 .next();
144 repository = bc.getService(sr);
145
146 } catch (InvalidSyntaxException e2) {
147 throw new SlcException("Cannot find home repository", e2);
148 }
149 session = Subject.doAs(subject, new PrivilegedAction<Session>() {
150
151 @Override
152 public Session run() {
153 try {
154 return repository.login();
155 } catch (RepositoryException e) {
156 throw new RuntimeException("Cannot login", e);
157 }
158 }
159
160 });
161 UUID processUuid = UUID.randomUUID();
162 GregorianCalendar started = new GregorianCalendar();
163 Node groupHome = NodeUtils.getGroupHome(session, workgroup);
164 if (groupHome == null) {
165 groupHome = NodeUtils.getUserHome(session);
166 }
167 String processPath = SlcNames.SLC_SYSTEM + "/" + SlcNames.SLC_PROCESSES + "/"
168 + JcrUtils.dateAsPath(started, true) + processUuid;
169 Node processNode = JcrUtils.mkdirs(groupHome, processPath, SlcTypes.SLC_PROCESS);
170 try {
171 processNode.setProperty(SlcNames.SLC_UUID, processUuid.toString());
172 processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.RUNNING);
173 realizedFlowNode = processNode.addNode(SlcNames.SLC_FLOW);
174 realizedFlowNode.addMixin(SlcTypes.SLC_REALIZED_FLOW);
175 realizedFlowNode.setProperty(SlcNames.SLC_STARTED, started);
176 realizedFlowNode.setProperty(SlcNames.SLC_NAME, flowName);
177 Node addressNode = realizedFlowNode.addNode(SlcNames.SLC_ADDRESS, NodeType.NT_ADDRESS);
178 addressNode.setProperty(Property.JCR_PATH, flowName);
179 processNode.getSession().save();
180 } catch (RepositoryException e1) {
181 throw new SlcException("Cannot register SLC process", e1);
182 }
183 if (log.isTraceEnabled())
184 log.trace(session.getUserID() + " " + workgroup + " " + flowName);
185 }
186
187 try (ServiceChannel serviceChannel = new ServiceChannel(Channels.newChannel(in), Channels.newChannel(out),
188 executor)) {
189 resp.setHeader("Content-Type", "application/json");
190
191 Callable<Integer> task;
192 if (ext.equals("api")) {
193 String uri = Files.readAllLines(baseDir.resolve(flowName)).get(0);
194 task = new WebServiceTask(serviceChannel, uri);
195 } else {
196 task = createTask(serviceChannel, flowName);
197 }
198
199 if (task == null)
200 throw new SlcException("No task found for " + flowName);
201
202 // execute
203 Future<Integer> f = executor.submit(task);
204 int written = f.get();
205 if (log.isTraceEnabled())
206 log.trace("Written " + written + " bytes");
207 if (persistProcesses)
208 try {
209 Node processNode = realizedFlowNode.getParent();
210 processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.COMPLETED);
211 realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar());
212 processNode.getSession().save();
213 } catch (RepositoryException e1) {
214 throw new SlcException("Cannot update SLC process status", e1);
215 }
216 } catch (Exception e) {
217 if (persistProcesses)
218 try {
219 Node processNode = realizedFlowNode.getParent();
220 processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.ERROR);
221 realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar());
222 processNode.getSession().save();
223 } catch (RepositoryException e1) {
224 throw new SlcException("Cannot update SLC process status", e1);
225 }
226 throw new SlcException("Task " + flowName + " failed", e);
227 } finally {
228 JcrUtils.logoutQuietly(session);
229 }
230
231 // JsonElement answer = jsonParser.parse(answerStr);
232 // resp.setHeader("Content-Type", "application/json");
233 // JsonWriter jsonWriter = gson.newJsonWriter(resp.getWriter());
234 // jsonWriter.setIndent(" ");
235 // gson.toJson(answer, jsonWriter);
236 // jsonWriter.flush();
237 }
238
239 protected Callable<Integer> createTask(ServiceChannel serviceChannel, String flowName) {
240 return null;
241 }
242
243 protected Path getBaseDir() {
244 return baseDir;
245 }
246
247 protected HttpContext getHttpContext(String httpAuthrealm) {
248 return null;
249 }
250
251 public static void register(BundleContext bc, String alias, RunnerServlet runnerServlet, String httpAuthrealm) {
252 try {
253 ServiceTracker<HttpService, HttpService> serviceTracker = new ServiceTracker<HttpService, HttpService>(bc,
254 HttpService.class, null) {
255
256 @Override
257 public HttpService addingService(ServiceReference<HttpService> reference) {
258 // TODO Auto-generated method stub
259 HttpService httpService = super.addingService(reference);
260 try {
261 HttpContext httpContext = runnerServlet.getHttpContext(httpAuthrealm);
262 if (httpContext == null)
263 httpContext = new RunnerHttpContext(httpAuthrealm);
264 httpService.registerServlet(alias, runnerServlet, null, httpContext);
265 } catch (Exception e) {
266 throw new SlcException("Cannot register servlet", e);
267 }
268 return httpService;
269 }
270
271 };
272 // ServiceReference<HttpService> ref =
273 // bc.getServiceReference(HttpService.class);
274 // HttpService httpService = bc.getService(ref);
275 // httpService.registerServlet(alias, runnerServlet, null, null);
276 // bc.ungetService(ref);
277 serviceTracker.open();
278 } catch (Exception e) {
279 throw new SlcException("Cannot register servlet", e);
280 }
281 }
282
283 public static void unregister(BundleContext bc, String alias) {
284 try {
285 ServiceReference<HttpService> ref = bc.getServiceReference(HttpService.class);
286 if (ref == null)
287 return;
288 HttpService httpService = bc.getService(ref);
289 httpService.unregister(alias);
290 bc.ungetService(ref);
291 } catch (Exception e) {
292 throw new SlcException("Cannot unregister servlet", e);
293 }
294 }
295
296 CmsSession getByLocalId(String localId) {
297 // BundleContext bc =
298 // FrameworkUtil.getBundle(RunnerServlet.class).getBundleContext();
299 Collection<ServiceReference<CmsSession>> sr;
300 try {
301 sr = bc.getServiceReferences(CmsSession.class, "(" + CmsSession.SESSION_LOCAL_ID + "=" + localId + ")");
302 } catch (InvalidSyntaxException e) {
303 throw new SlcException("Cannot get CMS session for id " + localId, e);
304 }
305 ServiceReference<CmsSession> cmsSessionRef;
306 if (sr.size() == 1) {
307 cmsSessionRef = sr.iterator().next();
308 return (CmsSession) bc.getService(cmsSessionRef);
309 } else if (sr.size() == 0) {
310 return null;
311 } else
312 throw new SlcException(sr.size() + " CMS sessions registered for " + localId);
313
314 }
315
316 }