1 package org
.argeo
.cms
.integration
;
3 import java
.util
.Hashtable
;
7 import javax
.security
.auth
.Subject
;
8 import javax
.websocket
.CloseReason
;
9 import javax
.websocket
.EndpointConfig
;
10 import javax
.websocket
.OnClose
;
11 import javax
.websocket
.OnError
;
12 import javax
.websocket
.OnMessage
;
13 import javax
.websocket
.OnOpen
;
14 import javax
.websocket
.RemoteEndpoint
;
15 import javax
.websocket
.Session
;
16 import javax
.websocket
.server
.PathParam
;
17 import javax
.websocket
.server
.ServerEndpoint
;
19 import org
.argeo
.api
.acr
.ldap
.NamingUtils
;
20 import org
.argeo
.api
.cms
.CmsLog
;
21 import org
.argeo
.cms
.websocket
.server
.CmsWebSocketConfigurator
;
22 import org
.argeo
.cms
.websocket
.server
.WebSocketView
;
23 import org
.osgi
.framework
.BundleContext
;
24 import org
.osgi
.framework
.FrameworkUtil
;
25 import org
.osgi
.framework
.ServiceRegistration
;
26 import org
.osgi
.service
.event
.Event
;
27 import org
.osgi
.service
.event
.EventConstants
;
28 import org
.osgi
.service
.event
.EventHandler
;
30 import com
.fasterxml
.jackson
.core
.JsonProcessingException
;
31 import com
.fasterxml
.jackson
.databind
.JsonMappingException
;
32 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
34 /** Provides WebSocket access. */
35 @ServerEndpoint(value
= "/cms/status/test/{topic}", configurator
= CmsWebSocketConfigurator
.class)
36 public class TestEndpoint
implements EventHandler
{
37 private final static CmsLog log
= CmsLog
.getLog(TestEndpoint
.class);
39 final static String TOPICS_BASE
= "/test";
40 final static String INPUT
= "input";
41 final static String TOPIC
= "topic";
42 final static String VIEW_UID
= "viewUid";
43 final static String COMPUTATION_UID
= "computationUid";
44 final static String MESSAGES
= "messages";
45 final static String ERRORS
= "errors";
47 final static String EXCEPTION
= "exception";
48 final static String MESSAGE
= "message";
50 private BundleContext bc
= FrameworkUtil
.getBundle(TestEndpoint
.class).getBundleContext();
52 private String wsSessionId
;
53 private RemoteEndpoint
.Basic remote
;
54 private ServiceRegistration
<EventHandler
> eventHandlerSr
;
57 private ObjectMapper objectMapper
= new ObjectMapper();
59 private WebSocketView view
;
62 public void onOpen(Session session
, EndpointConfig endpointConfig
) {
63 Map
<String
, List
<String
>> parameters
= NamingUtils
.queryToMap(session
.getRequestURI());
64 String path
= NamingUtils
.getQueryValue(parameters
, "path");
65 log
.debug("WS Path: " + path
);
67 wsSessionId
= session
.getId();
70 session
.setMaxIdleTimeout(1000 * 60 * 60 * 24);
72 Map
<String
, Object
> userProperties
= session
.getUserProperties();
73 Subject subject
= null;
74 // AccessControlContext accessControlContext = (AccessControlContext) userProperties
75 // .get(ServletContextHelper.REMOTE_USER);
76 // Subject subject = Subject.getSubject(accessControlContext);
77 // // Deal with authentication failure
78 // if (subject == null) {
80 // CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
83 // public int getCode() {
87 // session.close(new CloseReason(closeCode, "Unauthorized"));
88 // if (log.isTraceEnabled())
89 // log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
92 // } catch (IOException e) {
98 if (log
.isDebugEnabled())
99 log
.debug("WS#" + wsSessionId
+ " open for: " + subject
);
100 remote
= session
.getBasicRemote();
101 view
= new WebSocketView(subject
);
104 String
[] topics
= new String
[] { TOPICS_BASE
+ "/*" };
105 Hashtable
<String
, Object
> ht
= new Hashtable
<>();
106 ht
.put(EventConstants
.EVENT_TOPIC
, topics
);
107 ht
.put(EventConstants
.EVENT_FILTER
, "(" + VIEW_UID
+ "=" + view
.getUid() + ")");
108 eventHandlerSr
= bc
.registerService(EventHandler
.class, this, ht
);
110 if (log
.isDebugEnabled())
111 log
.debug("New view " + view
.getUid() + " opened, via web socket.");
115 public void onWebSocketText(@PathParam("topic") String topic
, Session session
, String message
)
116 throws JsonMappingException
, JsonProcessingException
{
118 if (log
.isTraceEnabled())
119 log
.trace("WS#" + view
.getUid() + " received:\n" + message
+ "\n");
120 // JsonNode jsonNode = objectMapper.readTree(message);
121 // String topic = jsonNode.get(TOPIC).textValue();
123 final String computationUid
= null;
124 // if (MY_TOPIC.equals(topic)) {
125 // view.checkRole(SPECIFIC_ROLE);
126 // computationUid= process();
128 remote
.sendText("ACK " + topic
);
129 } catch (Exception e
) {
130 log
.error("Error when receiving web socket message", e
);
131 sendSystemErrorMessage(e
);
136 public void onWebSocketClose(CloseReason reason
) {
137 if (eventHandlerSr
!= null)
138 eventHandlerSr
.unregister();
139 if (view
!= null && log
.isDebugEnabled())
140 log
.debug("WS#" + view
.getUid() + " closed: " + reason
);
144 public void onWebSocketError(Throwable cause
) {
146 log
.error("WS#" + view
.getUid() + " ERROR", cause
);
148 if (log
.isTraceEnabled())
149 log
.error("Error in web socket session " + wsSessionId
, cause
);
154 public void handleEvent(Event event
) {
156 Object uid
= event
.getProperty(COMPUTATION_UID
);
157 Exception exception
= (Exception
) event
.getProperty(EXCEPTION
);
158 if (exception
!= null) {
159 CmsExceptionsChain systemErrors
= new CmsExceptionsChain(exception
);
160 String sent
= systemErrors
.toJsonString(objectMapper
);
161 remote
.sendText(sent
);
164 String topic
= event
.getTopic();
165 if (log
.isTraceEnabled())
166 log
.trace("WS#" + view
.getUid() + " " + topic
+ ": notify event " + topic
+ "#" + uid
+ ", " + event
);
167 } catch (Exception e
) {
168 log
.error("Error when handling event for WebSocket", e
);
169 sendSystemErrorMessage(e
);
174 /** Sends an error message in JSON format. */
175 protected void sendSystemErrorMessage(Exception e
) {
176 CmsExceptionsChain systemErrors
= new CmsExceptionsChain(e
);
179 remote
.sendText(systemErrors
.toJsonString(objectMapper
));
180 } catch (Exception e1
) {
181 log
.error("Cannot send WebSocket system error messages " + systemErrors
, e1
);