From 60aa2e64061a09e938d9f6cba35ee3fd66a19828 Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Tue, 19 Jul 2022 12:19:38 +0200 Subject: [PATCH] Improve events and web sockets --- .../src/org/argeo/api/cms/CmsConstants.java | 1 + .../src/org/argeo/api/cms/CmsContext.java | 9 +- .../org/argeo/api/cms/CmsEventSubscriber.java | 8 ++ org.argeo.cms.ee/OSGI-INF/statusHandler.xml | 8 ++ org.argeo.cms.ee/bnd.bnd | 3 +- org.argeo.cms.ee/build.properties | 5 +- .../websocket/javax/server/package-info.java | 2 - .../server/CmsWebSocketConfigurator.java | 13 ++- .../cms/websocket/server/EventEndpoint.java | 50 ++++++++++ .../cms/websocket/server/StatusEndpoints.java | 26 +++++ .../{javax => }/server/TestEndpoint.java | 23 +++-- .../server/WebSocketEventClient.java | 44 +++++++++ .../{javax => }/server/WebSocketTest.java | 4 +- .../{javax => }/server/WebSocketView.java | 2 +- .../websocket/server/WebsocketEndpoints.java | 9 ++ .../cms/websocket/server/package-info.java | 2 + .../servlet/internal/jetty/JettyConfig.java | 4 +- .../org/argeo/cms/jetty/CmsJettyServer.java | 45 ++++----- .../org/argeo/cms/jetty/JettyHttpContext.java | 32 +++++- .../cms/internal/runtime/CmsContextImpl.java | 97 ++++++++++++++++++- .../src/org/argeo/cms/swt/auth/CmsLogin.java | 3 +- 21 files changed, 338 insertions(+), 52 deletions(-) create mode 100644 org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java create mode 100644 org.argeo.cms.ee/OSGI-INF/statusHandler.xml delete mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java rename org.argeo.cms.ee/src/org/argeo/cms/websocket/{javax => }/server/CmsWebSocketConfigurator.java (95%) create mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java create mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java rename org.argeo.cms.ee/src/org/argeo/cms/websocket/{javax => }/server/TestEndpoint.java (88%) create mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java rename org.argeo.cms.ee/src/org/argeo/cms/websocket/{javax => }/server/WebSocketTest.java (87%) rename org.argeo.cms.ee/src/org/argeo/cms/websocket/{javax => }/server/WebSocketView.java (96%) create mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java create mode 100644 org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java diff --git a/org.argeo.api.cms/src/org/argeo/api/cms/CmsConstants.java b/org.argeo.api.cms/src/org/argeo/api/cms/CmsConstants.java index 578c82431..207b0a8df 100644 --- a/org.argeo.api.cms/src/org/argeo/api/cms/CmsConstants.java +++ b/org.argeo.api.cms/src/org/argeo/api/cms/CmsConstants.java @@ -83,6 +83,7 @@ public interface CmsConstants { * COMPONENT PROPERTIES */ String CONTEXT_PATH = "context.path"; + String EVENT_TOPICS = "event.topics"; /* * INIT FRAMEWORK PROPERTIES diff --git a/org.argeo.api.cms/src/org/argeo/api/cms/CmsContext.java b/org.argeo.api.cms/src/org/argeo/api/cms/CmsContext.java index 05108beac..64bb4255c 100644 --- a/org.argeo.api.cms/src/org/argeo/api/cms/CmsContext.java +++ b/org.argeo.api.cms/src/org/argeo/api/cms/CmsContext.java @@ -2,6 +2,7 @@ package org.argeo.api.cms; import java.util.List; import java.util.Locale; +import java.util.Map; import javax.security.auth.Subject; @@ -27,6 +28,12 @@ public interface CmsContext { /** Get the CMS session of this subject. */ CmsSession getCmsSession(Subject subject); - + CmsState getCmsState(); + + void sendEvent(String topic, Map event); + + void addEventSubscriber(String topic, CmsEventSubscriber eventSubscriber); + + void removeEventSubscriber(String topic, CmsEventSubscriber eventSubscriber); } diff --git a/org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java b/org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java new file mode 100644 index 000000000..9ca5eaa38 --- /dev/null +++ b/org.argeo.api.cms/src/org/argeo/api/cms/CmsEventSubscriber.java @@ -0,0 +1,8 @@ +package org.argeo.api.cms; + +import java.util.Map; + +public interface CmsEventSubscriber { + + void onEvent(String topic, Map properties); +} diff --git a/org.argeo.cms.ee/OSGI-INF/statusHandler.xml b/org.argeo.cms.ee/OSGI-INF/statusHandler.xml new file mode 100644 index 000000000..a530c336a --- /dev/null +++ b/org.argeo.cms.ee/OSGI-INF/statusHandler.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/org.argeo.cms.ee/bnd.bnd b/org.argeo.cms.ee/bnd.bnd index 6fae1ea24..f09995c00 100644 --- a/org.argeo.cms.ee/bnd.bnd +++ b/org.argeo.cms.ee/bnd.bnd @@ -8,4 +8,5 @@ javax.servlet.*;version="[3,5)",\ Service-Component:\ OSGI-INF/pkgServletContext.xml,\ -OSGI-INF/pkgServlet.xml +OSGI-INF/pkgServlet.xml,\ +OSGI-INF/statusHandler.xml,\ diff --git a/org.argeo.cms.ee/build.properties b/org.argeo.cms.ee/build.properties index ee94f53be..eb170c950 100644 --- a/org.argeo.cms.ee/build.properties +++ b/org.argeo.cms.ee/build.properties @@ -1,5 +1,6 @@ -output.. = bin/ bin.includes = META-INF/,\ .,\ - OSGI-INF/jettyServiceFactory.xml + OSGI-INF/jettyServiceFactory.xml,\ + OSGI-INF/statusHandler.xml source.. = src/ +output.. = bin/ diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java deleted file mode 100644 index 564c881bc..000000000 --- a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/package-info.java +++ /dev/null @@ -1,2 +0,0 @@ -/** Argeo CMS websocket integration. */ -package org.argeo.cms.websocket.javax.server; \ No newline at end of file diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java similarity index 95% rename from org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java rename to org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java index 46dabc28e..880eb0ed5 100644 --- a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/CmsWebSocketConfigurator.java +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/CmsWebSocketConfigurator.java @@ -1,4 +1,4 @@ -package org.argeo.cms.websocket.javax.server; +package org.argeo.cms.websocket.server; import java.security.AccessController; import java.security.PrivilegedAction; @@ -14,6 +14,7 @@ import javax.websocket.server.ServerEndpointConfig.Configurator; import org.argeo.api.cms.CmsAuth; import org.argeo.api.cms.CmsLog; +import org.argeo.api.cms.CmsState; import org.argeo.cms.auth.RemoteAuthCallbackHandler; import org.argeo.cms.auth.RemoteAuthSession; import org.argeo.cms.servlet.ServletHttpSession; @@ -28,6 +29,16 @@ public class CmsWebSocketConfigurator extends Configurator { private final static CmsLog log = CmsLog.getLog(CmsWebSocketConfigurator.class); final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate"; + + private CmsState cmsState; + + public void start() { + + } + + public void stop() { + + } @Override public boolean checkOrigin(String originHeaderValue) { diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java new file mode 100644 index 000000000..a6b2a4df7 --- /dev/null +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/EventEndpoint.java @@ -0,0 +1,50 @@ +package org.argeo.cms.websocket.server; + +import java.io.IOException; +import java.util.Map; + +import javax.websocket.OnClose; +import javax.websocket.OnOpen; +import javax.websocket.RemoteEndpoint; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import org.argeo.api.cms.CmsContext; +import org.argeo.api.cms.CmsEventSubscriber; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; + +@ServerEndpoint(value = "/event/{topic}", configurator = CmsWebSocketConfigurator.class) +public class EventEndpoint implements CmsEventSubscriber { + private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext(); + + private RemoteEndpoint.Basic remote; + private CmsContext cmsContext; + +// private String topic = "cms"; + + @OnOpen + public void onOpen(Session session, @PathParam("topic") String topic) { + if (bc != null) { + cmsContext = bc.getService(bc.getServiceReference(CmsContext.class)); + cmsContext.addEventSubscriber(topic, this); + } + remote = session.getBasicRemote(); + + } + + @OnClose + public void onClose(@PathParam("topic") String topic) { + cmsContext.removeEventSubscriber(topic, this); + } + + @Override + public void onEvent(String topic, Map properties) { + try { + remote.sendText(topic + ": " + properties); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java new file mode 100644 index 000000000..d5839e25e --- /dev/null +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/StatusEndpoints.java @@ -0,0 +1,26 @@ +package org.argeo.cms.websocket.server; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +public class StatusEndpoints implements WebsocketEndpoints, HttpHandler { + + @Override + public Set> getEndPoints() { + Set> res = new HashSet<>(); + res.add(EventEndpoint.class); + res.add(TestEndpoint.class); + return res; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + // web socket only + exchange.sendResponseHeaders(200, -1); + } + +} diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java similarity index 88% rename from org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java rename to org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java index e01f6f721..0575726d3 100644 --- a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/TestEndpoint.java +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/TestEndpoint.java @@ -1,37 +1,37 @@ -package org.argeo.cms.websocket.javax.server; +package org.argeo.cms.websocket.server; -import java.io.IOException; -import java.security.AccessControlContext; import java.util.Hashtable; +import java.util.List; import java.util.Map; import javax.security.auth.Subject; import javax.websocket.CloseReason; +import javax.websocket.EndpointConfig; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.RemoteEndpoint; import javax.websocket.Session; +import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import org.argeo.api.cms.CmsLog; import org.argeo.cms.integration.CmsExceptionsChain; +import org.argeo.util.naming.NamingUtils; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceRegistration; import org.osgi.service.event.Event; import org.osgi.service.event.EventConstants; import org.osgi.service.event.EventHandler; -import org.osgi.service.http.context.ServletContextHelper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; /** Provides WebSocket access. */ -@ServerEndpoint(value = "/ws/test/events/") +@ServerEndpoint(value = "/test/{topic}", configurator = CmsWebSocketConfigurator.class) public class TestEndpoint implements EventHandler { private final static CmsLog log = CmsLog.getLog(TestEndpoint.class); @@ -58,7 +58,11 @@ public class TestEndpoint implements EventHandler { private WebSocketView view; @OnOpen - public void onWebSocketConnect(Session session) { + public void onOpen(Session session, EndpointConfig endpointConfig) { + Map> parameters = NamingUtils.queryToMap(session.getRequestURI()); + String path = NamingUtils.getQueryValue(parameters, "path"); + log.debug("WS Path: " + path); + wsSessionId = session.getId(); // 24h timeout @@ -107,7 +111,8 @@ public class TestEndpoint implements EventHandler { } @OnMessage - public void onWebSocketText(Session session, String message) throws JsonMappingException, JsonProcessingException { + public void onWebSocketText(@PathParam("topic") String topic, Session session, String message) + throws JsonMappingException, JsonProcessingException { try { if (log.isTraceEnabled()) log.trace("WS#" + view.getUid() + " received:\n" + message + "\n"); @@ -119,7 +124,7 @@ public class TestEndpoint implements EventHandler { // view.checkRole(SPECIFIC_ROLE); // computationUid= process(); // } - remote.sendText("ACK"); + remote.sendText("ACK " + topic); } catch (Exception e) { log.error("Error when receiving web socket message", e); sendSystemErrorMessage(e); diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java new file mode 100644 index 000000000..c6cb88aea --- /dev/null +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketEventClient.java @@ -0,0 +1,44 @@ +package org.argeo.cms.websocket.server; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** Tests connectivity to the web socket server. */ +public class WebSocketEventClient { + + public static void main(String[] args) throws Exception { + WebSocket.Listener listener = new WebSocket.Listener() { + + public CompletionStage onText(WebSocket webSocket, CharSequence message, boolean last) { + System.out.println(message); + CompletionStage res = CompletableFuture.completedStage(message.toString()); + return res; + } + + @Override + public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { + // System.out.println("Pong received."); + return null; + } + + }; + + HttpClient client = HttpClient.newHttpClient(); + CompletableFuture ws = client.newWebSocketBuilder() + .buildAsync(URI.create("ws://localhost:7070/cms/status/event/cms"), listener); + WebSocket webSocket = ws.get(); + webSocket.request(Long.MAX_VALUE); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))); + + while (!webSocket.isInputClosed()) { + webSocket.sendPing(ByteBuffer.allocate(0)); + Thread.sleep(10000); + } + } + +} diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java similarity index 87% rename from org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java rename to org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java index 819837b49..b10bcfda2 100644 --- a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketTest.java +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketTest.java @@ -1,4 +1,4 @@ -package org.argeo.cms.websocket.javax.server; +package org.argeo.cms.websocket.server; import java.net.URI; import java.net.http.HttpClient; @@ -24,7 +24,7 @@ public class WebSocketTest { HttpClient client = HttpClient.newHttpClient(); CompletableFuture ws = client.newWebSocketBuilder() - .buildAsync(URI.create("ws://localhost:7070/ws/test/events/"), listener); + .buildAsync(URI.create("ws://localhost:7070/cms/status/test/my%20topic?path=my%2Frelative%2Fpath"), listener); WebSocket webSocket = ws.get(); webSocket.sendText("TEST", true); diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java similarity index 96% rename from org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java rename to org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java index a5da88be9..736631b10 100644 --- a/org.argeo.cms.ee/src/org/argeo/cms/websocket/javax/server/WebSocketView.java +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebSocketView.java @@ -1,4 +1,4 @@ -package org.argeo.cms.websocket.javax.server; +package org.argeo.cms.websocket.server; import java.security.Principal; import java.util.HashSet; diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java new file mode 100644 index 000000000..f7cd69384 --- /dev/null +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/WebsocketEndpoints.java @@ -0,0 +1,9 @@ +package org.argeo.cms.websocket.server; + +import java.util.Set; + +/** Configure web socket in Jetty without hard dependency. */ +public interface WebsocketEndpoints { + Set> getEndPoints(); + +} diff --git a/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java new file mode 100644 index 000000000..9dfb76645 --- /dev/null +++ b/org.argeo.cms.ee/src/org/argeo/cms/websocket/server/package-info.java @@ -0,0 +1,2 @@ +/** Argeo CMS websocket integration. */ +package org.argeo.cms.websocket.server; \ No newline at end of file diff --git a/org.argeo.cms.lib.equinox/src/org/argeo/cms/servlet/internal/jetty/JettyConfig.java b/org.argeo.cms.lib.equinox/src/org/argeo/cms/servlet/internal/jetty/JettyConfig.java index e7a1ac176..50be8b7a7 100644 --- a/org.argeo.cms.lib.equinox/src/org/argeo/cms/servlet/internal/jetty/JettyConfig.java +++ b/org.argeo.cms.lib.equinox/src/org/argeo/cms/servlet/internal/jetty/JettyConfig.java @@ -14,8 +14,8 @@ import org.argeo.api.cms.CmsConstants; import org.argeo.api.cms.CmsLog; import org.argeo.api.cms.CmsState; import org.argeo.cms.CmsDeployProperty; -import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator; -import org.argeo.cms.websocket.javax.server.TestEndpoint; +import org.argeo.cms.websocket.server.CmsWebSocketConfigurator; +import org.argeo.cms.websocket.server.TestEndpoint; import org.argeo.util.LangUtils; import org.eclipse.equinox.http.jetty.JettyConfigurator; import org.osgi.framework.BundleContext; diff --git a/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/CmsJettyServer.java b/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/CmsJettyServer.java index 3b9783ef5..a18f4b495 100644 --- a/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/CmsJettyServer.java +++ b/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/CmsJettyServer.java @@ -3,6 +3,7 @@ package org.argeo.cms.jetty; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import javax.servlet.ServletContext; import javax.servlet.ServletException; @@ -14,8 +15,8 @@ import com.sun.net.httpserver.HttpContext; import org.argeo.api.cms.CmsState; import org.argeo.cms.CmsDeployProperty; -import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator; -import org.argeo.cms.websocket.javax.server.TestEndpoint; +import org.argeo.cms.websocket.server.CmsWebSocketConfigurator; +import org.argeo.cms.websocket.server.TestEndpoint; import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer; @@ -35,7 +36,7 @@ public class CmsJettyServer extends JettyHttpServer { private Path tempDir; // WebSocket - private ServerContainer wsServerContainer; +// private ServerContainer wsServerContainer; private ServerEndpointConfig.Configurator wsEndpointConfigurator; private CmsState cmsState; @@ -88,25 +89,25 @@ public class CmsJettyServer extends JettyHttpServer { String webSocketEnabled = getDeployProperty(CmsDeployProperty.WEBSOCKET_ENABLED); // web socket if (webSocketEnabled != null && webSocketEnabled.equals(Boolean.toString(true))) { - JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() { - - @Override - public void accept(ServletContext servletContext, ServerContainer serverContainer) - throws DeploymentException { - wsServerContainer = serverContainer; - - wsEndpointConfigurator = new CmsWebSocketConfigurator(); - - ServerEndpointConfig config = ServerEndpointConfig.Builder - .create(TestEndpoint.class, "/ws/test/events/").configurator(wsEndpointConfigurator) - .build(); - try { - wsServerContainer.addEndpoint(config); - } catch (DeploymentException e) { - throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e); - } - } - }); +// JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() { +// +// @Override +// public void accept(ServletContext servletContext, ServerContainer serverContainer) +// throws DeploymentException { +//// wsServerContainer = serverContainer; +// +// CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator(); +// +// ServerEndpointConfig config = ServerEndpointConfig.Builder +// .create(TestEndpoint.class, "/ws/test/events/{topic}").configurator(wsEndpointConfigurator) +// .build(); +// try { +// serverContainer.addEndpoint(config); +// } catch (DeploymentException e) { +// throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e); +// } +// } +// }); } } diff --git a/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/JettyHttpContext.java b/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/JettyHttpContext.java index 7adb09be3..5876d52e8 100644 --- a/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/JettyHttpContext.java +++ b/org.argeo.cms.lib.jetty/src/org/argeo/cms/jetty/JettyHttpContext.java @@ -9,12 +9,17 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import javax.servlet.ServletContext; +import javax.websocket.DeploymentException; +import javax.websocket.server.ServerContainer; + import org.argeo.cms.servlet.httpserver.HttpContextServlet; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.handler.ContextHandler; +import org.argeo.cms.websocket.server.WebsocketEndpoints; import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer.Configurator; import com.sun.net.httpserver.Authenticator; import com.sun.net.httpserver.Filter; @@ -26,7 +31,7 @@ import com.sun.net.httpserver.HttpServer; class JettyHttpContext extends HttpContext { private final JettyHttpServer httpServer; private final String path; - private final ContextHandler contextHandler; + private final ServletContextHandler contextHandler; private final ContextAttributes attributes; private final List filters = new ArrayList<>(); @@ -63,6 +68,25 @@ class JettyHttpContext extends HttpContext { Objects.requireNonNull(handler); this.handler = handler; + // web socket + if (handler instanceof WebsocketEndpoints) { + JavaxWebSocketServletContainerInitializer.configure(contextHandler, new Configurator() { + + @Override + public void accept(ServletContext servletContext, ServerContainer serverContainer) + throws DeploymentException { +// CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator(); + + for (Class clss : ((WebsocketEndpoints) handler).getEndPoints()) { +// Class clss = websocketEndpoints.get(path); +// ServerEndpointConfig config = ServerEndpointConfig.Builder.create(clss, path) +// .configurator(wsEndpointConfigurator).build(); + serverContainer.addEndpoint(clss); + } + } + }); + } + if (httpServer.isStarted()) try { contextHandler.start(); @@ -103,7 +127,7 @@ class JettyHttpContext extends HttpContext { return authenticator; } - public Handler getContextHandler() { + ServletContextHandler getContextHandler() { return contextHandler; } diff --git a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java index e14b21e70..ea9a401a4 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/runtime/CmsContextImpl.java @@ -1,34 +1,37 @@ package org.argeo.cms.internal.runtime; -import static java.util.Locale.ENGLISH; - import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.SubmissionPublisher; import javax.security.auth.Subject; -import org.argeo.api.cms.CmsConstants; import org.argeo.api.cms.CmsContext; import org.argeo.api.cms.CmsDeployment; +import org.argeo.api.cms.CmsEventSubscriber; import org.argeo.api.cms.CmsLog; import org.argeo.api.cms.CmsSession; import org.argeo.api.cms.CmsSessionId; import org.argeo.api.cms.CmsState; import org.argeo.api.uuid.UuidFactory; import org.argeo.cms.CmsDeployProperty; -import org.argeo.cms.LocaleUtils; import org.argeo.cms.internal.auth.CmsSessionImpl; import org.ietf.jgss.GSSCredential; import org.osgi.service.useradmin.UserAdmin; public class CmsContextImpl implements CmsContext { + private final CmsLog log = CmsLog.getLog(getClass()); // private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext(); @@ -51,6 +54,10 @@ public class CmsContextImpl implements CmsContext { private Map cmsSessionsByUuid = new HashMap<>(); private Map cmsSessionsByLocalId = new HashMap<>(); + // CMS events + private Map>> topics = new TreeMap<>(); +// private IdentityHashMap> subscriptions = new IdentityHashMap<>(); + // public CmsContextImpl() { // initTrackers(); // } @@ -311,4 +318,86 @@ public class CmsContextImpl implements CmsContext { return cmsSessionsByLocalId.get(localId); } + /* + * CMS Events + */ + public void sendEvent(String topic, Map event) { + SubmissionPublisher> publisher = topics.get(topic); + if (publisher == null) + return; // no one is interested + publisher.submit(event); + } + + public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) { + synchronized (topics) { + if (!topics.containsKey(topic)) + topics.put(topic, new SubmissionPublisher<>()); + } + SubmissionPublisher> publisher = topics.get(topic); + CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber); + publisher.subscribe(flowSubscriber); + } + + public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) { + SubmissionPublisher> publisher = topics.get(topic); + if (publisher == null) { + log.error("There should be an event topic " + topic); + return; + } + for (Flow.Subscriber> flowSubscriber : publisher.getSubscribers()) { + if (flowSubscriber instanceof CmsEventFlowSubscriber) + ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe(); + } + synchronized (topics) { + if (!publisher.hasSubscribers()) { + publisher.close(); + topics.remove(topic); + } + } + } + + static class CmsEventFlowSubscriber implements Flow.Subscriber> { + private String topic; + private CmsEventSubscriber eventSubscriber; + + private Subscription subscription; + + public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) { + this.topic = topic; + this.eventSubscriber = eventSubscriber; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Map item) { + eventSubscriber.onEvent(topic, item); + + } + + @Override + public void onError(Throwable throwable) { + // TODO Auto-generated method stub + + } + + @Override + public void onComplete() { + // TODO Auto-generated method stub + + } + + void unsubscribe() { + if (subscription != null) + subscription.cancel(); + else + throw new IllegalStateException("No subscription to cancel"); + } + + } + } diff --git a/swt/org.argeo.cms.swt/src/org/argeo/cms/swt/auth/CmsLogin.java b/swt/org.argeo.cms.swt/src/org/argeo/cms/swt/auth/CmsLogin.java index 6cc410ced..4af0c6c1a 100644 --- a/swt/org.argeo.cms.swt/src/org/argeo/cms/swt/auth/CmsLogin.java +++ b/swt/org.argeo.cms.swt/src/org/argeo/cms/swt/auth/CmsLogin.java @@ -4,6 +4,7 @@ import static org.argeo.cms.CmsMsg.password; import static org.argeo.cms.CmsMsg.username; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Locale; @@ -278,6 +279,7 @@ public class CmsLogin implements CmsStyles, CallbackHandler { loginContext = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, subject, this); loginContext.login(); cmsView.authChange(loginContext); + cmsContext.sendEvent("cms", Collections.singletonMap("msg", "New login")); return true; } catch (LoginException e) { if (log.isTraceEnabled()) @@ -299,7 +301,6 @@ public class CmsLogin implements CmsStyles, CallbackHandler { // } } - protected void logout() { cmsView.logout(); cmsView.navigateTo("~"); -- 2.30.2