1 package org
.argeo
.cms
.websocket
.server
;
3 import java
.util
.Hashtable
;
7 import javax
.security
.auth
.Subject
;
8 import javax
.websocket
.CloseReason
;
9 import javax
.websocket
.EndpointConfig
;
10 import javax
.websocket
.OnClose
;
11 import javax
.websocket
.OnError
;
12 import javax
.websocket
.OnMessage
;
13 import javax
.websocket
.OnOpen
;
14 import javax
.websocket
.RemoteEndpoint
;
15 import javax
.websocket
.Session
;
16 import javax
.websocket
.server
.PathParam
;
17 import javax
.websocket
.server
.ServerEndpoint
;
19 import org
.argeo
.api
.acr
.ldap
.NamingUtils
;
20 import org
.argeo
.api
.cms
.CmsLog
;
21 import org
.argeo
.cms
.integration
.CmsExceptionsChain
;
22 import org
.osgi
.framework
.BundleContext
;
23 import org
.osgi
.framework
.FrameworkUtil
;
24 import org
.osgi
.framework
.ServiceRegistration
;
25 import org
.osgi
.service
.event
.Event
;
26 import org
.osgi
.service
.event
.EventConstants
;
27 import org
.osgi
.service
.event
.EventHandler
;
29 import com
.fasterxml
.jackson
.core
.JsonProcessingException
;
30 import com
.fasterxml
.jackson
.databind
.JsonMappingException
;
31 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
33 /** Provides WebSocket access. */
34 @ServerEndpoint(value
= "/cms/status/test/{topic}", configurator
= CmsWebSocketConfigurator
.class)
35 public class TestEndpoint
implements EventHandler
{
36 private final static CmsLog log
= CmsLog
.getLog(TestEndpoint
.class);
38 final static String TOPICS_BASE
= "/test";
39 final static String INPUT
= "input";
40 final static String TOPIC
= "topic";
41 final static String VIEW_UID
= "viewUid";
42 final static String COMPUTATION_UID
= "computationUid";
43 final static String MESSAGES
= "messages";
44 final static String ERRORS
= "errors";
46 final static String EXCEPTION
= "exception";
47 final static String MESSAGE
= "message";
49 private BundleContext bc
= FrameworkUtil
.getBundle(TestEndpoint
.class).getBundleContext();
51 private String wsSessionId
;
52 private RemoteEndpoint
.Basic remote
;
53 private ServiceRegistration
<EventHandler
> eventHandlerSr
;
56 private ObjectMapper objectMapper
= new ObjectMapper();
58 private WebSocketView view
;
61 public void onOpen(Session session
, EndpointConfig endpointConfig
) {
62 Map
<String
, List
<String
>> parameters
= NamingUtils
.queryToMap(session
.getRequestURI());
63 String path
= NamingUtils
.getQueryValue(parameters
, "path");
64 log
.debug("WS Path: " + path
);
66 wsSessionId
= session
.getId();
69 session
.setMaxIdleTimeout(1000 * 60 * 60 * 24);
71 Map
<String
, Object
> userProperties
= session
.getUserProperties();
72 Subject subject
= null;
73 // AccessControlContext accessControlContext = (AccessControlContext) userProperties
74 // .get(ServletContextHelper.REMOTE_USER);
75 // Subject subject = Subject.getSubject(accessControlContext);
76 // // Deal with authentication failure
77 // if (subject == null) {
79 // CloseReason.CloseCode closeCode = new CloseReason.CloseCode() {
82 // public int getCode() {
86 // session.close(new CloseReason(closeCode, "Unauthorized"));
87 // if (log.isTraceEnabled())
88 // log.trace("Unauthorized web socket " + wsSessionId + ". Closing with code " + closeCode.getCode()
91 // } catch (IOException e) {
97 if (log
.isDebugEnabled())
98 log
.debug("WS#" + wsSessionId
+ " open for: " + subject
);
99 remote
= session
.getBasicRemote();
100 view
= new WebSocketView(subject
);
103 String
[] topics
= new String
[] { TOPICS_BASE
+ "/*" };
104 Hashtable
<String
, Object
> ht
= new Hashtable
<>();
105 ht
.put(EventConstants
.EVENT_TOPIC
, topics
);
106 ht
.put(EventConstants
.EVENT_FILTER
, "(" + VIEW_UID
+ "=" + view
.getUid() + ")");
107 eventHandlerSr
= bc
.registerService(EventHandler
.class, this, ht
);
109 if (log
.isDebugEnabled())
110 log
.debug("New view " + view
.getUid() + " opened, via web socket.");
114 public void onWebSocketText(@PathParam("topic") String topic
, Session session
, String message
)
115 throws JsonMappingException
, JsonProcessingException
{
117 if (log
.isTraceEnabled())
118 log
.trace("WS#" + view
.getUid() + " received:\n" + message
+ "\n");
119 // JsonNode jsonNode = objectMapper.readTree(message);
120 // String topic = jsonNode.get(TOPIC).textValue();
122 final String computationUid
= null;
123 // if (MY_TOPIC.equals(topic)) {
124 // view.checkRole(SPECIFIC_ROLE);
125 // computationUid= process();
127 remote
.sendText("ACK " + topic
);
128 } catch (Exception e
) {
129 log
.error("Error when receiving web socket message", e
);
130 sendSystemErrorMessage(e
);
135 public void onWebSocketClose(CloseReason reason
) {
136 if (eventHandlerSr
!= null)
137 eventHandlerSr
.unregister();
138 if (view
!= null && log
.isDebugEnabled())
139 log
.debug("WS#" + view
.getUid() + " closed: " + reason
);
143 public void onWebSocketError(Throwable cause
) {
145 log
.error("WS#" + view
.getUid() + " ERROR", cause
);
147 if (log
.isTraceEnabled())
148 log
.error("Error in web socket session " + wsSessionId
, cause
);
153 public void handleEvent(Event event
) {
155 Object uid
= event
.getProperty(COMPUTATION_UID
);
156 Exception exception
= (Exception
) event
.getProperty(EXCEPTION
);
157 if (exception
!= null) {
158 CmsExceptionsChain systemErrors
= new CmsExceptionsChain(exception
);
159 String sent
= systemErrors
.toJsonString(objectMapper
);
160 remote
.sendText(sent
);
163 String topic
= event
.getTopic();
164 if (log
.isTraceEnabled())
165 log
.trace("WS#" + view
.getUid() + " " + topic
+ ": notify event " + topic
+ "#" + uid
+ ", " + event
);
166 } catch (Exception e
) {
167 log
.error("Error when handling event for WebSocket", e
);
168 sendSystemErrorMessage(e
);
173 /** Sends an error message in JSON format. */
174 protected void sendSystemErrorMessage(Exception e
) {
175 CmsExceptionsChain systemErrors
= new CmsExceptionsChain(e
);
178 remote
.sendText(systemErrors
.toJsonString(objectMapper
));
179 } catch (Exception e1
) {
180 log
.error("Cannot send WebSocket system error messages " + systemErrors
, e1
);