--- /dev/null
+package org.argeo.api.cms.http;
+
+import java.net.http.WebSocket;
+
+public interface WebSocketHttpServer {
+
+ WebSocket.Builder newWebSocketBuilder() ;
+}
package org.argeo.cms.jetty;
+import java.net.http.WebSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
+import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpHandler;
private boolean started;
private CmsState cmsState;
+
+ private WebSocketUpgradeHandler webSocketUpgradeHandler;
@Override
public void bind(InetSocketAddress addr, int backlog) throws IOException {
if (rootHandler != null)
configureRootHandler(rootHandler);
+ webSocketUpgradeHandler = WebSocketUpgradeHandler.from(server);
+ pathMappingsHandler.addMapping(PathSpec.from("/ws/*"), webSocketUpgradeHandler);
+
// if (rootContextHandler != null && !contexts.containsKey("/"))
// contextHandlerCollection.addHandler(rootContextHandler);
// server.setHandler(contextHandlerCollection);
throw new UnsupportedOperationException();
}
+ public Server getServer() {
+ return server;
+ }
+
+ public WebSocketUpgradeHandler getWebSocketUpgradeHandler() {
+ return webSocketUpgradeHandler;
+ }
+
public static void main(String... args) {
JettyHttpServer httpServer = new JettyHttpServer();
System.setProperty("argeo.http.port", "8080");
Thread.currentThread().getContextClassLoader());
servletContextHandler.setClassLoader(this.getClass().getClassLoader());
servletContextHandler.setContextPath("/");
- //servletContextHandler.setContextPath("/cms/user");
+ // servletContextHandler.setContextPath("/cms/user");
servletContextHandler.setAttribute(CONTEXT_TEMPDIR, tempDir.toAbsolutePath().toFile());
SessionHandler handler = new SessionHandler();
public HttpContextJettyContextHandler(HttpContext httpContext) {
// FIXME make path more robust
- super(new HttpContextJettyHandler(httpContext), httpContext.getPath() + "/*");
+ super(new HttpContextJettyHandler(httpContext), null);
}
}
package org.argeo.cms.jetty.server;
+import java.net.http.WebSocket;
import java.util.HashMap;
import java.util.Map;
import org.argeo.cms.jetty.AbstractJettyHttpContext;
import org.argeo.cms.jetty.ContextHandlerAttributes;
import org.argeo.cms.jetty.JettyHttpServer;
+import org.argeo.cms.jetty.websocket.JettyLocalWebSocket;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.websocket.server.ServerUpgradeRequest;
+import org.eclipse.jetty.websocket.server.ServerUpgradeResponse;
+import org.eclipse.jetty.websocket.server.WebSocketCreator;
+import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import com.sun.net.httpserver.HttpContext;
* the jakarta/javax servlet APIs).
*/
public class JettyHttpContext extends AbstractJettyHttpContext {
- private final Handler handler;
+ private final Handler httpHandler;
private Map<String, Object> attributes;
public JettyHttpContext(JettyHttpServer httpServer, String path) {
if (useContextHandler) {
// TODO not working yet
// (sub contexts do not work)
- handler = new HttpContextJettyContextHandler(this);
- attributes = new ContextHandlerAttributes((ContextHandler) handler);
+ httpHandler = new HttpContextJettyContextHandler(this);
+ attributes = new ContextHandlerAttributes((ContextHandler) httpHandler);
} else {
- handler = new HttpContextJettyHandler(this);
+ httpHandler = new HttpContextJettyHandler(this);
attributes = new HashMap<>();
}
}
@Override
protected Handler getJettyHandler() {
- return handler;
+ WebSocketUpgradeHandler webSocketUpgradeHandler = WebSocketUpgradeHandler.from(getJettyHttpServer().getServer(),
+ (container) -> {
+ container.addMapping(getPath(), new WebSocketCreator() {
+
+ @Override
+ public Object createWebSocket(ServerUpgradeRequest upgradeRequest,
+ ServerUpgradeResponse upgradeResponse, Callback callback) throws Exception {
+ if (getHandler() instanceof WebSocket.Listener webSocketListener) {
+ return new JettyLocalWebSocket(webSocketListener);
+ } else {
+ callback.succeeded();
+ return null;
+ }
+ }
+ });
+ });
+ webSocketUpgradeHandler.setHandler(httpHandler);
+ return webSocketUpgradeHandler;
}
@Override
--- /dev/null
+package org.argeo.cms.jetty.websocket;
+
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.eclipse.jetty.websocket.api.Callback;
+import org.eclipse.jetty.websocket.api.Session;
+
+/**
+ * A {@link java.net.http.WebSocket} wrapping a Jetty WebSocket API
+ * {@link Session}. This is the "client" interface of a server-side socket,
+ * which allows to interact with the remote endpoint.
+ */
+class JettyJavaWebSocket implements WebSocket {
+ private Session session;
+
+ JettyJavaWebSocket(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public CompletableFuture<WebSocket> sendText(CharSequence data, boolean last) {
+ return Callback.Completable.with(completable -> session.sendText(data.toString(), completable))
+ .thenApply((v) -> JettyJavaWebSocket.this);
+ }
+
+ @Override
+ public CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last) {
+ return Callback.Completable.with(completable -> session.sendBinary(data, completable))
+ .thenApply((v) -> JettyJavaWebSocket.this);
+ }
+
+ @Override
+ public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
+ return Callback.Completable.with(completable -> session.sendPing(message, completable))
+ .thenApply((v) -> JettyJavaWebSocket.this);
+ }
+
+ @Override
+ public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
+ return Callback.Completable.with(completable -> session.sendPong(message, completable))
+ .thenApply((v) -> JettyJavaWebSocket.this);
+ }
+
+ @Override
+ public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
+ return Callback.Completable.with(completable -> session.close(statusCode, reason, completable))
+ .thenApply((v) -> JettyJavaWebSocket.this);
+ }
+
+ @Override
+ public void request(long n) {
+ for (long i = 0; i < n; i++) {
+ // TODO throttle it somehow?
+ session.demand();
+ }
+ }
+
+ @Override
+ public String getSubprotocol() {
+ // TODO test this
+ return session.getUpgradeResponse().getAcceptedSubProtocol();
+ }
+
+ @Override
+ public boolean isOutputClosed() {
+ // TODO make sure the semantics are similar
+ return !session.isOpen();
+ }
+
+ @Override
+ public boolean isInputClosed() {
+ // TODO make sure the semantics are similar
+ return !session.isOpen();
+ }
+
+ @Override
+ public void abort() {
+ session.disconnect();
+ }
+
+}
--- /dev/null
+package org.argeo.cms.jetty.websocket;
+
+import java.net.http.WebSocket;
+import java.net.http.WebSocket.Listener;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.concurrent.CompletionStage;
+
+import org.eclipse.jetty.websocket.api.Callback;
+import org.eclipse.jetty.websocket.api.Frame;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
+
+/**
+ * Wrap a {@link java.net.http.WebSocket.Listener} with Jetty WebSocket API
+ * annotations. This is the actual "server"/local side of a WebSocket.
+ */
+@org.eclipse.jetty.websocket.api.annotations.WebSocket(autoDemand = false)
+public class JettyLocalWebSocket {
+ private WebSocket.Listener listener;
+
+ public JettyLocalWebSocket(Listener listener) {
+ this.listener = listener;
+ }
+
+ @OnWebSocketOpen
+ public void onOpen(Session session) {
+ //session.demand();
+ listener.onOpen(wrap(session));
+ }
+
+ @OnWebSocketMessage
+ public void onText(Session session, String text, boolean last) {
+ waitFor(listener.onText(wrap(session), text, last));
+ }
+
+ @OnWebSocketMessage
+ public void onBinary(Session session, ByteBuffer data, boolean last, Callback callback) {
+ notifyCallback(listener.onBinary(wrap(session), data, last), callback);
+ }
+
+ @OnWebSocketFrame
+ public void onFrame(Session session, Frame frame, Callback callback) {
+ if (Frame.Type.PING.equals(frame.getType())) {
+ notifyCallback(listener.onPing(wrap(session), frame.getPayload()), callback);
+ } else if (Frame.Type.PONG.equals(frame.getType())) {
+ notifyCallback(listener.onPong(wrap(session), frame.getPayload()), callback);
+ }
+ }
+
+ @OnWebSocketClose
+ public void onClose(Session session, int statusCode, String reason) {
+ waitFor(listener.onClose(wrap(session), statusCode, reason));
+ }
+
+ @OnWebSocketError
+ public void onError(Session session, Throwable error) {
+ listener.onError(wrap(session), error);
+ }
+
+ /*
+ * UTILITIES
+ */
+ protected WebSocket wrap(Session session) {
+ return new JettyJavaWebSocket(session);
+ }
+
+ protected void waitFor(CompletionStage<?> stage) {
+ if (stage == null)
+ return;
+ stage.toCompletableFuture().join();
+ }
+
+ protected void notifyCallback(CompletionStage<?> stage, Callback callback) {
+ Objects.requireNonNull(callback);
+ if (stage == null) {
+ callback.succeed();
+ return;
+ }
+ stage.exceptionally((t) -> {// failure
+ callback.fail(t);
+ return null;
+ }).thenRun(() -> {// success
+ callback.succeed();
+ });
+ }
+}
--- /dev/null
+package org.argeo.cms.jetty.websocket;
+
+import static org.argeo.api.cms.CmsConstants.CONTEXT_PATH;
+
+import java.net.http.WebSocket;
+import java.util.Map;
+
+import org.argeo.api.cms.CmsConstants;
+import org.argeo.api.cms.CmsLog;
+import org.argeo.cms.jetty.JettyHttpServer;
+import org.eclipse.jetty.websocket.server.ServerWebSocketContainer;
+import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
+
+/** Adds WebSocket mapping to an existing Jetty server. */
+public class JettyServerWebSocketFactory {
+ private final static CmsLog log = CmsLog.getLog(JettyServerWebSocketFactory.class);
+
+ private ServerWebSocketContainer container;
+
+ public void setJettyHttpServer(JettyHttpServer jettyHttpServer) {
+ //ServletContextHandler contextHandler = (ServletContextHandler) jettyHttpServer.getRootHandler();
+ //container = ServerWebSocketContainer.ensure(jettyHttpServer.getServer(), contextHandler);
+ WebSocketUpgradeHandler webSocketUpgradeHandler = jettyHttpServer.getWebSocketUpgradeHandler();
+ container = webSocketUpgradeHandler.getServerWebSocketContainer();
+// container = ServerWebSocketContainer.ensure(jettyHttpServer.getServer());
+ log.debug("WebSocket support initalized");
+ }
+
+ public void addWebSocket(WebSocket.Listener webSocket, Map<String, String> properties) {
+ String path = properties.get(CmsConstants.CONTEXT_PATH);
+ if (path == null) {
+ log.warn("Property " + CONTEXT_PATH + " not set on HTTP handler " + properties + ". Ignoring it.");
+ return;
+ }
+
+ container.addMapping(path, (upgradeRequest, upgradeResponse, callback) -> {
+ log.debug("Adding " + path + " WebSocket " + webSocket.getClass());
+ return new JettyLocalWebSocket(webSocket);
+ });
+ }
+
+ public void removeWebSocket(WebSocket.Listener webSocket, Map<String, String> properties) {
+ String path = properties.get(CmsConstants.CONTEXT_PATH);
+ if (path == null) {
+ log.warn("Property " + CONTEXT_PATH + " not set on HTTP handler " + properties + ". Ignoring it.");
+ return;
+ }
+
+ container.addMapping(path, (upgradeRequest, upgradeResponse, callback) -> {
+ // disable web socket for this path
+ log.debug("Removing " + path + " WebSocket " + webSocket.getClass());
+ // TODO check that it works and that mappings can be removed dynamically
+ callback.succeeded();
+ return null;
+ });
+ }
+
+ public static void main(String[] args) {
+
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="org.argeo.cms.pingWebSocket">
+ <implementation class="org.argeo.cms.internal.http.PingWebSocket"/>
+ <service>
+ <provide interface="java.net.http.WebSocket$Listener"/>
+ <provide interface="com.sun.net.httpserver.HttpHandler"/>
+ </service>
+ <property name="context.path" type="String" value="/status/ping/" />
+ <property name="context.public" type="String" value="true" />
+</scr:component>
OSGI-INF/cmsFileSystemProvider.xml,\
OSGI-INF/cmsAcrHttpHandler.xml,\
OSGI-INF/pkgHttpHandler.xml,\
+OSGI-INF/pingWebSocket.xml,\
}
}
-// public static void main(String[] args) throws Exception {
-// if (args.length == 0) {
-// System.err.println("usage: java " + WsPing.class.getName() + " <url>");
-// System.exit(1);
-// return;
-// }
-// URI uri = URI.create(args[0]);
-// new WsPing(uri).run();
-// }
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ System.err.println("usage: java " + WebSocketPing.class.getName() + " <url>");
+ System.exit(1);
+ return;
+ }
+ URI uri = URI.create(args[0]);
+ new WebSocketPing(uri).run();
+ }
}
--- /dev/null
+package org.argeo.cms.internal.http;
+
+import java.io.IOException;
+import java.net.http.WebSocket;
+import java.net.http.WebSocket.Listener;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletionStage;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+/** A trivial ping WebSocket. */
+public class PingWebSocket implements Listener, HttpHandler {
+
+ @Override
+ public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
+ return null;
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ exchange.sendResponseHeaders(200, -1);
+ exchange.getResponseBody().write("pong".getBytes());
+ exchange.getResponseBody().close();
+ }
+
+}
<service>
<provide interface="com.sun.net.httpserver.HttpServer"/>
<provide interface="com.sun.net.httpserver.HttpsServer"/>
+ <provide interface="org.argeo.cms.jetty.JettyHttpServer"/>
</service>
</scr:component>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" immediate="true" name="org.argeo.cms.lib.equinox">
+ <implementation class="org.argeo.cms.jetty.websocket.JettyServerWebSocketFactory"/>
+ <reference bind="setJettyHttpServer" cardinality="1..1" interface="org.argeo.cms.jetty.JettyHttpServer" name="JettyHttpServer" policy="static"/>
+ <reference bind="addWebSocket" cardinality="0..n" interface="java.net.http.WebSocket$Listener" unbind="removeWebSocket" policy="dynamic"/>
+</scr:component>
Service-Component: \
OSGI-INF/equinoxJettyServer.xml,\
+OSGI-INF/jettyServerWebSocketFactory.xml,\
Import-Package:\
org.eclipse.jetty.session,\
org.eclipse.jetty.server,\
+org.argeo.cms.jetty.websocket,\
*
+bin.includes = META-INF/,\
+ .,\
+ OSGI-INF/jettyServerWebSocketFactory.xml
source.. = src/
output.. = bin/
-bin.includes = META-INF/,\
- .