Introduce http runner
authorMathieu Baudier <mbaudier@argeo.org>
Sun, 25 Jun 2017 09:47:16 +0000 (11:47 +0200)
committerMathieu Baudier <mbaudier@argeo.org>
Sun, 25 Jun 2017 09:47:16 +0000 (11:47 +0200)
org.argeo.slc.core/pom.xml
org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java [new file with mode: 0644]
org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java [new file with mode: 0644]
org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java [new file with mode: 0644]
org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java [new file with mode: 0644]

index 2b43cf24291d589aa922485d14a5bdaf91e796df..d4445db19078bf024ce06391c713b223d573f2c9 100644 (file)
                        <artifactId>org.argeo.node.api</artifactId>
                        <version>${version.argeo-commons}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.argeo.commons</groupId>
+                       <artifactId>org.argeo.cms</artifactId>
+                       <version>${version.argeo-commons}</version>
+               </dependency>
                <dependency>
                        <groupId>org.argeo.commons</groupId>
                        <artifactId>org.argeo.jcr</artifactId>
diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerHttpContext.java
new file mode 100644 (file)
index 0000000..6a67168
--- /dev/null
@@ -0,0 +1,84 @@
+package org.argeo.slc.core.execution.http;
+
+import java.io.IOException;
+import java.net.URL;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.cms.auth.HttpRequestCallbackHandler;
+import org.argeo.node.NodeConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.service.http.HttpContext;
+
+class RunnerHttpContext implements HttpContext {
+       final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate";
+       private final static Log log = LogFactory.getLog(RunnerHttpContext.class);
+
+       private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext();
+
+       private final String httpAuthRealm;
+
+       public RunnerHttpContext(String httpAuthrealm) {
+               this.httpAuthRealm = httpAuthrealm;
+       }
+
+       @Override
+       public boolean handleSecurity(final HttpServletRequest request, HttpServletResponse response) throws IOException {
+               LoginContext lc;
+               try {
+                       lc = new LoginContext(NodeConstants.LOGIN_CONTEXT_USER, new HttpRequestCallbackHandler(request, response));
+                       lc.login();
+               } catch (LoginException e) {
+                       lc = processUnauthorized(request, response);
+                       if (lc == null)
+                               return false;
+               }
+               Subject.doAs(lc.getSubject(), new PrivilegedAction<Void>() {
+
+                       @Override
+                       public Void run() {
+                               request.setAttribute(REMOTE_USER, AccessController.getContext());
+                               return null;
+                       }
+
+               });
+
+               return true;
+       }
+
+       @Override
+       public URL getResource(String name) {
+               return bc.getBundle().getResource(name);
+       }
+
+       @Override
+       public String getMimeType(String name) {
+               return null;
+       }
+
+       protected LoginContext processUnauthorized(HttpServletRequest request, HttpServletResponse response) {
+               askForWwwAuth(request, response);
+               return null;
+       }
+
+       protected void askForWwwAuth(HttpServletRequest request, HttpServletResponse response) {
+               response.setStatus(401);
+               // if (org.argeo.cms.internal.kernel.Activator.getAcceptorCredentials()
+               // != null && !forceBasic)// SPNEGO
+               // response.setHeader(HttpUtils.HEADER_WWW_AUTHENTICATE, "Negotiate");
+               // else
+               response.setHeader(HEADER_WWW_AUTHENTICATE, "Basic realm=\"" + httpAuthRealm + "\"");
+
+       }
+
+}
diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/RunnerServlet.java
new file mode 100644 (file)
index 0000000..cebf4d8
--- /dev/null
@@ -0,0 +1,265 @@
+package org.argeo.slc.core.execution.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.AccessControlContext;
+import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.nodetype.NodeType;
+import javax.naming.ldap.LdapName;
+import javax.security.auth.Subject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.argeo.cms.auth.CmsSession;
+import org.argeo.jcr.JcrUtils;
+import org.argeo.node.NodeUtils;
+import org.argeo.slc.SlcException;
+import org.argeo.slc.execution.ExecutionProcess;
+import org.argeo.slc.jcr.SlcNames;
+import org.argeo.slc.jcr.SlcTypes;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.http.HttpContext;
+import org.osgi.service.http.HttpService;
+import org.osgi.util.tracker.ServiceTracker;
+
+public class RunnerServlet extends HttpServlet {
+       private final static Log log = LogFactory.getLog(RunnerServlet.class);
+
+       private static final long serialVersionUID = -317016687309065291L;
+
+       private Path baseDir;
+       private BundleContext bc;
+       private ExecutorService executor;
+
+       public RunnerServlet(BundleContext bc, Path baseDir) {
+               this.bc = bc;
+               this.baseDir = baseDir;
+               this.executor = Executors.newFixedThreadPool(20);
+       }
+
+       @Override
+       protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+               super.service(req, resp);
+       }
+
+       @Override
+       protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+               // InputStream in = req.getInputStream();
+               // Gson gson = new Gson();
+               // JsonParser jsonParser = new JsonParser();
+               // BufferedReader reader = new BufferedReader(new InputStreamReader(in,
+               // Charset.forName("UTF-8")));
+               // JsonElement payload = jsonParser.parse(reader);
+               // String payloadStr = gson.toJson(payload);
+               String path = req.getPathInfo();
+               InputStream in = req.getInputStream();
+               OutputStream out = resp.getOutputStream();
+
+               String tokens[] = path.split("/");
+               // first token alway empty
+               String workgroup = tokens[1];
+
+               CmsSession cmsSession = getByLocalId(req.getSession().getId());
+
+               boolean authorized = false;
+               for (String role : cmsSession.getAuthorization().getRoles()) {
+                       if (role.startsWith("cn=" + workgroup)) {
+                               authorized = true;
+                               break;
+                       }
+               }
+               if (!authorized) {
+                       resp.setStatus(403);
+                       return;
+               }
+               LdapName userDn = cmsSession.getUserDn();
+               AccessControlContext acc = (AccessControlContext) req.getAttribute(HttpContext.REMOTE_USER);
+               Subject subject = Subject.getSubject(acc);
+               // flow path
+               StringBuilder sb = new StringBuilder("");
+               for (int i = 2; i < tokens.length; i++) {
+                       sb.append('/').append(tokens[i]);
+               }
+               String flowPath = sb.toString();
+               String ext = FilenameUtils.getExtension(flowPath.toString());
+
+               // JCR
+               Repository repository = bc.getService(bc.getServiceReference(Repository.class));
+               Session session = Subject.doAs(subject, new PrivilegedAction<Session>() {
+
+                       @Override
+                       public Session run() {
+                               try {
+                                       return repository.login();
+                               } catch (RepositoryException e) {
+                                       throw new RuntimeException("Cannot login", e);
+                               }
+                       }
+
+               });
+               UUID processUuid = UUID.randomUUID();
+               GregorianCalendar started = new GregorianCalendar();
+               Node groupHome = NodeUtils.getGroupHome(session, workgroup);
+               String processPath = SlcNames.SLC_SYSTEM + "/" + SlcNames.SLC_PROCESSES + "/"
+                               + JcrUtils.dateAsPath(started, true) + processUuid;
+               Node processNode = JcrUtils.mkdirs(groupHome, processPath, SlcTypes.SLC_PROCESS);
+               Node realizedFlowNode;
+               try {
+                       processNode.setProperty(SlcNames.SLC_UUID, processUuid.toString());
+                       processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.RUNNING);
+                       realizedFlowNode = processNode.addNode(SlcNames.SLC_FLOW);
+                       realizedFlowNode.addMixin(SlcTypes.SLC_REALIZED_FLOW);
+                       realizedFlowNode.setProperty(SlcNames.SLC_STARTED, started);
+                       Node addressNode = realizedFlowNode.addNode(SlcNames.SLC_ADDRESS, NodeType.NT_ADDRESS);
+                       addressNode.setProperty(Property.JCR_PATH, flowPath);
+                       processNode.getSession().save();
+               } catch (RepositoryException e1) {
+                       throw new SlcException("Cannot register SLC process", e1);
+               }
+
+               if (log.isDebugEnabled())
+                       log.debug(userDn + " " + workgroup + " " + flowPath);
+
+               try {
+                       resp.setHeader("Content-Type", "application/json");
+                       ServiceChannel serviceChannel = new ServiceChannel(Channels.newChannel(in), Channels.newChannel(out),
+                                       executor);
+                       Callable<Integer> task;
+                       if (ext.equals("api")) {
+                               String uri = Files.readAllLines(baseDir.resolve(flowPath.substring(1))).get(0);
+                               task = new WebServiceTask(serviceChannel, uri);
+                       } else {
+                               task = createTask(serviceChannel, flowPath);
+                       }
+
+                       if (task == null)
+                               throw new SlcException("No task found for " + flowPath);
+
+                       // execute
+                       Future<Integer> f = executor.submit(task);
+                       int written = f.get();
+                       if (log.isTraceEnabled())
+                               log.trace("Written " + written + " bytes");
+                       try {
+                               processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.COMPLETED);
+                               realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar());
+                               processNode.getSession().save();
+                       } catch (RepositoryException e1) {
+                               throw new SlcException("Cannot update SLC process status", e1);
+                       }
+               } catch (Exception e) {
+                       try {
+                               processNode.setProperty(SlcNames.SLC_STATUS, ExecutionProcess.ERROR);
+                               realizedFlowNode.setProperty(SlcNames.SLC_COMPLETED, new GregorianCalendar());
+                               processNode.getSession().save();
+                       } catch (RepositoryException e1) {
+                               throw new SlcException("Cannot update SLC process status", e1);
+                       }
+                       throw new SlcException("Task " + flowPath + " failed", e);
+               } finally {
+                       JcrUtils.logoutQuietly(session);
+               }
+
+               // JsonElement answer = jsonParser.parse(answerStr);
+               // resp.setHeader("Content-Type", "application/json");
+               // JsonWriter jsonWriter = gson.newJsonWriter(resp.getWriter());
+               // jsonWriter.setIndent(" ");
+               // gson.toJson(answer, jsonWriter);
+               // jsonWriter.flush();
+       }
+
+       protected Callable<Integer> createTask(ServiceChannel serviceChannel, String flowPath) {
+               return null;
+       }
+
+       protected Path getBaseDir() {
+               return baseDir;
+       }
+
+       public static void register(BundleContext bc, String alias, RunnerServlet runnerServlet, String httpAuthrealm) {
+               try {
+                       ServiceTracker<HttpService, HttpService> serviceTracker = new ServiceTracker<HttpService, HttpService>(bc,
+                                       HttpService.class, null) {
+
+                               @Override
+                               public HttpService addingService(ServiceReference<HttpService> reference) {
+                                       // TODO Auto-generated method stub
+                                       HttpService httpService = super.addingService(reference);
+                                       try {
+                                               httpService.registerServlet(alias, runnerServlet, null, new RunnerHttpContext(httpAuthrealm));
+                                       } catch (Exception e) {
+                                               throw new SlcException("Cannot register servlet", e);
+                                       }
+                                       return httpService;
+                               }
+
+                       };
+                       // ServiceReference<HttpService> ref =
+                       // bc.getServiceReference(HttpService.class);
+                       // HttpService httpService = bc.getService(ref);
+                       // httpService.registerServlet(alias, runnerServlet, null, null);
+                       // bc.ungetService(ref);
+                       serviceTracker.open();
+               } catch (Exception e) {
+                       throw new SlcException("Cannot register servlet", e);
+               }
+       }
+
+       public static void unregister(BundleContext bc, String alias) {
+               try {
+                       ServiceReference<HttpService> ref = bc.getServiceReference(HttpService.class);
+                       if (ref == null)
+                               return;
+                       HttpService httpService = bc.getService(ref);
+                       httpService.unregister(alias);
+                       bc.ungetService(ref);
+               } catch (Exception e) {
+                       throw new SlcException("Cannot unregister servlet", e);
+               }
+       }
+
+       CmsSession getByLocalId(String localId) {
+               // BundleContext bc =
+               // FrameworkUtil.getBundle(RunnerServlet.class).getBundleContext();
+               Collection<ServiceReference<CmsSession>> sr;
+               try {
+                       sr = bc.getServiceReferences(CmsSession.class, "(" + CmsSession.SESSION_LOCAL_ID + "=" + localId + ")");
+               } catch (InvalidSyntaxException e) {
+                       throw new SlcException("Cannot get CMS session for id " + localId, e);
+               }
+               ServiceReference<CmsSession> cmsSessionRef;
+               if (sr.size() == 1) {
+                       cmsSessionRef = sr.iterator().next();
+                       return (CmsSession) bc.getService(cmsSessionRef);
+               } else if (sr.size() == 0) {
+                       return null;
+               } else
+                       throw new SlcException(sr.size() + " CMS sessions registered for " + localId);
+
+       }
+
+}
diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/ServiceChannel.java
new file mode 100644 (file)
index 0000000..3e6a1ab
--- /dev/null
@@ -0,0 +1,68 @@
+package org.argeo.slc.core.execution.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class ServiceChannel implements AsynchronousByteChannel {
+       private final ReadableByteChannel in;
+       private final WritableByteChannel out;
+
+       private boolean open = true;
+
+       private ExecutorService executor;
+
+       public ServiceChannel(ReadableByteChannel in, WritableByteChannel out, ExecutorService executor) {
+               this.in = in;
+               this.out = out;
+               this.executor = executor;
+       }
+
+       @Override
+       public Future<Integer> read(ByteBuffer dst) {
+               return executor.submit(() -> in.read(dst));
+       }
+
+       @Override
+       public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
+               try {
+                       Future<Integer> res = read(dst);
+                       handler.completed(res.get(), attachment);
+               } catch (Exception e) {
+                       handler.failed(e, attachment);
+               }
+       }
+
+       @Override
+       public Future<Integer> write(ByteBuffer src) {
+               return executor.submit(() -> out.write(src));
+       }
+
+       @Override
+       public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
+               try {
+                       Future<Integer> res = write(src);
+                       handler.completed(res.get(), attachment);
+               } catch (Exception e) {
+                       handler.failed(e, attachment);
+               }
+       }
+
+       @Override
+       /** NB: it does not close the underlying channels. */
+       public void close() throws IOException {
+               open = false;
+               notifyAll();
+       }
+
+       @Override
+       public boolean isOpen() {
+               return open;
+       }
+
+}
diff --git a/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java b/org.argeo.slc.core/src/org/argeo/slc/core/execution/http/WebServiceTask.java
new file mode 100644 (file)
index 0000000..b942232
--- /dev/null
@@ -0,0 +1,74 @@
+package org.argeo.slc.core.execution.http;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+
+public class WebServiceTask implements Callable<Integer> {
+       private String url;
+       private String requContentType;
+       private String respContentType;
+
+       private AsynchronousByteChannel channel;
+
+       public WebServiceTask(AsynchronousByteChannel channel, String url) {
+               this(url, "application/json", "application/json", channel);
+       }
+
+       public WebServiceTask(String url, String requContentType, String respContentType, AsynchronousByteChannel channel) {
+               this.url = url;
+               this.requContentType = requContentType;
+               this.respContentType = respContentType;
+               this.channel = channel;
+       }
+
+       @Override
+       public Integer call() throws Exception {
+               // Webservice
+               HttpClient httpClient = new HttpClient();
+               PostMethod postMethod = new PostMethod(url);
+               InputStream in = Channels.newInputStream(channel);
+               RequestEntity requestEntity = new InputStreamRequestEntity(in, requContentType);
+               // StringRequestEntity requestEntity = new
+               // StringRequestEntity(payloadStr, "application/json", "UTF-8");
+               postMethod.setRequestEntity(requestEntity);
+               httpClient.executeMethod(postMethod);
+               InputStream answerIn = postMethod.getResponseBodyAsStream();
+               ReadableByteChannel answer = Channels.newChannel(answerIn);
+               ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
+               int read = 0;
+               Integer writeRes = 0;
+               while (read != -1) {
+                       read = answer.read(buffer);
+                       if (read <= 0)
+                               break;
+                       buffer.flip();
+                       Future<Integer> f = channel.write(buffer);
+                       writeRes = writeRes + f.get();
+                       buffer.clear();
+               }
+               return writeRes;
+       }
+
+       public String getUrl() {
+               return url;
+       }
+
+       public String getRequContentType() {
+               return requContentType;
+       }
+
+       public String getRespContentType() {
+               return respContentType;
+       }
+
+}