1 package org
.argeo
.slc
.core
.execution
.http
;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.nodetype.NodeType;
import javax.security.auth.Subject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.argeo.cms.auth.CmsSession;
import org.argeo.jcr.JcrUtils;
import org.argeo.node.NodeUtils;
import org.argeo.slc.SlcException;
import org.argeo.slc.SlcNames;
import org.argeo.slc.SlcTypes;
import org.argeo.slc.execution.ExecutionProcess;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.http.HttpContext;
import org.osgi.service.http.HttpService;
import org.osgi.util.tracker.ServiceTracker;

50 public class RunnerServlet
extends HttpServlet
{
51 private final static Log log
= LogFactory
.getLog(RunnerServlet
.class);
53 private static final long serialVersionUID
= -317016687309065291L;
56 private BundleContext bc
;
57 private ExecutorService executor
;
58 private boolean persistProcesses
= true;
/**
 * Creates a runner servlet.
 *
 * @param bc
 *            the bundle context used for service lookups
 * @param baseDir
 *            the directory against which flow names are resolved
 * @param executor
 *            the executor to which flow tasks are submitted
 */
public RunnerServlet(BundleContext bc, Path baseDir, ExecutorService executor) {
	// bc must be stored: doPost() and getByLocalId() dereference this.bc
	this.bc = bc;
	this.baseDir = baseDir;
	this.executor = executor;
}

66 protected void setPersistProcesses(boolean persistProcesses
) {
67 this.persistProcesses
= persistProcesses
;
71 protected void service(HttpServletRequest req
, HttpServletResponse resp
) throws ServletException
, IOException
{
72 super.service(req
, resp
);
76 protected void doPost(HttpServletRequest req
, HttpServletResponse resp
) throws ServletException
, IOException
{
78 // Deal with x-www-form-urlencoded
79 // FIXME make it more robust an generic
80 Map
<String
, String
[]> params
= req
.getParameterMap();
81 if (params
.size() != 0) {
82 String json
= params
.keySet().iterator().next();
83 in
= new ByteArrayInputStream(json
.getBytes(StandardCharsets
.UTF_8
));
85 in
= req
.getInputStream();
88 String path
= req
.getPathInfo();
89 // InputStream in = req.getInputStream();
90 OutputStream out
= resp
.getOutputStream();
92 String tokens
[] = path
.split("/");
93 // first token alway empty
94 String workgroup
= tokens
[1];
96 CmsSession cmsSession
= getByLocalId(req
.getSession().getId());
97 // FIXME make it more robust
98 if (cmsSession
!= null) {// multiuser
99 boolean authorized
= false;
100 for (String role
: cmsSession
.getAuthorization().getRoles()) {
101 if (role
.startsWith("cn=" + workgroup
) || role
.startsWith("uid=" + workgroup
)) {
111 // LdapName userDn = cmsSession.getUserDn();
112 AccessControlContext acc
= (AccessControlContext
) req
.getAttribute(HttpContext
.REMOTE_USER
);
113 Subject subject
= Subject
.getSubject(acc
);
115 StringBuilder sb
= new StringBuilder("");
116 for (int i
= 2; i
< tokens
.length
; i
++) {
119 sb
.append(tokens
[i
]);
121 String flowName
= sb
.toString();
122 String ext
= FilenameUtils
.getExtension(flowName
.toString());
124 Session session
= null;
125 Node realizedFlowNode
= null;
126 if (persistProcesses
) {
128 Repository repository
;
130 ServiceReference
<Repository
> sr
= bc
.getServiceReferences(Repository
.class, "(cn=home)").iterator()
132 repository
= bc
.getService(sr
);
134 } catch (InvalidSyntaxException e2
) {
135 throw new SlcException("Cannot find home repository", e2
);
137 session
= Subject
.doAs(subject
, new PrivilegedAction
<Session
>() {
140 public Session
run() {
142 return repository
.login();
143 } catch (RepositoryException e
) {
144 throw new RuntimeException("Cannot login", e
);
149 UUID processUuid
= UUID
.randomUUID();
150 GregorianCalendar started
= new GregorianCalendar();
151 Node groupHome
= NodeUtils
.getGroupHome(session
, workgroup
);
152 if (groupHome
== null) {
153 groupHome
= NodeUtils
.getUserHome(session
);
155 String processPath
= SlcNames
.SLC_SYSTEM
+ "/" + SlcNames
.SLC_PROCESSES
+ "/"
156 + JcrUtils
.dateAsPath(started
, true) + processUuid
;
157 Node processNode
= JcrUtils
.mkdirs(groupHome
, processPath
, SlcTypes
.SLC_PROCESS
);
159 processNode
.setProperty(SlcNames
.SLC_UUID
, processUuid
.toString());
160 processNode
.setProperty(SlcNames
.SLC_STATUS
, ExecutionProcess
.RUNNING
);
161 realizedFlowNode
= processNode
.addNode(SlcNames
.SLC_FLOW
);
162 realizedFlowNode
.addMixin(SlcTypes
.SLC_REALIZED_FLOW
);
163 realizedFlowNode
.setProperty(SlcNames
.SLC_STARTED
, started
);
164 realizedFlowNode
.setProperty(SlcNames
.SLC_NAME
, flowName
);
165 Node addressNode
= realizedFlowNode
.addNode(SlcNames
.SLC_ADDRESS
, NodeType
.NT_ADDRESS
);
166 addressNode
.setProperty(Property
.JCR_PATH
, flowName
);
167 processNode
.getSession().save();
168 } catch (RepositoryException e1
) {
169 throw new SlcException("Cannot register SLC process", e1
);
171 if (log
.isTraceEnabled())
172 log
.trace(session
.getUserID() + " " + workgroup
+ " " + flowName
);
175 boolean isFailed
= false;
176 try (ServiceChannel serviceChannel
= new ServiceChannel(Channels
.newChannel(in
), Channels
.newChannel(out
),
178 resp
.setHeader("Content-Type", "application/json");
180 Callable
<Integer
> task
;
181 if (ext
.equals("api")) {
182 String uri
= Files
.readAllLines(baseDir
.resolve(flowName
)).get(0);
183 task
= new WebServiceTask(serviceChannel
, uri
);
185 task
= createTask(serviceChannel
, flowName
);
189 throw new SlcException("No task found for " + flowName
);
192 Future
<Integer
> f
= executor
.submit(task
);
193 int written
= f
.get();
195 if (written
< 0) {// error
199 if (log
.isTraceEnabled())
200 log
.trace("Written " + written
+ " bytes");
201 if (persistProcesses
)
203 Node processNode
= realizedFlowNode
.getParent();
204 processNode
.setProperty(SlcNames
.SLC_STATUS
,
205 isFailed ? ExecutionProcess
.ERROR
: ExecutionProcess
.COMPLETED
);
206 realizedFlowNode
.setProperty(SlcNames
.SLC_COMPLETED
, new GregorianCalendar());
207 processNode
.getSession().save();
208 } catch (RepositoryException e1
) {
209 throw new SlcException("Cannot update SLC process status", e1
);
211 } catch (Exception e
) {
212 if (persistProcesses
)
214 Node processNode
= realizedFlowNode
.getParent();
215 processNode
.setProperty(SlcNames
.SLC_STATUS
, ExecutionProcess
.ERROR
);
216 realizedFlowNode
.setProperty(SlcNames
.SLC_COMPLETED
, new GregorianCalendar());
217 processNode
.getSession().save();
218 } catch (RepositoryException e1
) {
219 throw new SlcException("Cannot update SLC process status", e1
);
221 // throw new SlcException("Task " + flowName + " failed", e);
224 JcrUtils
.logoutQuietly(session
);
228 resp
.setStatus(HttpServletResponse
.SC_INTERNAL_SERVER_ERROR
);
232 protected Callable
<Integer
> createTask(ServiceChannel serviceChannel
, String flowName
) {
236 protected Path
getBaseDir() {
240 protected HttpContext
getHttpContext(String httpAuthrealm
) {
244 public static void register(BundleContext bc
, String alias
, RunnerServlet runnerServlet
, String httpAuthrealm
) {
246 ServiceTracker
<HttpService
, HttpService
> serviceTracker
= new ServiceTracker
<HttpService
, HttpService
>(bc
,
247 HttpService
.class, null) {
250 public HttpService
addingService(ServiceReference
<HttpService
> reference
) {
251 // TODO Auto-generated method stub
252 HttpService httpService
= super.addingService(reference
);
254 HttpContext httpContext
= runnerServlet
.getHttpContext(httpAuthrealm
);
255 if (httpContext
== null)
256 httpContext
= new RunnerHttpContext(httpAuthrealm
);
257 httpService
.registerServlet(alias
, runnerServlet
, null, httpContext
);
258 } catch (Exception e
) {
259 throw new SlcException("Cannot register servlet", e
);
265 // ServiceReference<HttpService> ref =
266 // bc.getServiceReference(HttpService.class);
267 // HttpService httpService = bc.getService(ref);
268 // httpService.registerServlet(alias, runnerServlet, null, null);
269 // bc.ungetService(ref);
270 serviceTracker
.open();
271 } catch (Exception e
) {
272 throw new SlcException("Cannot register servlet", e
);
276 public static void unregister(BundleContext bc
, String alias
) {
278 ServiceReference
<HttpService
> ref
= bc
.getServiceReference(HttpService
.class);
281 HttpService httpService
= bc
.getService(ref
);
282 httpService
.unregister(alias
);
283 bc
.ungetService(ref
);
284 } catch (Exception e
) {
285 throw new SlcException("Cannot unregister servlet", e
);
289 CmsSession
getByLocalId(String localId
) {
290 // BundleContext bc =
291 // FrameworkUtil.getBundle(RunnerServlet.class).getBundleContext();
292 Collection
<ServiceReference
<CmsSession
>> sr
;
294 sr
= bc
.getServiceReferences(CmsSession
.class, "(" + CmsSession
.SESSION_LOCAL_ID
+ "=" + localId
+ ")");
295 } catch (InvalidSyntaxException e
) {
296 throw new SlcException("Cannot get CMS session for id " + localId
, e
);
298 ServiceReference
<CmsSession
> cmsSessionRef
;
299 if (sr
.size() == 1) {
300 cmsSessionRef
= sr
.iterator().next();
301 return (CmsSession
) bc
.getService(cmsSessionRef
);
302 } else if (sr
.size() == 0) {
305 throw new SlcException(sr
.size() + " CMS sessions registered for " + localId
);
309 protected ExecutorService
getExecutor() {