Improve events and web sockets
authorMathieu Baudier <mbaudier@argeo.org>
Tue, 19 Jul 2022 10:19:38 +0000 (12:19 +0200)
committerMathieu Baudier <mbaudier@argeo.org>
Tue, 19 Jul 2022 10:19:38 +0000 (12:19 +0200)
25 files changed:
org.argeo.api.cms/src/org/argeo/api/cms/CmsConstants.java
org.argeo.api.cms/src/org/argeo/api/cms/CmsContext.java
org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java [new file with mode: 0644]
org.argeo.cms.ee/OSGI-INF/statusHandler.xml [new file with mode: 0644]
org.argeo.cms.ee/bnd.bnd
org.argeo.cms.ee/build.properties
org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java [deleted file]
org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java [deleted file]
org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java [deleted file]
org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java [deleted file]
org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java [deleted file]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java [new file with mode: 0644]
org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java [new file with mode: 0644]
org.argeo.cms.lib.equinox/src/org/argeo/cms/servlet/internal/jetty/JettyConfig.java
org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/CmsJettyServer.java
org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/JettyHttpContext.java
org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java
swt/org.argeo.cms.swt/src/org/argeo/cms/swt/auth/CmsLogin.java

index 578c82431d8362fc711c939c968d686258cfb645..207b0a8df98b7d44419d3aa505fae663fb6fa334 100644 (file)
@@ -83,6 +83,7 @@ public interface CmsConstants {
         * COMPONENT PROPERTIES
         */
        String CONTEXT_PATH = "context.path";
+       String EVENT_TOPICS = "event.topics";
 
        /*
         * INIT FRAMEWORK PROPERTIES
index 05108beac3e2ad0bc5a18842d8419255ced8b308..64bb4255c9922103f931aeee855a94fdd24a80cb 100644 (file)
@@ -2,6 +2,7 @@ package org.argeo.api.cms;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
 import javax.security.auth.Subject;
 
@@ -27,6 +28,12 @@ public interface CmsContext {
 
        /** Get the CMS session of this subject. */
        CmsSession getCmsSession(Subject subject);
-       
+
        CmsState getCmsState();
+
+       void sendEvent(String topic, Map<String, Object> event);
+
+       void addEventSubscriber(String topic, CmsEventSubscriber eventSubscriber);
+
+       void removeEventSubscriber(String topic, CmsEventSubscriber eventSubscriber);
 }
diff --git a/org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java b/org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java
new file mode 100644 (file)
index 0000000..9ca5eaa
--- /dev/null
@@ -0,0 +1,8 @@
+package org.argeo.api.cms;
+
+import java.util.Map;
+
+public interface CmsEventSubscriber {
+
+       void onEvent(String topic, Map<String, Object> properties);
+}
diff --git a/org.argeo.cms.ee/OSGI-INF/statusHandler.xml b/org.argeo.cms.ee/OSGI-INF/statusHandler.xml
new file mode 100644 (file)
index 0000000..a530c33
--- /dev/null
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" immmediate="true" name="Status Handler">
+   <implementation class="org.argeo.cms.websocket.server.StatusEndpoints"/>
+   <service>
+      <provide interface="com.sun.net.httpserver.HttpHandler"/>
+   </service>
+   <property name="context.path" type="String" value="/cms/status"/>
+</scr:component>
index 6fae1ea24e0c96c65008794e1e35b5579968d578..f09995c002a580a5684daf890e871e04148b319d 100644 (file)
@@ -8,4 +8,5 @@ javax.servlet.*;version="[3,5)",\
 
 Service-Component:\
 OSGI-INF/pkgServletContext.xml,\
-OSGI-INF/pkgServlet.xml
+OSGI-INF/pkgServlet.xml,\
+OSGI-INF/statusHandler.xml,\
index ee94f53be160fa0642e9d1cfd612c545df535738..eb170c950aaa7112587443ba3a75cda6088d5dce 100644 (file)
@@ -1,5 +1,6 @@
-output.. = bin/
 bin.includes = META-INF/,\
                .,\
-               OSGI-INF/jettyServiceFactory.xml
+               OSGI-INF/jettyServiceFactory.xml,\
+               OSGI-INF/statusHandler.xml
 source.. = src/
+output.. = bin/
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java
deleted file mode 100644 (file)
index 46dabc2..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.argeo.cms.websocket.javax.server;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.List;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginContext;
-import javax.websocket.Extension;
-import javax.websocket.HandshakeResponse;
-import javax.websocket.server.HandshakeRequest;
-import javax.websocket.server.ServerEndpointConfig;
-import javax.websocket.server.ServerEndpointConfig.Configurator;
-
-import org.argeo.api.cms.CmsAuth;
-import org.argeo.api.cms.CmsLog;
-import org.argeo.cms.auth.RemoteAuthCallbackHandler;
-import org.argeo.cms.auth.RemoteAuthSession;
-import org.argeo.cms.servlet.ServletHttpSession;
-
-/**
- * <strong>Disabled until third party issues are solved.</strong>. Customises
- * the initialisation of a new web socket.
- */
-public class CmsWebSocketConfigurator extends Configurator {
-       public final static String WEBSOCKET_SUBJECT = "org.argeo.cms.websocket.subject";
-       public final static String REMOTE_USER = "org.osgi.service.http.authentication.remote.user";
-
-       private final static CmsLog log = CmsLog.getLog(CmsWebSocketConfigurator.class);
-       final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate";
-
-       @Override
-       public boolean checkOrigin(String originHeaderValue) {
-               return true;
-       }
-
-       @Override
-       public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
-               try {
-                       return endpointClass.getDeclaredConstructor().newInstance();
-               } catch (Exception e) {
-                       throw new IllegalArgumentException("Cannot get endpoint instance", e);
-               }
-       }
-
-       @Override
-       public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {
-               return requested;
-       }
-
-       @Override
-       public String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {
-               if ((requested == null) || (requested.size() == 0))
-                       return "";
-               if ((supported == null) || (supported.isEmpty()))
-                       return "";
-               for (String possible : requested) {
-                       if (possible == null)
-                               continue;
-                       if (supported.contains(possible))
-                               return possible;
-               }
-               return "";
-       }
-
-       @Override
-       public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
-               if (true)
-                       return;
-
-               RemoteAuthSession httpSession = new ServletHttpSession(
-                               (javax.servlet.http.HttpSession) request.getHttpSession());
-               if (log.isDebugEnabled() && httpSession != null)
-                       log.debug("Web socket HTTP session id: " + httpSession.getId());
-
-               if (httpSession == null) {
-                       rejectResponse(response, null);
-               }
-               try {
-                       LoginContext lc = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, new RemoteAuthCallbackHandler(httpSession));
-                       lc.login();
-                       if (log.isDebugEnabled())
-                               log.debug("Web socket logged-in as " + lc.getSubject());
-                       Subject.doAs(lc.getSubject(), new PrivilegedAction<Void>() {
-
-                               @Override
-                               public Void run() {
-                                       sec.getUserProperties().put(REMOTE_USER, AccessController.getContext());
-                                       return null;
-                               }
-
-                       });
-               } catch (Exception e) {
-                       rejectResponse(response, e);
-               }
-       }
-
-       /**
-        * Behaviour when the web socket could not be authenticated. Throws an
-        * {@link IllegalStateException} by default.
-        * 
-        * @param e can be null
-        */
-       protected void rejectResponse(HandshakeResponse response, Exception e) {
-               // violent implementation, as suggested in
-               // https://stackoverflow.com/questions/21763829/jsr-356-how-to-abort-a-websocket-connection-during-the-handshake
-//             throw new IllegalStateException("Web socket cannot be authenticated");
-       }
-}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java
deleted file mode 100644 (file)
index e01f6f7..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.argeo.cms.websocket.javax.server;
-
-import java.io.IOException;
-import java.security.AccessControlContext;
-import java.util.Hashtable;
-import java.util.Map;
-
-import javax.security.auth.Subject;
-import javax.websocket.CloseReason;
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.RemoteEndpoint;
-import javax.websocket.Session;
-import javax.websocket.server.ServerEndpoint;
-
-import org.argeo.api.cms.CmsLog;
-import org.argeo.cms.integration.CmsExceptionsChain;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
-import org.osgi.service.http.context.ServletContextHelper;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/** Provides WebSocket access. */
-@ServerEndpoint(value = "/ws/test/events/")
-public class TestEndpoint implements EventHandler {
-       private final static CmsLog log = CmsLog.getLog(TestEndpoint.class);
-
-       final static String TOPICS_BASE = "/test";
-       final static String INPUT = "input";
-       final static String TOPIC = "topic";
-       final static String VIEW_UID = "viewUid";
-       final static String COMPUTATION_UID = "computationUid";
-       final static String MESSAGES = "messages";
-       final static String ERRORS = "errors";
-
-       final static String EXCEPTION = "exception";
-       final static String MESSAGE = "message";
-
-       private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
-
-       private String wsSessionId;
-       private RemoteEndpoint.Basic remote;
-       private ServiceRegistration<EventHandler> eventHandlerSr;
-
-       // json
-       private ObjectMapper objectMapper = new ObjectMapper();
-
-       private WebSocketView view;
-
-       @OnOpen
-       public void onWebSocketConnect(Session session) {
-               wsSessionId = session.getId();
-
-               // 24h timeout
-               session.setMaxIdleTimeout(1000 * 60 * 60 * 24);
-
-               Map<String, Object> userProperties = session.getUserProperties();
-               Subject subject = null;
-//             AccessControlContext accessControlContext = (AccessControlContext) userProperties
-//                             .get(ServletContextHelper.REMOTE_USER);
-//             Subject subject = Subject.getSubject(accessControlContext);
-//             // Deal with authentication failure
-//             if (subject == null) {
-//                     try {
-//                             CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
-//
-//                                     @Override
-//                                     public int getCode() {
-//                                             return 4001;
-//                                     }
-//                             };
-//                             session.close(new CloseReason(closeCode, "Unauthorized"));
-//                             if (log.isTraceEnabled())
-//                                     log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
-//                                                     + ".");
-//                             return;
-//                     } catch (IOException e) {
-//                             // silent
-//                     }
-//                     return;// ignore
-//             }
-
-               if (log.isDebugEnabled())
-                       log.debug("WS#" + wsSessionId + " open for: " + subject);
-               remote = session.getBasicRemote();
-               view = new WebSocketView(subject);
-
-               // OSGi events
-               String[] topics = new String[] { TOPICS_BASE + "/*" };
-               Hashtable<String, Object> ht = new Hashtable<>();
-               ht.put(EventConstants.EVENT_TOPIC, topics);
-               ht.put(EventConstants.EVENT_FILTER, "(" + VIEW_UID + "=" + view.getUid() + ")");
-               eventHandlerSr = bc.registerService(EventHandler.class, this, ht);
-
-               if (log.isDebugEnabled())
-                       log.debug("New view " + view.getUid() + " opened, via web socket.");
-       }
-
-       @OnMessage
-       public void onWebSocketText(Session session, String message) throws JsonMappingException, JsonProcessingException {
-               try {
-                       if (log.isTraceEnabled())
-                               log.trace("WS#" + view.getUid() + " received:\n" + message + "\n");
-//                     JsonNode jsonNode = objectMapper.readTree(message);
-//                     String topic = jsonNode.get(TOPIC).textValue();
-
-                       final String computationUid = null;
-//                     if (MY_TOPIC.equals(topic)) {
-//                             view.checkRole(SPECIFIC_ROLE);
-//                             computationUid= process();
-//                     }
-                       remote.sendText("ACK");
-               } catch (Exception e) {
-                       log.error("Error when receiving web socket message", e);
-                       sendSystemErrorMessage(e);
-               }
-       }
-
-       @OnClose
-       public void onWebSocketClose(CloseReason reason) {
-               if (eventHandlerSr != null)
-                       eventHandlerSr.unregister();
-               if (view != null && log.isDebugEnabled())
-                       log.debug("WS#" + view.getUid() + " closed: " + reason);
-       }
-
-       @OnError
-       public void onWebSocketError(Throwable cause) {
-               if (view != null) {
-                       log.error("WS#" + view.getUid() + " ERROR", cause);
-               } else {
-                       if (log.isTraceEnabled())
-                               log.error("Error in web socket session " + wsSessionId, cause);
-               }
-       }
-
-       @Override
-       public void handleEvent(Event event) {
-               try {
-                       Object uid = event.getProperty(COMPUTATION_UID);
-                       Exception exception = (Exception) event.getProperty(EXCEPTION);
-                       if (exception != null) {
-                               CmsExceptionsChain systemErrors = new CmsExceptionsChain(exception);
-                               String sent = systemErrors.toJsonString(objectMapper);
-                               remote.sendText(sent);
-                               return;
-                       }
-                       String topic = event.getTopic();
-                       if (log.isTraceEnabled())
-                               log.trace("WS#" + view.getUid() + " " + topic + ": notify event " + topic + "#" + uid + ", " + event);
-               } catch (Exception e) {
-                       log.error("Error when handling event for WebSocket", e);
-                       sendSystemErrorMessage(e);
-               }
-
-       }
-
-       /** Sends an error message in JSON format. */
-       protected void sendSystemErrorMessage(Exception e) {
-               CmsExceptionsChain systemErrors = new CmsExceptionsChain(e);
-               try {
-                       if (remote != null)
-                               remote.sendText(systemErrors.toJsonString(objectMapper));
-               } catch (Exception e1) {
-                       log.error("Cannot send WebSocket system error messages " + systemErrors, e1);
-               }
-       }
-}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java
deleted file mode 100644 (file)
index 819837b..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.argeo.cms.websocket.javax.server;
-
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.WebSocket;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-
-/** Tests connectivity to the web socket server. */
-public class WebSocketTest {
-
-       public static void main(String[] args) throws Exception {
-               CompletableFuture<Boolean> received = new CompletableFuture<>();
-               WebSocket.Listener listener = new WebSocket.Listener() {
-
-                       public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
-                               System.out.println(message);
-                               CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
-                               received.complete(true);
-                               return res;
-                       }
-               };
-
-               HttpClient client = HttpClient.newHttpClient();
-               CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
-                               .buildAsync(URI.create("ws://localhost:7070/ws/test/events/"), listener);
-               WebSocket webSocket = ws.get();
-               webSocket.sendText("TEST", true);
-
-               received.get(10, TimeUnit.SECONDS);
-               webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "");
-       }
-
-}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java
deleted file mode 100644 (file)
index a5da88b..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.argeo.cms.websocket.javax.server;
-
-import java.security.Principal;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.security.auth.Subject;
-import javax.security.auth.x500.X500Principal;
-
-import org.osgi.service.useradmin.Role;
-
-/**
- * Abstraction of a single Frontend view, that is a web browser page. There can
- * be multiple views within one single authenticated HTTP session.
- */
-public class WebSocketView {
-       private final String uid;
-       private Subject subject;
-
-       public WebSocketView(Subject subject) {
-               this.uid = UUID.randomUUID().toString();
-               this.subject = subject;
-       }
-
-       public String getUid() {
-               return uid;
-       }
-
-       public Set<String> getRoles() {
-               return roles(subject);
-       }
-
-       public boolean isInRole(String role) {
-               return getRoles().contains(role);
-       }
-
-       public void checkRole(String role) {
-               checkRole(subject, role);
-       }
-
-       public final static Set<String> roles(Subject subject) {
-               Set<String> roles = new HashSet<String>();
-               X500Principal principal = subject.getPrincipals(X500Principal.class).iterator().next();
-               String username = principal.getName();
-               roles.add(username);
-               for (Principal group : subject.getPrincipals()) {
-                       if (group instanceof Role)
-                               roles.add(group.getName());
-               }
-               return roles;
-       }
-
-       public static void checkRole(Subject subject, String role) {
-               Set<String> roles = roles(subject);
-               if (!roles.contains(role))
-                       throw new IllegalStateException("User is not in role " + role);
-       }
-
-}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java
deleted file mode 100644 (file)
index 564c881..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-/** Argeo CMS websocket integration. */
-package org.argeo.cms.websocket.javax.server;
\ No newline at end of file
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java
new file mode 100644 (file)
index 0000000..880eb0e
--- /dev/null
@@ -0,0 +1,120 @@
+package org.argeo.cms.websocket.server;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.websocket.Extension;
+import javax.websocket.HandshakeResponse;
+import javax.websocket.server.HandshakeRequest;
+import javax.websocket.server.ServerEndpointConfig;
+import javax.websocket.server.ServerEndpointConfig.Configurator;
+
+import org.argeo.api.cms.CmsAuth;
+import org.argeo.api.cms.CmsLog;
+import org.argeo.api.cms.CmsState;
+import org.argeo.cms.auth.RemoteAuthCallbackHandler;
+import org.argeo.cms.auth.RemoteAuthSession;
+import org.argeo.cms.servlet.ServletHttpSession;
+
+/**
+ * <strong>Disabled until third party issues are solved.</strong>. Customises
+ * the initialisation of a new web socket.
+ */
+public class CmsWebSocketConfigurator extends Configurator {
+       public final static String WEBSOCKET_SUBJECT = "org.argeo.cms.websocket.subject";
+       public final static String REMOTE_USER = "org.osgi.service.http.authentication.remote.user";
+
+       private final static CmsLog log = CmsLog.getLog(CmsWebSocketConfigurator.class);
+       final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate";
+       
+       private CmsState cmsState;
+       
+       public void start() {
+               
+       }
+       
+       public void stop() {
+               
+       }
+
+       @Override
+       public boolean checkOrigin(String originHeaderValue) {
+               return true;
+       }
+
+       @Override
+       public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
+               try {
+                       return endpointClass.getDeclaredConstructor().newInstance();
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("Cannot get endpoint instance", e);
+               }
+       }
+
+       @Override
+       public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {
+               return requested;
+       }
+
+       @Override
+       public String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {
+               if ((requested == null) || (requested.size() == 0))
+                       return "";
+               if ((supported == null) || (supported.isEmpty()))
+                       return "";
+               for (String possible : requested) {
+                       if (possible == null)
+                               continue;
+                       if (supported.contains(possible))
+                               return possible;
+               }
+               return "";
+       }
+
+       @Override
+       public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
+               if (true)
+                       return;
+
+               RemoteAuthSession httpSession = new ServletHttpSession(
+                               (javax.servlet.http.HttpSession) request.getHttpSession());
+               if (log.isDebugEnabled() && httpSession != null)
+                       log.debug("Web socket HTTP session id: " + httpSession.getId());
+
+               if (httpSession == null) {
+                       rejectResponse(response, null);
+               }
+               try {
+                       LoginContext lc = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, new RemoteAuthCallbackHandler(httpSession));
+                       lc.login();
+                       if (log.isDebugEnabled())
+                               log.debug("Web socket logged-in as " + lc.getSubject());
+                       Subject.doAs(lc.getSubject(), new PrivilegedAction<Void>() {
+
+                               @Override
+                               public Void run() {
+                                       sec.getUserProperties().put(REMOTE_USER, AccessController.getContext());
+                                       return null;
+                               }
+
+                       });
+               } catch (Exception e) {
+                       rejectResponse(response, e);
+               }
+       }
+
+       /**
+        * Behaviour when the web socket could not be authenticated. Throws an
+        * {@link IllegalStateException} by default.
+        * 
+        * @param e can be null
+        */
+       protected void rejectResponse(HandshakeResponse response, Exception e) {
+               // violent implementation, as suggested in
+               // https://stackoverflow.com/questions/21763829/jsr-356-how-to-abort-a-websocket-connection-during-the-handshake
+//             throw new IllegalStateException("Web socket cannot be authenticated");
+       }
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java
new file mode 100644 (file)
index 0000000..a6b2a4d
--- /dev/null
@@ -0,0 +1,50 @@
+package org.argeo.cms.websocket.server;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnOpen;
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.argeo.api.cms.CmsContext;
+import org.argeo.api.cms.CmsEventSubscriber;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+
+@ServerEndpoint(value = "/event/{topic}", configurator = CmsWebSocketConfigurator.class)
+public class EventEndpoint implements CmsEventSubscriber {
+       private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
+
+       private RemoteEndpoint.Basic remote;
+       private CmsContext cmsContext;
+
+//     private String topic = "cms";
+
+       @OnOpen
+       public void onOpen(Session session, @PathParam("topic") String topic) {
+               if (bc != null) {
+                       cmsContext = bc.getService(bc.getServiceReference(CmsContext.class));
+                       cmsContext.addEventSubscriber(topic, this);
+               }
+               remote = session.getBasicRemote();
+
+       }
+
+       @OnClose
+       public void onClose(@PathParam("topic") String topic) {
+               cmsContext.removeEventSubscriber(topic, this);
+       }
+
+       @Override
+       public void onEvent(String topic, Map<String, Object> properties) {
+               try {
+                       remote.sendText(topic + ": " + properties);
+               } catch (IOException e) {
+                       throw new IllegalStateException(e);
+               }
+       }
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java
new file mode 100644 (file)
index 0000000..d5839e2
--- /dev/null
@@ -0,0 +1,26 @@
+package org.argeo.cms.websocket.server;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+public class StatusEndpoints implements WebsocketEndpoints, HttpHandler {
+
+       @Override
+       public Set<Class<?>> getEndPoints() {
+               Set<Class<?>> res = new HashSet<>();
+               res.add(EventEndpoint.class);
+               res.add(TestEndpoint.class);
+               return res;
+       }
+
+       @Override
+       public void handle(HttpExchange exchange) throws IOException {
+               // web socket only
+               exchange.sendResponseHeaders(200, -1);
+       }
+
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java
new file mode 100644 (file)
index 0000000..0575726
--- /dev/null
@@ -0,0 +1,183 @@
+package org.argeo.cms.websocket.server;
+
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.websocket.CloseReason;
+import javax.websocket.EndpointConfig;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.argeo.api.cms.CmsLog;
+import org.argeo.cms.integration.CmsExceptionsChain;
+import org.argeo.util.naming.NamingUtils;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Provides WebSocket access. */
+@ServerEndpoint(value = "/test/{topic}", configurator = CmsWebSocketConfigurator.class)
+public class TestEndpoint implements EventHandler {
+       private final static CmsLog log = CmsLog.getLog(TestEndpoint.class);
+
+       final static String TOPICS_BASE = "/test";
+       final static String INPUT = "input";
+       final static String TOPIC = "topic";
+       final static String VIEW_UID = "viewUid";
+       final static String COMPUTATION_UID = "computationUid";
+       final static String MESSAGES = "messages";
+       final static String ERRORS = "errors";
+
+       final static String EXCEPTION = "exception";
+       final static String MESSAGE = "message";
+
+       private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
+
+       private String wsSessionId;
+       private RemoteEndpoint.Basic remote;
+       private ServiceRegistration<EventHandler> eventHandlerSr;
+
+       // json
+       private ObjectMapper objectMapper = new ObjectMapper();
+
+       private WebSocketView view;
+
+       @OnOpen
+       public void onOpen(Session session, EndpointConfig endpointConfig) {
+               Map<String, List<String>> parameters = NamingUtils.queryToMap(session.getRequestURI());
+               String path = NamingUtils.getQueryValue(parameters, "path");
+               log.debug("WS Path: " + path);
+
+               wsSessionId = session.getId();
+
+               // 24h timeout
+               session.setMaxIdleTimeout(1000 * 60 * 60 * 24);
+
+               Map<String, Object> userProperties = session.getUserProperties();
+               Subject subject = null;
+//             AccessControlContext accessControlContext = (AccessControlContext) userProperties
+//                             .get(ServletContextHelper.REMOTE_USER);
+//             Subject subject = Subject.getSubject(accessControlContext);
+//             // Deal with authentication failure
+//             if (subject == null) {
+//                     try {
+//                             CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
+//
+//                                     @Override
+//                                     public int getCode() {
+//                                             return 4001;
+//                                     }
+//                             };
+//                             session.close(new CloseReason(closeCode, "Unauthorized"));
+//                             if (log.isTraceEnabled())
+//                                     log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
+//                                                     + ".");
+//                             return;
+//                     } catch (IOException e) {
+//                             // silent
+//                     }
+//                     return;// ignore
+//             }
+
+               if (log.isDebugEnabled())
+                       log.debug("WS#" + wsSessionId + " open for: " + subject);
+               remote = session.getBasicRemote();
+               view = new WebSocketView(subject);
+
+               // OSGi events
+               String[] topics = new String[] { TOPICS_BASE + "/*" };
+               Hashtable<String, Object> ht = new Hashtable<>();
+               ht.put(EventConstants.EVENT_TOPIC, topics);
+               ht.put(EventConstants.EVENT_FILTER, "(" + VIEW_UID + "=" + view.getUid() + ")");
+               eventHandlerSr = bc.registerService(EventHandler.class, this, ht);
+
+               if (log.isDebugEnabled())
+                       log.debug("New view " + view.getUid() + " opened, via web socket.");
+       }
+
+       @OnMessage
+       public void onWebSocketText(@PathParam("topic") String topic, Session session, String message)
+                       throws JsonMappingException, JsonProcessingException {
+               try {
+                       if (log.isTraceEnabled())
+                               log.trace("WS#" + view.getUid() + " received:\n" + message + "\n");
+//                     JsonNode jsonNode = objectMapper.readTree(message);
+//                     String topic = jsonNode.get(TOPIC).textValue();
+
+                       final String computationUid = null;
+//                     if (MY_TOPIC.equals(topic)) {
+//                             view.checkRole(SPECIFIC_ROLE);
+//                             computationUid= process();
+//                     }
+                       remote.sendText("ACK " + topic);
+               } catch (Exception e) {
+                       log.error("Error when receiving web socket message", e);
+                       sendSystemErrorMessage(e);
+               }
+       }
+
+       @OnClose
+       public void onWebSocketClose(CloseReason reason) {
+               if (eventHandlerSr != null)
+                       eventHandlerSr.unregister();
+               if (view != null && log.isDebugEnabled())
+                       log.debug("WS#" + view.getUid() + " closed: " + reason);
+       }
+
+       @OnError
+       public void onWebSocketError(Throwable cause) {
+               if (view != null) {
+                       log.error("WS#" + view.getUid() + " ERROR", cause);
+               } else {
+                       if (log.isTraceEnabled())
+                               log.error("Error in web socket session " + wsSessionId, cause);
+               }
+       }
+
+       @Override
+       public void handleEvent(Event event) {
+               try {
+                       Object uid = event.getProperty(COMPUTATION_UID);
+                       Exception exception = (Exception) event.getProperty(EXCEPTION);
+                       if (exception != null) {
+                               CmsExceptionsChain systemErrors = new CmsExceptionsChain(exception);
+                               String sent = systemErrors.toJsonString(objectMapper);
+                               remote.sendText(sent);
+                               return;
+                       }
+                       String topic = event.getTopic();
+                       if (log.isTraceEnabled())
+                               log.trace("WS#" + view.getUid() + " " + topic + ": notify event " + topic + "#" + uid + ", " + event);
+               } catch (Exception e) {
+                       log.error("Error when handling event for WebSocket", e);
+                       sendSystemErrorMessage(e);
+               }
+
+       }
+
+       /** Sends an error message in JSON format. */
+       protected void sendSystemErrorMessage(Exception e) {
+               CmsExceptionsChain systemErrors = new CmsExceptionsChain(e);
+               try {
+                       if (remote != null)
+                               remote.sendText(systemErrors.toJsonString(objectMapper));
+               } catch (Exception e1) {
+                       log.error("Cannot send WebSocket system error messages " + systemErrors, e1);
+               }
+       }
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java
new file mode 100644 (file)
index 0000000..c6cb88a
--- /dev/null
@@ -0,0 +1,44 @@
+package org.argeo.cms.websocket.server;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/** Tests connectivity to the web socket server. */
+public class WebSocketEventClient {
+
+       public static void main(String[] args) throws Exception {
+               WebSocket.Listener listener = new WebSocket.Listener() {
+
+                       public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
+                               System.out.println(message);
+                               CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
+                               return res;
+                       }
+
+                       @Override
+                       public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
+                               // System.out.println("Pong received.");
+                               return null;
+                       }
+
+               };
+
+               HttpClient client = HttpClient.newHttpClient();
+               CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
+                               .buildAsync(URI.create("ws://localhost:7070/cms/status/event/cms"), listener);
+               WebSocket webSocket = ws.get();
+               webSocket.request(Long.MAX_VALUE);
+
+               Runtime.getRuntime().addShutdownHook(new Thread(() -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "")));
+
+               while (!webSocket.isInputClosed()) {
+                       webSocket.sendPing(ByteBuffer.allocate(0));
+                       Thread.sleep(10000);
+               }
+       }
+
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java
new file mode 100644 (file)
index 0000000..b10bcfd
--- /dev/null
@@ -0,0 +1,35 @@
+package org.argeo.cms.websocket.server;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.WebSocket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+/** Tests connectivity to the web socket server. */
+public class WebSocketTest {
+
+       public static void main(String[] args) throws Exception {
+               CompletableFuture<Boolean> received = new CompletableFuture<>();
+               WebSocket.Listener listener = new WebSocket.Listener() {
+
+                       public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
+                               System.out.println(message);
+                               CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
+                               received.complete(true);
+                               return res;
+                       }
+               };
+
+               HttpClient client = HttpClient.newHttpClient();
+               CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
+                               .buildAsync(URI.create("ws://localhost:7070/cms/status/test/my%20topic?path=my%2Frelative%2Fpath"), listener);
+               WebSocket webSocket = ws.get();
+               webSocket.sendText("TEST", true);
+
+               received.get(10, TimeUnit.SECONDS);
+               webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "");
+       }
+
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java
new file mode 100644 (file)
index 0000000..736631b
--- /dev/null
@@ -0,0 +1,60 @@
+package org.argeo.cms.websocket.server;
+
+import java.security.Principal;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.security.auth.Subject;
+import javax.security.auth.x500.X500Principal;
+
+import org.osgi.service.useradmin.Role;
+
+/**
+ * Abstraction of a single Frontend view, that is a web browser page. There can
+ * be multiple views within one single authenticated HTTP session.
+ */
+public class WebSocketView {
+       private final String uid;
+       private Subject subject;
+
+       public WebSocketView(Subject subject) {
+               this.uid = UUID.randomUUID().toString();
+               this.subject = subject;
+       }
+
+       public String getUid() {
+               return uid;
+       }
+
+       public Set<String> getRoles() {
+               return roles(subject);
+       }
+
+       public boolean isInRole(String role) {
+               return getRoles().contains(role);
+       }
+
+       public void checkRole(String role) {
+               checkRole(subject, role);
+       }
+
+       public final static Set<String> roles(Subject subject) {
+               Set<String> roles = new HashSet<String>();
+               X500Principal principal = subject.getPrincipals(X500Principal.class).iterator().next();
+               String username = principal.getName();
+               roles.add(username);
+               for (Principal group : subject.getPrincipals()) {
+                       if (group instanceof Role)
+                               roles.add(group.getName());
+               }
+               return roles;
+       }
+
+       public static void checkRole(Subject subject, String role) {
+               Set<String> roles = roles(subject);
+               if (!roles.contains(role))
+                       throw new IllegalStateException("User is not in role " + role);
+       }
+
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java
new file mode 100644 (file)
index 0000000..f7cd693
--- /dev/null
@@ -0,0 +1,9 @@
+package org.argeo.cms.websocket.server;
+
+import java.util.Set;
+
+/** Configure web socket in Jetty without hard dependency. */
+public interface WebsocketEndpoints {
+       Set<Class<?>> getEndPoints();
+
+}
diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java
new file mode 100644 (file)
index 0000000..9dfb766
--- /dev/null
@@ -0,0 +1,2 @@
+/** Argeo CMS websocket integration. */
+package org.argeo.cms.websocket.server;
\ No newline at end of file
index e7a1ac176d508ccf62187e6c152cde32f2324ab8..50be8b7a79cd9d2da2179c9ae3acd4f849de9eaf 100644 (file)
@@ -14,8 +14,8 @@ import org.argeo.api.cms.CmsConstants;
 import org.argeo.api.cms.CmsLog;
 import org.argeo.api.cms.CmsState;
 import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator;
-import org.argeo.cms.websocket.javax.server.TestEndpoint;
+import org.argeo.cms.websocket.server.CmsWebSocketConfigurator;
+import org.argeo.cms.websocket.server.TestEndpoint;
 import org.argeo.util.LangUtils;
 import org.eclipse.equinox.http.jetty.JettyConfigurator;
 import org.osgi.framework.BundleContext;
index 3b9783ef5bc016cacdad31dfb5a6a42885a22493..a18f4b495632f908c9be762ac1587c49af46b5b2 100644 (file)
@@ -3,6 +3,7 @@ package org.argeo.cms.jetty;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Map;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -14,8 +15,8 @@ import com.sun.net.httpserver.HttpContext;
 
 import org.argeo.api.cms.CmsState;
 import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator;
-import org.argeo.cms.websocket.javax.server.TestEndpoint;
+import org.argeo.cms.websocket.server.CmsWebSocketConfigurator;
+import org.argeo.cms.websocket.server.TestEndpoint;
 import org.eclipse.jetty.server.session.SessionHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
@@ -35,7 +36,7 @@ public class CmsJettyServer extends JettyHttpServer {
        private Path tempDir;
 
        // WebSocket
-       private ServerContainer wsServerContainer;
+//     private ServerContainer wsServerContainer;
        private ServerEndpointConfig.Configurator wsEndpointConfigurator;
 
        private CmsState cmsState;
@@ -88,25 +89,25 @@ public class CmsJettyServer extends JettyHttpServer {
                String webSocketEnabled = getDeployProperty(CmsDeployProperty.WEBSOCKET_ENABLED);
                // web socket
                if (webSocketEnabled != null && webSocketEnabled.equals(Boolean.toString(true))) {
-                       JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() {
-
-                               @Override
-                               public void accept(ServletContext servletContext, ServerContainer serverContainer)
-                                               throws DeploymentException {
-                                       wsServerContainer = serverContainer;
-
-                                       wsEndpointConfigurator = new CmsWebSocketConfigurator();
-
-                                       ServerEndpointConfig config = ServerEndpointConfig.Builder
-                                                       .create(TestEndpoint.class, "/ws/test/events/").configurator(wsEndpointConfigurator)
-                                                       .build();
-                                       try {
-                                               wsServerContainer.addEndpoint(config);
-                                       } catch (DeploymentException e) {
-                                               throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e);
-                                       }
-                               }
-                       });
+//                     JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() {
+//
+//                             @Override
+//                             public void accept(ServletContext servletContext, ServerContainer serverContainer)
+//                                             throws DeploymentException {
+////                                   wsServerContainer = serverContainer;
+//
+//                                     CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator();
+//
+//                                     ServerEndpointConfig config = ServerEndpointConfig.Builder
+//                                                     .create(TestEndpoint.class, "/ws/test/events/{topic}").configurator(wsEndpointConfigurator)
+//                                                     .build();
+//                                     try {
+//                                             serverContainer.addEndpoint(config);
+//                                     } catch (DeploymentException e) {
+//                                             throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e);
+//                                     }
+//                             }
+//                     });
                }
        }
 
index 7adb09be32911945fd9fb44f47c95006f5ef748e..5876d52e8ae72027c5a9deaa68e815affbf89ae9 100644 (file)
@@ -9,12 +9,17 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import javax.servlet.ServletContext;
+import javax.websocket.DeploymentException;
+import javax.websocket.server.ServerContainer;
+
 import org.argeo.cms.servlet.httpserver.HttpContextServlet;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.handler.ContextHandler;
+import org.argeo.cms.websocket.server.WebsocketEndpoints;
 import org.eclipse.jetty.server.session.SessionHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
+import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer.Configurator;
 
 import com.sun.net.httpserver.Authenticator;
 import com.sun.net.httpserver.Filter;
@@ -26,7 +31,7 @@ import com.sun.net.httpserver.HttpServer;
 class JettyHttpContext extends HttpContext {
        private final JettyHttpServer httpServer;
        private final String path;
-       private final ContextHandler contextHandler;
+       private final ServletContextHandler contextHandler;
        private final ContextAttributes attributes;
        private final List<Filter> filters = new ArrayList<>();
 
@@ -63,6 +68,25 @@ class JettyHttpContext extends HttpContext {
                Objects.requireNonNull(handler);
                this.handler = handler;
 
+               // web socket
+               if (handler instanceof WebsocketEndpoints) {
+                       JavaxWebSocketServletContainerInitializer.configure(contextHandler, new Configurator() {
+
+                               @Override
+                               public void accept(ServletContext servletContext, ServerContainer serverContainer)
+                                               throws DeploymentException {
+//                                     CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator();
+
+                                       for (Class<?> clss : ((WebsocketEndpoints) handler).getEndPoints()) {
+//                                             Class<?> clss = websocketEndpoints.get(path);
+//                                             ServerEndpointConfig config = ServerEndpointConfig.Builder.create(clss, path)
+//                                                             .configurator(wsEndpointConfigurator).build();
+                                               serverContainer.addEndpoint(clss);
+                                       }
+                               }
+                       });
+               }
+
                if (httpServer.isStarted())
                        try {
                                contextHandler.start();
@@ -103,7 +127,7 @@ class JettyHttpContext extends HttpContext {
                return authenticator;
        }
 
-       public Handler getContextHandler() {
+       ServletContextHandler getContextHandler() {
                return contextHandler;
        }
 
index e14b21e7073cd39241250fa0ec337a2a795a83e8..ea9a401a4d3fc49ee2f617558432dbc11f170024 100644 (file)
@@ -1,34 +1,37 @@
 package org.argeo.cms.internal.runtime;
 
-import static java.util.Locale.ENGLISH;
-
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.SubmissionPublisher;
 
 import javax.security.auth.Subject;
 
-import org.argeo.api.cms.CmsConstants;
 import org.argeo.api.cms.CmsContext;
 import org.argeo.api.cms.CmsDeployment;
+import org.argeo.api.cms.CmsEventSubscriber;
 import org.argeo.api.cms.CmsLog;
 import org.argeo.api.cms.CmsSession;
 import org.argeo.api.cms.CmsSessionId;
 import org.argeo.api.cms.CmsState;
 import org.argeo.api.uuid.UuidFactory;
 import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.LocaleUtils;
 import org.argeo.cms.internal.auth.CmsSessionImpl;
 import org.ietf.jgss.GSSCredential;
 import org.osgi.service.useradmin.UserAdmin;
 
 public class CmsContextImpl implements CmsContext {
+
        private final CmsLog log = CmsLog.getLog(getClass());
 //     private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext();
 
@@ -51,6 +54,10 @@ public class CmsContextImpl implements CmsContext {
        private Map<UUID, CmsSessionImpl> cmsSessionsByUuid = new HashMap<>();
        private Map<String, CmsSessionImpl> cmsSessionsByLocalId = new HashMap<>();
 
+       // CMS events
+       private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
+//     private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
+
 //     public CmsContextImpl() {
 //             initTrackers();
 //     }
@@ -311,4 +318,86 @@ public class CmsContextImpl implements CmsContext {
                return cmsSessionsByLocalId.get(localId);
        }
 
+       /*
+        * CMS Events
+        */
+       public void sendEvent(String topic, Map<String, Object> event) {
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               if (publisher == null)
+                       return; // no one is interested
+               publisher.submit(event);
+       }
+
+       public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+               synchronized (topics) {
+                       if (!topics.containsKey(topic))
+                               topics.put(topic, new SubmissionPublisher<>());
+               }
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
+               publisher.subscribe(flowSubscriber);
+       }
+
+       public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+               SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+               if (publisher == null) {
+                       log.error("There should be an event topic " + topic);
+                       return;
+               }
+               for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
+                       if (flowSubscriber instanceof CmsEventFlowSubscriber)
+                               ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
+               }
+               synchronized (topics) {
+                       if (!publisher.hasSubscribers()) {
+                               publisher.close();
+                               topics.remove(topic);
+                       }
+               }
+       }
+
+       static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+               private String topic;
+               private CmsEventSubscriber eventSubscriber;
+
+               private Subscription subscription;
+
+               public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+                       this.topic = topic;
+                       this.eventSubscriber = eventSubscriber;
+               }
+
+               @Override
+               public void onSubscribe(Subscription subscription) {
+                       this.subscription = subscription;
+                       subscription.request(Long.MAX_VALUE);
+               }
+
+               @Override
+               public void onNext(Map<String, Object> item) {
+                       eventSubscriber.onEvent(topic, item);
+
+               }
+
+               @Override
+               public void onError(Throwable throwable) {
+                       // TODO Auto-generated method stub
+
+               }
+
+               @Override
+               public void onComplete() {
+                       // TODO Auto-generated method stub
+
+               }
+
+               void unsubscribe() {
+                       if (subscription != null)
+                               subscription.cancel();
+                       else
+                               throw new IllegalStateException("No subscription to cancel");
+               }
+
+       }
+
 }
index 6cc410ced0d07aff11369121d3687b880365bc8c..4af0c6c1ac79f20e19596b12056cbe4e59aa4f01 100644 (file)
@@ -4,6 +4,7 @@ import static org.argeo.cms.CmsMsg.password;
 import static org.argeo.cms.CmsMsg.username;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 
@@ -278,6 +279,7 @@ public class CmsLogin implements CmsStyles, CallbackHandler {
                                loginContext = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, subject, this);
                        loginContext.login();
                        cmsView.authChange(loginContext);
+                       cmsContext.sendEvent("cms", Collections.singletonMap("msg", "New login"));
                        return true;
                } catch (LoginException e) {
                        if (log.isTraceEnabled())
@@ -299,7 +301,6 @@ public class CmsLogin implements CmsStyles, CallbackHandler {
                // }
        }
 
-
        protected void logout() {
                cmsView.logout();
                cmsView.navigateTo("~");