* COMPONENT PROPERTIES
*/
String CONTEXT_PATH = "context.path";
+ String EVENT_TOPICS = "event.topics";
/*
* INIT FRAMEWORK PROPERTIES
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import javax.security.auth.Subject;
/** Get the CMS session of this subject. */
CmsSession getCmsSession(Subject subject);
-
+
CmsState getCmsState();
+
+ void sendEvent(String topic, Map<String, Object> event);
+
+ void addEventSubscriber(String topic, CmsEventSubscriber eventSubscriber);
+
+ void removeEventSubscriber(String topic, CmsEventSubscriber eventSubscriber);
}
--- /dev/null
+package org.argeo.api.cms;
+
+import java.util.Map;
+
+public interface CmsEventSubscriber {
+
+ void onEvent(String topic, Map<String, Object> properties);
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" immmediate="true" name="Status Handler">
+ <implementation class="org.argeo.cms.websocket.server.StatusEndpoints"/>
+ <service>
+ <provide interface="com.sun.net.httpserver.HttpHandler"/>
+ </service>
+ <property name="context.path" type="String" value="/cms/status"/>
+</scr:component>
Service-Component:\
OSGI-INF/pkgServletContext.xml,\
-OSGI-INF/pkgServlet.xml
+OSGI-INF/pkgServlet.xml,\
+OSGI-INF/statusHandler.xml,\
-output.. = bin/
bin.includes = META-INF/,\
.,\
- OSGI-INF/jettyServiceFactory.xml
+ OSGI-INF/jettyServiceFactory.xml,\
+ OSGI-INF/statusHandler.xml
source.. = src/
+output.. = bin/
+++ /dev/null
-package org.argeo.cms.websocket.javax.server;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.List;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginContext;
-import javax.websocket.Extension;
-import javax.websocket.HandshakeResponse;
-import javax.websocket.server.HandshakeRequest;
-import javax.websocket.server.ServerEndpointConfig;
-import javax.websocket.server.ServerEndpointConfig.Configurator;
-
-import org.argeo.api.cms.CmsAuth;
-import org.argeo.api.cms.CmsLog;
-import org.argeo.cms.auth.RemoteAuthCallbackHandler;
-import org.argeo.cms.auth.RemoteAuthSession;
-import org.argeo.cms.servlet.ServletHttpSession;
-
-/**
- * <strong>Disabled until third party issues are solved.</strong>. Customises
- * the initialisation of a new web socket.
- */
-public class CmsWebSocketConfigurator extends Configurator {
- public final static String WEBSOCKET_SUBJECT = "org.argeo.cms.websocket.subject";
- public final static String REMOTE_USER = "org.osgi.service.http.authentication.remote.user";
-
- private final static CmsLog log = CmsLog.getLog(CmsWebSocketConfigurator.class);
- final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate";
-
- @Override
- public boolean checkOrigin(String originHeaderValue) {
- return true;
- }
-
- @Override
- public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
- try {
- return endpointClass.getDeclaredConstructor().newInstance();
- } catch (Exception e) {
- throw new IllegalArgumentException("Cannot get endpoint instance", e);
- }
- }
-
- @Override
- public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {
- return requested;
- }
-
- @Override
- public String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {
- if ((requested == null) || (requested.size() == 0))
- return "";
- if ((supported == null) || (supported.isEmpty()))
- return "";
- for (String possible : requested) {
- if (possible == null)
- continue;
- if (supported.contains(possible))
- return possible;
- }
- return "";
- }
-
- @Override
- public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
- if (true)
- return;
-
- RemoteAuthSession httpSession = new ServletHttpSession(
- (javax.servlet.http.HttpSession) request.getHttpSession());
- if (log.isDebugEnabled() && httpSession != null)
- log.debug("Web socket HTTP session id: " + httpSession.getId());
-
- if (httpSession == null) {
- rejectResponse(response, null);
- }
- try {
- LoginContext lc = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, new RemoteAuthCallbackHandler(httpSession));
- lc.login();
- if (log.isDebugEnabled())
- log.debug("Web socket logged-in as " + lc.getSubject());
- Subject.doAs(lc.getSubject(), new PrivilegedAction<Void>() {
-
- @Override
- public Void run() {
- sec.getUserProperties().put(REMOTE_USER, AccessController.getContext());
- return null;
- }
-
- });
- } catch (Exception e) {
- rejectResponse(response, e);
- }
- }
-
- /**
- * Behaviour when the web socket could not be authenticated. Throws an
- * {@link IllegalStateException} by default.
- *
- * @param e can be null
- */
- protected void rejectResponse(HandshakeResponse response, Exception e) {
- // violent implementation, as suggested in
- // https://stackoverflow.com/questions/21763829/jsr-356-how-to-abort-a-websocket-connection-during-the-handshake
-// throw new IllegalStateException("Web socket cannot be authenticated");
- }
-}
+++ /dev/null
-package org.argeo.cms.websocket.javax.server;
-
-import java.io.IOException;
-import java.security.AccessControlContext;
-import java.util.Hashtable;
-import java.util.Map;
-
-import javax.security.auth.Subject;
-import javax.websocket.CloseReason;
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.RemoteEndpoint;
-import javax.websocket.Session;
-import javax.websocket.server.ServerEndpoint;
-
-import org.argeo.api.cms.CmsLog;
-import org.argeo.cms.integration.CmsExceptionsChain;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
-import org.osgi.service.http.context.ServletContextHelper;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/** Provides WebSocket access. */
-@ServerEndpoint(value = "/ws/test/events/")
-public class TestEndpoint implements EventHandler {
- private final static CmsLog log = CmsLog.getLog(TestEndpoint.class);
-
- final static String TOPICS_BASE = "/test";
- final static String INPUT = "input";
- final static String TOPIC = "topic";
- final static String VIEW_UID = "viewUid";
- final static String COMPUTATION_UID = "computationUid";
- final static String MESSAGES = "messages";
- final static String ERRORS = "errors";
-
- final static String EXCEPTION = "exception";
- final static String MESSAGE = "message";
-
- private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
-
- private String wsSessionId;
- private RemoteEndpoint.Basic remote;
- private ServiceRegistration<EventHandler> eventHandlerSr;
-
- // json
- private ObjectMapper objectMapper = new ObjectMapper();
-
- private WebSocketView view;
-
- @OnOpen
- public void onWebSocketConnect(Session session) {
- wsSessionId = session.getId();
-
- // 24h timeout
- session.setMaxIdleTimeout(1000 * 60 * 60 * 24);
-
- Map<String, Object> userProperties = session.getUserProperties();
- Subject subject = null;
-// AccessControlContext accessControlContext = (AccessControlContext) userProperties
-// .get(ServletContextHelper.REMOTE_USER);
-// Subject subject = Subject.getSubject(accessControlContext);
-// // Deal with authentication failure
-// if (subject == null) {
-// try {
-// CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
-//
-// @Override
-// public int getCode() {
-// return 4001;
-// }
-// };
-// session.close(new CloseReason(closeCode, "Unauthorized"));
-// if (log.isTraceEnabled())
-// log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
-// + ".");
-// return;
-// } catch (IOException e) {
-// // silent
-// }
-// return;// ignore
-// }
-
- if (log.isDebugEnabled())
- log.debug("WS#" + wsSessionId + " open for: " + subject);
- remote = session.getBasicRemote();
- view = new WebSocketView(subject);
-
- // OSGi events
- String[] topics = new String[] { TOPICS_BASE + "/*" };
- Hashtable<String, Object> ht = new Hashtable<>();
- ht.put(EventConstants.EVENT_TOPIC, topics);
- ht.put(EventConstants.EVENT_FILTER, "(" + VIEW_UID + "=" + view.getUid() + ")");
- eventHandlerSr = bc.registerService(EventHandler.class, this, ht);
-
- if (log.isDebugEnabled())
- log.debug("New view " + view.getUid() + " opened, via web socket.");
- }
-
- @OnMessage
- public void onWebSocketText(Session session, String message) throws JsonMappingException, JsonProcessingException {
- try {
- if (log.isTraceEnabled())
- log.trace("WS#" + view.getUid() + " received:\n" + message + "\n");
-// JsonNode jsonNode = objectMapper.readTree(message);
-// String topic = jsonNode.get(TOPIC).textValue();
-
- final String computationUid = null;
-// if (MY_TOPIC.equals(topic)) {
-// view.checkRole(SPECIFIC_ROLE);
-// computationUid= process();
-// }
- remote.sendText("ACK");
- } catch (Exception e) {
- log.error("Error when receiving web socket message", e);
- sendSystemErrorMessage(e);
- }
- }
-
- @OnClose
- public void onWebSocketClose(CloseReason reason) {
- if (eventHandlerSr != null)
- eventHandlerSr.unregister();
- if (view != null && log.isDebugEnabled())
- log.debug("WS#" + view.getUid() + " closed: " + reason);
- }
-
- @OnError
- public void onWebSocketError(Throwable cause) {
- if (view != null) {
- log.error("WS#" + view.getUid() + " ERROR", cause);
- } else {
- if (log.isTraceEnabled())
- log.error("Error in web socket session " + wsSessionId, cause);
- }
- }
-
- @Override
- public void handleEvent(Event event) {
- try {
- Object uid = event.getProperty(COMPUTATION_UID);
- Exception exception = (Exception) event.getProperty(EXCEPTION);
- if (exception != null) {
- CmsExceptionsChain systemErrors = new CmsExceptionsChain(exception);
- String sent = systemErrors.toJsonString(objectMapper);
- remote.sendText(sent);
- return;
- }
- String topic = event.getTopic();
- if (log.isTraceEnabled())
- log.trace("WS#" + view.getUid() + " " + topic + ": notify event " + topic + "#" + uid + ", " + event);
- } catch (Exception e) {
- log.error("Error when handling event for WebSocket", e);
- sendSystemErrorMessage(e);
- }
-
- }
-
- /** Sends an error message in JSON format. */
- protected void sendSystemErrorMessage(Exception e) {
- CmsExceptionsChain systemErrors = new CmsExceptionsChain(e);
- try {
- if (remote != null)
- remote.sendText(systemErrors.toJsonString(objectMapper));
- } catch (Exception e1) {
- log.error("Cannot send WebSocket system error messages " + systemErrors, e1);
- }
- }
-}
+++ /dev/null
-package org.argeo.cms.websocket.javax.server;
-
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.WebSocket;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-
-/** Tests connectivity to the web socket server. */
-public class WebSocketTest {
-
- public static void main(String[] args) throws Exception {
- CompletableFuture<Boolean> received = new CompletableFuture<>();
- WebSocket.Listener listener = new WebSocket.Listener() {
-
- public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
- System.out.println(message);
- CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
- received.complete(true);
- return res;
- }
- };
-
- HttpClient client = HttpClient.newHttpClient();
- CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
- .buildAsync(URI.create("ws://localhost:7070/ws/test/events/"), listener);
- WebSocket webSocket = ws.get();
- webSocket.sendText("TEST", true);
-
- received.get(10, TimeUnit.SECONDS);
- webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "");
- }
-
-}
+++ /dev/null
-package org.argeo.cms.websocket.javax.server;
-
-import java.security.Principal;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.security.auth.Subject;
-import javax.security.auth.x500.X500Principal;
-
-import org.osgi.service.useradmin.Role;
-
-/**
- * Abstraction of a single Frontend view, that is a web browser page. There can
- * be multiple views within one single authenticated HTTP session.
- */
-public class WebSocketView {
- private final String uid;
- private Subject subject;
-
- public WebSocketView(Subject subject) {
- this.uid = UUID.randomUUID().toString();
- this.subject = subject;
- }
-
- public String getUid() {
- return uid;
- }
-
- public Set<String> getRoles() {
- return roles(subject);
- }
-
- public boolean isInRole(String role) {
- return getRoles().contains(role);
- }
-
- public void checkRole(String role) {
- checkRole(subject, role);
- }
-
- public final static Set<String> roles(Subject subject) {
- Set<String> roles = new HashSet<String>();
- X500Principal principal = subject.getPrincipals(X500Principal.class).iterator().next();
- String username = principal.getName();
- roles.add(username);
- for (Principal group : subject.getPrincipals()) {
- if (group instanceof Role)
- roles.add(group.getName());
- }
- return roles;
- }
-
- public static void checkRole(Subject subject, String role) {
- Set<String> roles = roles(subject);
- if (!roles.contains(role))
- throw new IllegalStateException("User is not in role " + role);
- }
-
-}
+++ /dev/null
-/** Argeo CMS websocket integration. */
-package org.argeo.cms.websocket.javax.server;
\ No newline at end of file
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.websocket.Extension;
+import javax.websocket.HandshakeResponse;
+import javax.websocket.server.HandshakeRequest;
+import javax.websocket.server.ServerEndpointConfig;
+import javax.websocket.server.ServerEndpointConfig.Configurator;
+
+import org.argeo.api.cms.CmsAuth;
+import org.argeo.api.cms.CmsLog;
+import org.argeo.api.cms.CmsState;
+import org.argeo.cms.auth.RemoteAuthCallbackHandler;
+import org.argeo.cms.auth.RemoteAuthSession;
+import org.argeo.cms.servlet.ServletHttpSession;
+
+/**
+ * <strong>Disabled until third party issues are solved.</strong>. Customises
+ * the initialisation of a new web socket.
+ */
+public class CmsWebSocketConfigurator extends Configurator {
+ public final static String WEBSOCKET_SUBJECT = "org.argeo.cms.websocket.subject";
+ public final static String REMOTE_USER = "org.osgi.service.http.authentication.remote.user";
+
+ private final static CmsLog log = CmsLog.getLog(CmsWebSocketConfigurator.class);
+ final static String HEADER_WWW_AUTHENTICATE = "WWW-Authenticate";
+
+ private CmsState cmsState;
+
+ public void start() {
+
+ }
+
+ public void stop() {
+
+ }
+
+ @Override
+ public boolean checkOrigin(String originHeaderValue) {
+ return true;
+ }
+
+ @Override
+ public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
+ try {
+ return endpointClass.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Cannot get endpoint instance", e);
+ }
+ }
+
+ @Override
+ public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {
+ return requested;
+ }
+
+ @Override
+ public String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {
+ if ((requested == null) || (requested.size() == 0))
+ return "";
+ if ((supported == null) || (supported.isEmpty()))
+ return "";
+ for (String possible : requested) {
+ if (possible == null)
+ continue;
+ if (supported.contains(possible))
+ return possible;
+ }
+ return "";
+ }
+
+ @Override
+ public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
+ if (true)
+ return;
+
+ RemoteAuthSession httpSession = new ServletHttpSession(
+ (javax.servlet.http.HttpSession) request.getHttpSession());
+ if (log.isDebugEnabled() && httpSession != null)
+ log.debug("Web socket HTTP session id: " + httpSession.getId());
+
+ if (httpSession == null) {
+ rejectResponse(response, null);
+ }
+ try {
+ LoginContext lc = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, new RemoteAuthCallbackHandler(httpSession));
+ lc.login();
+ if (log.isDebugEnabled())
+ log.debug("Web socket logged-in as " + lc.getSubject());
+ Subject.doAs(lc.getSubject(), new PrivilegedAction<Void>() {
+
+ @Override
+ public Void run() {
+ sec.getUserProperties().put(REMOTE_USER, AccessController.getContext());
+ return null;
+ }
+
+ });
+ } catch (Exception e) {
+ rejectResponse(response, e);
+ }
+ }
+
+ /**
+ * Behaviour when the web socket could not be authenticated. Throws an
+ * {@link IllegalStateException} by default.
+ *
+ * @param e can be null
+ */
+ protected void rejectResponse(HandshakeResponse response, Exception e) {
+ // violent implementation, as suggested in
+ // https://stackoverflow.com/questions/21763829/jsr-356-how-to-abort-a-websocket-connection-during-the-handshake
+// throw new IllegalStateException("Web socket cannot be authenticated");
+ }
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnOpen;
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.argeo.api.cms.CmsContext;
+import org.argeo.api.cms.CmsEventSubscriber;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+
+@ServerEndpoint(value = "/event/{topic}", configurator = CmsWebSocketConfigurator.class)
+public class EventEndpoint implements CmsEventSubscriber {
+ private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
+
+ private RemoteEndpoint.Basic remote;
+ private CmsContext cmsContext;
+
+// private String topic = "cms";
+
+ @OnOpen
+ public void onOpen(Session session, @PathParam("topic") String topic) {
+ if (bc != null) {
+ cmsContext = bc.getService(bc.getServiceReference(CmsContext.class));
+ cmsContext.addEventSubscriber(topic, this);
+ }
+ remote = session.getBasicRemote();
+
+ }
+
+ @OnClose
+ public void onClose(@PathParam("topic") String topic) {
+ cmsContext.removeEventSubscriber(topic, this);
+ }
+
+ @Override
+ public void onEvent(String topic, Map<String, Object> properties) {
+ try {
+ remote.sendText(topic + ": " + properties);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+public class StatusEndpoints implements WebsocketEndpoints, HttpHandler {
+
+ @Override
+ public Set<Class<?>> getEndPoints() {
+ Set<Class<?>> res = new HashSet<>();
+ res.add(EventEndpoint.class);
+ res.add(TestEndpoint.class);
+ return res;
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ // web socket only
+ exchange.sendResponseHeaders(200, -1);
+ }
+
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.websocket.CloseReason;
+import javax.websocket.EndpointConfig;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.argeo.api.cms.CmsLog;
+import org.argeo.cms.integration.CmsExceptionsChain;
+import org.argeo.util.naming.NamingUtils;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Provides WebSocket access. */
+@ServerEndpoint(value = "/test/{topic}", configurator = CmsWebSocketConfigurator.class)
+public class TestEndpoint implements EventHandler {
+ private final static CmsLog log = CmsLog.getLog(TestEndpoint.class);
+
+ final static String TOPICS_BASE = "/test";
+ final static String INPUT = "input";
+ final static String TOPIC = "topic";
+ final static String VIEW_UID = "viewUid";
+ final static String COMPUTATION_UID = "computationUid";
+ final static String MESSAGES = "messages";
+ final static String ERRORS = "errors";
+
+ final static String EXCEPTION = "exception";
+ final static String MESSAGE = "message";
+
+ private BundleContext bc = FrameworkUtil.getBundle(TestEndpoint.class).getBundleContext();
+
+ private String wsSessionId;
+ private RemoteEndpoint.Basic remote;
+ private ServiceRegistration<EventHandler> eventHandlerSr;
+
+ // json
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ private WebSocketView view;
+
+ @OnOpen
+ public void onOpen(Session session, EndpointConfig endpointConfig) {
+ Map<String, List<String>> parameters = NamingUtils.queryToMap(session.getRequestURI());
+ String path = NamingUtils.getQueryValue(parameters, "path");
+ log.debug("WS Path: " + path);
+
+ wsSessionId = session.getId();
+
+ // 24h timeout
+ session.setMaxIdleTimeout(1000 * 60 * 60 * 24);
+
+ Map<String, Object> userProperties = session.getUserProperties();
+ Subject subject = null;
+// AccessControlContext accessControlContext = (AccessControlContext) userProperties
+// .get(ServletContextHelper.REMOTE_USER);
+// Subject subject = Subject.getSubject(accessControlContext);
+// // Deal with authentication failure
+// if (subject == null) {
+// try {
+// CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
+//
+// @Override
+// public int getCode() {
+// return 4001;
+// }
+// };
+// session.close(new CloseReason(closeCode, "Unauthorized"));
+// if (log.isTraceEnabled())
+// log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
+// + ".");
+// return;
+// } catch (IOException e) {
+// // silent
+// }
+// return;// ignore
+// }
+
+ if (log.isDebugEnabled())
+ log.debug("WS#" + wsSessionId + " open for: " + subject);
+ remote = session.getBasicRemote();
+ view = new WebSocketView(subject);
+
+ // OSGi events
+ String[] topics = new String[] { TOPICS_BASE + "/*" };
+ Hashtable<String, Object> ht = new Hashtable<>();
+ ht.put(EventConstants.EVENT_TOPIC, topics);
+ ht.put(EventConstants.EVENT_FILTER, "(" + VIEW_UID + "=" + view.getUid() + ")");
+ eventHandlerSr = bc.registerService(EventHandler.class, this, ht);
+
+ if (log.isDebugEnabled())
+ log.debug("New view " + view.getUid() + " opened, via web socket.");
+ }
+
+ @OnMessage
+ public void onWebSocketText(@PathParam("topic") String topic, Session session, String message)
+ throws JsonMappingException, JsonProcessingException {
+ try {
+ if (log.isTraceEnabled())
+ log.trace("WS#" + view.getUid() + " received:\n" + message + "\n");
+// JsonNode jsonNode = objectMapper.readTree(message);
+// String topic = jsonNode.get(TOPIC).textValue();
+
+ final String computationUid = null;
+// if (MY_TOPIC.equals(topic)) {
+// view.checkRole(SPECIFIC_ROLE);
+// computationUid= process();
+// }
+ remote.sendText("ACK " + topic);
+ } catch (Exception e) {
+ log.error("Error when receiving web socket message", e);
+ sendSystemErrorMessage(e);
+ }
+ }
+
+ @OnClose
+ public void onWebSocketClose(CloseReason reason) {
+ if (eventHandlerSr != null)
+ eventHandlerSr.unregister();
+ if (view != null && log.isDebugEnabled())
+ log.debug("WS#" + view.getUid() + " closed: " + reason);
+ }
+
+ @OnError
+ public void onWebSocketError(Throwable cause) {
+ if (view != null) {
+ log.error("WS#" + view.getUid() + " ERROR", cause);
+ } else {
+ if (log.isTraceEnabled())
+ log.error("Error in web socket session " + wsSessionId, cause);
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ try {
+ Object uid = event.getProperty(COMPUTATION_UID);
+ Exception exception = (Exception) event.getProperty(EXCEPTION);
+ if (exception != null) {
+ CmsExceptionsChain systemErrors = new CmsExceptionsChain(exception);
+ String sent = systemErrors.toJsonString(objectMapper);
+ remote.sendText(sent);
+ return;
+ }
+ String topic = event.getTopic();
+ if (log.isTraceEnabled())
+ log.trace("WS#" + view.getUid() + " " + topic + ": notify event " + topic + "#" + uid + ", " + event);
+ } catch (Exception e) {
+ log.error("Error when handling event for WebSocket", e);
+ sendSystemErrorMessage(e);
+ }
+
+ }
+
+ /** Sends an error message in JSON format. */
+ protected void sendSystemErrorMessage(Exception e) {
+ CmsExceptionsChain systemErrors = new CmsExceptionsChain(e);
+ try {
+ if (remote != null)
+ remote.sendText(systemErrors.toJsonString(objectMapper));
+ } catch (Exception e1) {
+ log.error("Cannot send WebSocket system error messages " + systemErrors, e1);
+ }
+ }
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/** Tests connectivity to the web socket server. */
+public class WebSocketEventClient {
+
+ public static void main(String[] args) throws Exception {
+ WebSocket.Listener listener = new WebSocket.Listener() {
+
+ public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
+ System.out.println(message);
+ CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
+ return res;
+ }
+
+ @Override
+ public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
+ // System.out.println("Pong received.");
+ return null;
+ }
+
+ };
+
+ HttpClient client = HttpClient.newHttpClient();
+ CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
+ .buildAsync(URI.create("ws://localhost:7070/cms/status/event/cms"), listener);
+ WebSocket webSocket = ws.get();
+ webSocket.request(Long.MAX_VALUE);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "")));
+
+ while (!webSocket.isInputClosed()) {
+ webSocket.sendPing(ByteBuffer.allocate(0));
+ Thread.sleep(10000);
+ }
+ }
+
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.WebSocket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+/** Tests connectivity to the web socket server. */
+public class WebSocketTest {
+
+ public static void main(String[] args) throws Exception {
+ CompletableFuture<Boolean> received = new CompletableFuture<>();
+ WebSocket.Listener listener = new WebSocket.Listener() {
+
+ public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
+ System.out.println(message);
+ CompletionStage<String> res = CompletableFuture.completedStage(message.toString());
+ received.complete(true);
+ return res;
+ }
+ };
+
+ HttpClient client = HttpClient.newHttpClient();
+ CompletableFuture<WebSocket> ws = client.newWebSocketBuilder()
+ .buildAsync(URI.create("ws://localhost:7070/cms/status/test/my%20topic?path=my%2Frelative%2Fpath"), listener);
+ WebSocket webSocket = ws.get();
+ webSocket.sendText("TEST", true);
+
+ received.get(10, TimeUnit.SECONDS);
+ webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "");
+ }
+
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.security.Principal;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.security.auth.Subject;
+import javax.security.auth.x500.X500Principal;
+
+import org.osgi.service.useradmin.Role;
+
+/**
+ * Abstraction of a single Frontend view, that is a web browser page. There can
+ * be multiple views within one single authenticated HTTP session.
+ */
+public class WebSocketView {
+ private final String uid;
+ private Subject subject;
+
+ public WebSocketView(Subject subject) {
+ this.uid = UUID.randomUUID().toString();
+ this.subject = subject;
+ }
+
+ public String getUid() {
+ return uid;
+ }
+
+ public Set<String> getRoles() {
+ return roles(subject);
+ }
+
+ public boolean isInRole(String role) {
+ return getRoles().contains(role);
+ }
+
+ public void checkRole(String role) {
+ checkRole(subject, role);
+ }
+
+ public final static Set<String> roles(Subject subject) {
+ Set<String> roles = new HashSet<String>();
+ X500Principal principal = subject.getPrincipals(X500Principal.class).iterator().next();
+ String username = principal.getName();
+ roles.add(username);
+ for (Principal group : subject.getPrincipals()) {
+ if (group instanceof Role)
+ roles.add(group.getName());
+ }
+ return roles;
+ }
+
+ public static void checkRole(Subject subject, String role) {
+ Set<String> roles = roles(subject);
+ if (!roles.contains(role))
+ throw new IllegalStateException("User is not in role " + role);
+ }
+
+}
--- /dev/null
+package org.argeo.cms.websocket.server;
+
+import java.util.Set;
+
+/** Configure web socket in Jetty without hard dependency. */
+public interface WebsocketEndpoints {
+ Set<Class<?>> getEndPoints();
+
+}
--- /dev/null
+/** Argeo CMS websocket integration. */
+package org.argeo.cms.websocket.server;
\ No newline at end of file
import org.argeo.api.cms.CmsLog;
import org.argeo.api.cms.CmsState;
import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator;
-import org.argeo.cms.websocket.javax.server.TestEndpoint;
+import org.argeo.cms.websocket.server.CmsWebSocketConfigurator;
+import org.argeo.cms.websocket.server.TestEndpoint;
import org.argeo.util.LangUtils;
import org.eclipse.equinox.http.jetty.JettyConfigurator;
import org.osgi.framework.BundleContext;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import org.argeo.api.cms.CmsState;
import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.websocket.javax.server.CmsWebSocketConfigurator;
-import org.argeo.cms.websocket.javax.server.TestEndpoint;
+import org.argeo.cms.websocket.server.CmsWebSocketConfigurator;
+import org.argeo.cms.websocket.server.TestEndpoint;
import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
private Path tempDir;
// WebSocket
- private ServerContainer wsServerContainer;
+// private ServerContainer wsServerContainer;
private ServerEndpointConfig.Configurator wsEndpointConfigurator;
private CmsState cmsState;
String webSocketEnabled = getDeployProperty(CmsDeployProperty.WEBSOCKET_ENABLED);
// web socket
if (webSocketEnabled != null && webSocketEnabled.equals(Boolean.toString(true))) {
- JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() {
-
- @Override
- public void accept(ServletContext servletContext, ServerContainer serverContainer)
- throws DeploymentException {
- wsServerContainer = serverContainer;
-
- wsEndpointConfigurator = new CmsWebSocketConfigurator();
-
- ServerEndpointConfig config = ServerEndpointConfig.Builder
- .create(TestEndpoint.class, "/ws/test/events/").configurator(wsEndpointConfigurator)
- .build();
- try {
- wsServerContainer.addEndpoint(config);
- } catch (DeploymentException e) {
- throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e);
- }
- }
- });
+// JavaxWebSocketServletContainerInitializer.configure(servletContextHandler, new Configurator() {
+//
+// @Override
+// public void accept(ServletContext servletContext, ServerContainer serverContainer)
+// throws DeploymentException {
+//// wsServerContainer = serverContainer;
+//
+// CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator();
+//
+// ServerEndpointConfig config = ServerEndpointConfig.Builder
+// .create(TestEndpoint.class, "/ws/test/events/{topic}").configurator(wsEndpointConfigurator)
+// .build();
+// try {
+// serverContainer.addEndpoint(config);
+// } catch (DeploymentException e) {
+// throw new IllegalStateException("Cannot initalise the WebSocket server runtime.", e);
+// }
+// }
+// });
}
}
import java.util.Objects;
import java.util.Set;
+import javax.servlet.ServletContext;
+import javax.websocket.DeploymentException;
+import javax.websocket.server.ServerContainer;
+
import org.argeo.cms.servlet.httpserver.HttpContextServlet;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.handler.ContextHandler;
+import org.argeo.cms.websocket.server.WebsocketEndpoints;
import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
+import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer.Configurator;
import com.sun.net.httpserver.Authenticator;
import com.sun.net.httpserver.Filter;
class JettyHttpContext extends HttpContext {
private final JettyHttpServer httpServer;
private final String path;
- private final ContextHandler contextHandler;
+ private final ServletContextHandler contextHandler;
private final ContextAttributes attributes;
private final List<Filter> filters = new ArrayList<>();
Objects.requireNonNull(handler);
this.handler = handler;
+ // web socket
+ if (handler instanceof WebsocketEndpoints) {
+ JavaxWebSocketServletContainerInitializer.configure(contextHandler, new Configurator() {
+
+ @Override
+ public void accept(ServletContext servletContext, ServerContainer serverContainer)
+ throws DeploymentException {
+// CmsWebSocketConfigurator wsEndpointConfigurator = new CmsWebSocketConfigurator();
+
+ for (Class<?> clss : ((WebsocketEndpoints) handler).getEndPoints()) {
+// Class<?> clss = websocketEndpoints.get(path);
+// ServerEndpointConfig config = ServerEndpointConfig.Builder.create(clss, path)
+// .configurator(wsEndpointConfigurator).build();
+ serverContainer.addEndpoint(clss);
+ }
+ }
+ });
+ }
+
if (httpServer.isStarted())
try {
contextHandler.start();
return authenticator;
}
- public Handler getContextHandler() {
+ ServletContextHandler getContextHandler() {
return contextHandler;
}
package org.argeo.cms.internal.runtime;
-import static java.util.Locale.ENGLISH;
-
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.SubmissionPublisher;
import javax.security.auth.Subject;
-import org.argeo.api.cms.CmsConstants;
import org.argeo.api.cms.CmsContext;
import org.argeo.api.cms.CmsDeployment;
+import org.argeo.api.cms.CmsEventSubscriber;
import org.argeo.api.cms.CmsLog;
import org.argeo.api.cms.CmsSession;
import org.argeo.api.cms.CmsSessionId;
import org.argeo.api.cms.CmsState;
import org.argeo.api.uuid.UuidFactory;
import org.argeo.cms.CmsDeployProperty;
-import org.argeo.cms.LocaleUtils;
import org.argeo.cms.internal.auth.CmsSessionImpl;
import org.ietf.jgss.GSSCredential;
import org.osgi.service.useradmin.UserAdmin;
public class CmsContextImpl implements CmsContext {
+
private final CmsLog log = CmsLog.getLog(getClass());
// private final BundleContext bc = FrameworkUtil.getBundle(getClass()).getBundleContext();
private Map<UUID, CmsSessionImpl> cmsSessionsByUuid = new HashMap<>();
private Map<String, CmsSessionImpl> cmsSessionsByLocalId = new HashMap<>();
+ // CMS events
+ private Map<String, SubmissionPublisher<Map<String, Object>>> topics = new TreeMap<>();
+// private IdentityHashMap<CmsEventSubscriber, List<CmsEventFlowSubscriber>> subscriptions = new IdentityHashMap<>();
+
// public CmsContextImpl() {
// initTrackers();
// }
return cmsSessionsByLocalId.get(localId);
}
+ /*
+ * CMS Events
+ */
+ public void sendEvent(String topic, Map<String, Object> event) {
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ if (publisher == null)
+ return; // no one is interested
+ publisher.submit(event);
+ }
+
+ public void addEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+ synchronized (topics) {
+ if (!topics.containsKey(topic))
+ topics.put(topic, new SubmissionPublisher<>());
+ }
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ CmsEventFlowSubscriber flowSubscriber = new CmsEventFlowSubscriber(topic, subscriber);
+ publisher.subscribe(flowSubscriber);
+ }
+
+ public void removeEventSubscriber(String topic, CmsEventSubscriber subscriber) {
+ SubmissionPublisher<Map<String, Object>> publisher = topics.get(topic);
+ if (publisher == null) {
+ log.error("There should be an event topic " + topic);
+ return;
+ }
+ for (Flow.Subscriber<? super Map<String, Object>> flowSubscriber : publisher.getSubscribers()) {
+ if (flowSubscriber instanceof CmsEventFlowSubscriber)
+ ((CmsEventFlowSubscriber) flowSubscriber).unsubscribe();
+ }
+ synchronized (topics) {
+ if (!publisher.hasSubscribers()) {
+ publisher.close();
+ topics.remove(topic);
+ }
+ }
+ }
+
+ static class CmsEventFlowSubscriber implements Flow.Subscriber<Map<String, Object>> {
+ private String topic;
+ private CmsEventSubscriber eventSubscriber;
+
+ private Subscription subscription;
+
+ public CmsEventFlowSubscriber(String topic, CmsEventSubscriber eventSubscriber) {
+ this.topic = topic;
+ this.eventSubscriber = eventSubscriber;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Map<String, Object> item) {
+ eventSubscriber.onEvent(topic, item);
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void onComplete() {
+ // TODO Auto-generated method stub
+
+ }
+
+ void unsubscribe() {
+ if (subscription != null)
+ subscription.cancel();
+ else
+ throw new IllegalStateException("No subscription to cancel");
+ }
+
+ }
+
}
import static org.argeo.cms.CmsMsg.username;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
loginContext = new LoginContext(CmsAuth.LOGIN_CONTEXT_USER, subject, this);
loginContext.login();
cmsView.authChange(loginContext);
+ cmsContext.sendEvent("cms", Collections.singletonMap("msg", "New login"));
return true;
} catch (LoginException e) {
if (log.isTraceEnabled())
// }
}
-
protected void logout() {
cmsView.logout();
cmsView.navigateTo("~");