From d768c949136ad4bc95557aa4e706e22a3a392ae9 Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Sun, 25 Jun 2017 11:47:16 +0200 Subject: [PATCH] Introduce http runner --- org.argeo.slc.core/pom.xml | 5 + .../execution/http/RunnerHttpContext.java | 84 ++++++ .../core/execution/http/RunnerServlet.java | 265 ++++++++++++++++++ .../core/execution/http/ServiceChannel.java | 68 +++++ .../core/execution/http/WebServiceTask.java | 74 +++++ 5 files changed, 496 insertions(+) create mode 100644 org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java create mode 100644 org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java create mode 100644 org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java create mode 100644 org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java diff --git a/org.argeo.slc.core/pom.xml b/org.argeo.slc.core/pom.xml index 2b43cf242..d4445db19 100644 --- a/org.argeo.slc.core/pom.xml +++ b/org.argeo.slc.core/pom.xml @@ -31,6 +31,11 @@ org.argeo.node.api ${version.argeo-commons} + + org.argeo.commons + org.argeo.cms + ${version.argeo-commons} + org.argeo.commons org.argeo.jcr diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java new file mode 100644 index 000000000..6a67168de --- /dev/null +++ b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java @@ -0,0 +1,84 @@ +package org.argeo.slc.core.execution.http; + +import java.io.IOException; +import java.net.URL; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.cms.auth.HttpRequestCallbackHandler; +import org.argeo.node.NodeConstants; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.service.http.HttpContext; + +class RunnerHttpContext implements HttpContext { + final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate"; + private final static Log log = LogFactory.getLog(RunnerHttpContext.class); + + private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext(); + + private final String httpAuthRealm; + + public RunnerHttpContext(String httpAuthrealm) { + this.httpAuthRealm = httpAuthrealm; + } + + @Override + public boolean handleSecurity(final HttpServletRequest request, HttpServletResponse response) throws IOException { + LoginContext lc; + try { + lc = new LoginContext(NodeConstants.LOGIN_CONTEXT_USER, new HttpRequestCallbackHandler(request, response)); + lc.login(); + } catch (LoginException e) { + lc = processUnauthorized(request, response); + if (lc == null) + return false; + } + Subject.doAs(lc.getSubject(), new PrivilegedAction() { + + @Override + public Void run() { + request.setAttribute(REMOTE_USER, AccessController.getContext()); + return null; + } + + }); + + return true; + } + + @Override + public URL getResource(String name) { + return bc.getBundle().getResource(name); + } + + @Override + public String getMimeType(String name) { + return null; + } + + protected LoginContext processUnauthorized(HttpServletRequest request, HttpServletResponse response) { + askForWwwAuth(request, response); + return null; + } + + protected void askForWwwAuth(HttpServletRequest request, HttpServletResponse response) { + response.setStatus(401); + // if (org.argeo.cms.internal.kernel.Activator.getAcceptorCredentials() + // != null && !forceBasic)// SPNEGO + // response.setHeader(HttpUtils.HEADER_WWW_AUTHENTICATE, "Negotiate"); + // else + response.setHeader(HEADER_WWW_AUTHENTICATE, "Basic realm=\"" + httpAuthRealm + "\""); + + } + +} diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java new file mode 100644 index 000000000..cebf4d82a --- /dev/null +++ b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java @@ -0,0 +1,265 @@ +package org.argeo.slc.core.execution.http; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.AccessControlContext; +import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.GregorianCalendar; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.jcr.Node; +import javax.jcr.Property; +import javax.jcr.Repository; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.nodetype.NodeType; +import javax.naming.ldap.LdapName; +import javax.security.auth.Subject; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.cms.auth.CmsSession; +import org.argeo.jcr.JcrUtils; +import org.argeo.node.NodeUtils; +import org.argeo.slc.SlcException; +import org.argeo.slc.execution.ExecutionProcess; +import org.argeo.slc.jcr.SlcNames; +import org.argeo.slc.jcr.SlcTypes; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.http.HttpContext; +import org.osgi.service.http.HttpService; +import org.osgi.util.tracker.ServiceTracker; + +public class RunnerServlet extends HttpServlet { + private final static Log log = LogFactory.getLog(RunnerServlet.class); + + private static final long serialVersionUID = -317016687309065291L; + + private Path baseDir; + private BundleContext bc; + private ExecutorService executor; + + public RunnerServlet(BundleContext bc, Path baseDir) { + this.bc = bc; + this.baseDir = baseDir; + this.executor = Executors.newFixedThreadPool(20); + } + + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + super.service(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + // InputStream in = req.getInputStream(); + // Gson gson = new Gson(); + // JsonParser jsonParser = new JsonParser(); + // BufferedReader reader = new BufferedReader(new InputStreamReader(in, + // Charset.forName("UTF-8"))); + // JsonElement payload = jsonParser.parse(reader); + // String payloadStr = gson.toJson(payload); + String path = req.getPathInfo(); + InputStream in = req.getInputStream(); + OutputStream out = resp.getOutputStream(); + + String tokens[] = path.split("/"); + // first token alway empty + String workgroup = tokens[1]; + + CmsSession cmsSession = getByLocalId(req.getSession().getId()); + + boolean authorized = false; + for (String role : cmsSession.getAuthorization().getRoles()) { + if (role.startsWith("cn=" + workgroup)) { + authorized = true; + break; + } + } + if (!authorized) { + resp.setStatus(403); + return; + } + LdapName userDn = cmsSession.getUserDn(); + AccessControlContext acc = (AccessControlContext) req.getAttribute(HttpContext.REMOTE_USER); + Subject subject = Subject.getSubject(acc); + // flow path + StringBuilder sb = new StringBuilder(""); + for (int i = 2; i < tokens.length; i++) { + sb.append('/').append(tokens[i]); + } + String flowPath = sb.toString(); + String ext = FilenameUtils.getExtension(flowPath.toString()); + + // JCR + Repository repository = bc.getService(bc.getServiceReference(Repository.class)); + Session session = Subject.doAs(subject, new PrivilegedAction() { + + @Override + public Session run() { + try { + return repository.login(); + } catch (RepositoryException e) { + throw new RuntimeException("Cannot login", e); + } + } + + }); + UUID processUuid = UUID.randomUUID(); + GregorianCalendar started = new GregorianCalendar(); + Node groupHome = NodeUtils.getGroupHome(session, workgroup); + String processPath = SlcNames.SLC_SYSTEM + "/" + SlcNames.SLC_PROCESSES + "/" + + JcrUtils.dateAsPath(started, true) + processUuid; + Node processNode = JcrUtils.mkdirs(groupHome, processPath, SlcTypes.SLC_PROCESS); + Node realizedFlowNode; + try { + processNode.setProperty(SlcNames.SLC_UUID, processUuid.toString()); + processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.RUNNING); + realizedFlowNode = processNode.addNode(SlcNames.SLC_FLOW); + realizedFlowNode.addMixin(SlcTypes.SLC_REALIZED_FLOW); + realizedFlowNode.setProperty(SlcNames.SLC_STARTED, started); + Node addressNode = realizedFlowNode.addNode(SlcNames.SLC_ADDRESS, NodeType.NT_ADDRESS); + addressNode.setProperty(Property.JCR_PATH, flowPath); + processNode.getSession().save(); + } catch (RepositoryException e1) { + throw new SlcException("Cannot register SLC process", e1); + } + + if (log.isDebugEnabled()) + log.debug(userDn + " " + workgroup + " " + flowPath); + + try { + resp.setHeader("Content-Type", "application/json"); + ServiceChannel serviceChannel = new ServiceChannel(Channels.newChannel(in), Channels.newChannel(out), + executor); + Callable task; + if (ext.equals("api")) { + String uri = Files.readAllLines(baseDir.resolve(flowPath.substring(1))).get(0); + task = new WebServiceTask(serviceChannel, uri); + } else { + task = createTask(serviceChannel, flowPath); + } + + if (task == null) + throw new SlcException("No task found for " + flowPath); + + // execute + Future f = executor.submit(task); + int written = f.get(); + if (log.isTraceEnabled()) + log.trace("Written " + written + " bytes"); + try { + processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.COMPLETED); + realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar()); + processNode.getSession().save(); + } catch (RepositoryException e1) { + throw new SlcException("Cannot update SLC process status", e1); + } + } catch (Exception e) { + try { + processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.ERROR); + realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar()); + processNode.getSession().save(); + } catch (RepositoryException e1) { + throw new SlcException("Cannot update SLC process status", e1); + } + throw new SlcException("Task " + flowPath + " failed", e); + } finally { + JcrUtils.logoutQuietly(session); + } + + // JsonElement answer = jsonParser.parse(answerStr); + // resp.setHeader("Content-Type", "application/json"); + // JsonWriter jsonWriter = gson.newJsonWriter(resp.getWriter()); + // jsonWriter.setIndent(" "); + // gson.toJson(answer, jsonWriter); + // jsonWriter.flush(); + } + + protected Callable createTask(ServiceChannel serviceChannel, String flowPath) { + return null; + } + + protected Path getBaseDir() { + return baseDir; + } + + public static void register(BundleContext bc, String alias, RunnerServlet runnerServlet, String httpAuthrealm) { + try { + ServiceTracker serviceTracker = new ServiceTracker(bc, + HttpService.class, null) { + + @Override + public HttpService addingService(ServiceReference reference) { + // TODO Auto-generated method stub + HttpService httpService = super.addingService(reference); + try { + httpService.registerServlet(alias, runnerServlet, null, new RunnerHttpContext(httpAuthrealm)); + } catch (Exception e) { + throw new SlcException("Cannot register servlet", e); + } + return httpService; + } + + }; + // ServiceReference ref = + // bc.getServiceReference(HttpService.class); + // HttpService httpService = bc.getService(ref); + // httpService.registerServlet(alias, runnerServlet, null, null); + // bc.ungetService(ref); + serviceTracker.open(); + } catch (Exception e) { + throw new SlcException("Cannot register servlet", e); + } + } + + public static void unregister(BundleContext bc, String alias) { + try { + ServiceReference ref = bc.getServiceReference(HttpService.class); + if (ref == null) + return; + HttpService httpService = bc.getService(ref); + httpService.unregister(alias); + bc.ungetService(ref); + } catch (Exception e) { + throw new SlcException("Cannot unregister servlet", e); + } + } + + CmsSession getByLocalId(String localId) { + // BundleContext bc = + // FrameworkUtil.getBundle(RunnerServlet.class).getBundleContext(); + Collection> sr; + try { + sr = bc.getServiceReferences(CmsSession.class, "(" + CmsSession.SESSION_LOCAL_ID + "=" + localId + ")"); + } catch (InvalidSyntaxException e) { + throw new SlcException("Cannot get CMS session for id " + localId, e); + } + ServiceReference cmsSessionRef; + if (sr.size() == 1) { + cmsSessionRef = sr.iterator().next(); + return (CmsSession) bc.getService(cmsSessionRef); + } else if (sr.size() == 0) { + return null; + } else + throw new SlcException(sr.size() + " CMS sessions registered for " + localId); + + } + +} diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java new file mode 100644 index 000000000..3e6a1ab89 --- /dev/null +++ b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java @@ -0,0 +1,68 @@ +package org.argeo.slc.core.execution.http; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousByteChannel; +import java.nio.channels.CompletionHandler; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class ServiceChannel implements AsynchronousByteChannel { + private final ReadableByteChannel in; + private final WritableByteChannel out; + + private boolean open = true; + + private ExecutorService executor; + + public ServiceChannel(ReadableByteChannel in, WritableByteChannel out, ExecutorService executor) { + this.in = in; + this.out = out; + this.executor = executor; + } + + @Override + public Future read(ByteBuffer dst) { + return executor.submit(() -> in.read(dst)); + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + Future res = read(dst); + handler.completed(res.get(), attachment); + } catch (Exception e) { + handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + return executor.submit(() -> out.write(src)); + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + Future res = write(src); + handler.completed(res.get(), attachment); + } catch (Exception e) { + handler.failed(e, attachment); + } + } + + @Override + /** NB: it does not close the underlying channels. */ + public void close() throws IOException { + open = false; + notifyAll(); + } + + @Override + public boolean isOpen() { + return open; + } + +} diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java new file mode 100644 index 000000000..b94223294 --- /dev/null +++ b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java @@ -0,0 +1,74 @@ +package org.argeo.slc.core.execution.http; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousByteChannel; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.InputStreamRequestEntity; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.RequestEntity; + +public class WebServiceTask implements Callable { + private String url; + private String requContentType; + private String respContentType; + + private AsynchronousByteChannel channel; + + public WebServiceTask(AsynchronousByteChannel channel, String url) { + this(url, "application/json", "application/json", channel); + } + + public WebServiceTask(String url, String requContentType, String respContentType, AsynchronousByteChannel channel) { + this.url = url; + this.requContentType = requContentType; + this.respContentType = respContentType; + this.channel = channel; + } + + @Override + public Integer call() throws Exception { + // Webservice + HttpClient httpClient = new HttpClient(); + PostMethod postMethod = new PostMethod(url); + InputStream in = Channels.newInputStream(channel); + RequestEntity requestEntity = new InputStreamRequestEntity(in, requContentType); + // StringRequestEntity requestEntity = new + // StringRequestEntity(payloadStr, "application/json", "UTF-8"); + postMethod.setRequestEntity(requestEntity); + httpClient.executeMethod(postMethod); + InputStream answerIn = postMethod.getResponseBodyAsStream(); + ReadableByteChannel answer = Channels.newChannel(answerIn); + ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); + int read = 0; + Integer writeRes = 0; + while (read != -1) { + read = answer.read(buffer); + if (read <= 0) + break; + buffer.flip(); + Future f = channel.write(buffer); + writeRes = writeRes + f.get(); + buffer.clear(); + } + return writeRes; + } + + public String getUrl() { + return url; + } + + public String getRequContentType() { + return requContentType; + } + + public String getRespContentType() { + return respContentType; + } + +} -- 2.39.2