]> git.argeo.org Git - gpl/argeo-slc.git/commitdiff
First run of using AmqClient to listen to agent queues. Working good, make a try...
authorCharles du Jeu <charles.dujeu@gmail.com>
Wed, 14 Jan 2009 12:26:51 +0000 (12:26 +0000)
committerCharles du Jeu <charles.dujeu@gmail.com>
Wed, 14 Jan 2009 12:26:51 +0000 (12:26 +0000)
git-svn-id: https://svn.argeo.org/slc/trunk@2098 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc

org.argeo.slc.webapp/src/main/webapp/argeo-ria-lib/slc/class/org/argeo/slc/ria/LauncherApplet.js
org.argeo.slc.webapp/src/main/webapp/argeo-ria-lib/slc/class/org/argeo/slc/ria/SlcExecutionMessage.js
org.argeo.slc.webapp/src/main/webapp/argeo-ria-src/class/org/argeo/ria/remote/AmqClient.js

index 572544b508c8060fdd3fad629e19b2b58da63270..a8886acc89308bfc7485961aafecaa7bc44b3cf7 100644 (file)
@@ -64,6 +64,9 @@ qx.Class.define("org.argeo.slc.ria.LauncherApplet",
                this.setView(viewPane);
                this._createLayout();
                this._createForm();
+               this._amqClient = new org.argeo.ria.remote.AmqClient();
+               this._amqClient.startPolling(); 
+               qx.io.remote.RequestQueue.getInstance().setDefaultTimeout(30000);
        },
        
        /**
@@ -204,9 +207,23 @@ qx.Class.define("org.argeo.slc.ria.LauncherApplet",
                        var host = org.argeo.ria.util.Element.getSingleNodeText(nodes[i], "slc:host", NSMap);
                        var listItem = new qx.ui.form.ListItem(uuid+' ('+host+')', null, uuid);
                        selector.add(listItem);
+                       this._addAmqListenerDeferred(uuid, i);
                }
        },
        
+       _addAmqListenerDeferred: function(uuid, index){
+               qx.event.Timer.once(function(){                 
+                       this._amqClient.addListener("launcherId", "topic://agent."+uuid+".newExecution", function(message){
+                               this.info("Received message!");
+                               var slcExec = new org.argeo.slc.ria.SlcExecutionMessage(message.getAttribute("uuid"));
+                               slcExec.fromXml(message);
+                               this.logModel.addRows([
+                                       [new Date().toString(), slcExec.getUuid(), slcExec.getStatus(), slcExec.getType()]
+                               ]);                             
+                       }, this);
+               }, this, 500*index);            
+       },
+       
        submitForm : function(){
                var currentUuid = this.agentSelector.getValue();
                if(!currentUuid) return;
@@ -222,13 +239,7 @@ qx.Class.define("org.argeo.slc.ria.LauncherApplet",
                        }
                }
                var destination = "topic://agent."+currentUuid+".newExecution";
-               var req = org.argeo.slc.ria.SlcApi.getSendAmqMessageRequest(destination, slcExec);
-               req.addListener("completed", function(e){                       
-                       this.logModel.addRows([
-                               [new Date().toString(), slcExec.getUuid(), slcExec.getStatus(), slcExec.getType()]
-                       ]);
-               }, this);
-               req.send();
+               this._amqClient.sendMessage(destination, slcExec.toXml());
        }
                
   }
index 1ecb0e906378151b679ce091944fd1071d0c7b3d..c5f3ce199f91adf0c42b092847d5ab0c8eeee3b3 100644 (file)
@@ -64,7 +64,7 @@ qx.Class.define("org.argeo.slc.ria.SlcExecutionMessage", {
                 */\r
                toXml : function (){\r
                        var builder = new qx.util.StringBuilder();\r
-                       builder.add('<slc:slc-execution uuid="'+this.getUuid()+'">');\r
+                       builder.add('<slc:slc-execution  xmlns:slc="http://argeo.org/projects/slc/schemas" uuid="'+this.getUuid()+'">');\r
                        builder.add('<slc:status>'+this.getStatus()+'</slc:status>');\r
                        builder.add('<slc:type>'+this.getType()+'</slc:type>');\r
                        builder.add('<slc:host>'+this.getHost()+'</slc:host>');\r
@@ -79,6 +79,18 @@ qx.Class.define("org.argeo.slc.ria.SlcExecutionMessage", {
                        }\r
                        builder.add('</slc:slc-execution>');\r
                        return builder.get();\r
+               },\r
+               \r
+               fromXml : function(slcExecXml){\r
+                       var NSMap = {slc:"http://argeo.org/projects/slc/schemas"};                      \r
+                       this.setStatus(org.argeo.ria.util.Element.getSingleNodeText(slcExecXml, "slc:status", NSMap));\r
+                       this.setType(org.argeo.ria.util.Element.getSingleNodeText(slcExecXml, "slc:type", NSMap));\r
+                       this.setHost(org.argeo.ria.util.Element.getSingleNodeText(slcExecXml, "slc:host", NSMap));\r
+                       this.setUser(org.argeo.ria.util.Element.getSingleNodeText(slcExecXml, "slc:user", NSMap));\r
+                       var attributes = org.argeo.ria.util.Element.selectNodes(slcExecXml, "slc:attribute", NSMap);\r
+                       for(var i=0;i<attributes.length;i++){\r
+                               this.addAttribute(attribute.getAttribute("name"), attribute.firstChild);\r
+                       }\r
                }\r
        }       \r
 });
\ No newline at end of file
index 4952561d5c18c3b600c054dff93ee4c8aaa4782d..f471d2670673abd0cf5c1f20328906c718deefc1 100644 (file)
@@ -6,7 +6,7 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
        },\r
        members : {\r
                // The URI of the MessageListenerServlet\r
-               uri : '/amq',\r
+               uri : '/org.argeo.slc.webapp/amq',\r
 \r
                // Polling. Set to true (default) if waiting poll for messages is needed\r
                poll : true,\r
@@ -24,32 +24,17 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                _messageQueue : '',\r
                _queueMessages : 0,\r
 \r
-               _messageHandler : function(request) {\r
-                       try {\r
-                               if (request.status == 200) {\r
-                                       var response = request.responseXML\r
-                                                       .getElementsByTagName("ajax-response");\r
-                                       if (response != null && response.length == 1) {\r
-                                               for (var i = 0; i < response[0].childNodes.length; i++) {\r
-                                                       var responseElement = response[0].childNodes[i];\r
-\r
-                                                       // only process nodes of type element.....\r
-                                                       if (responseElement.nodeType != 1)\r
-                                                               continue;\r
-\r
-                                                       var id = responseElement.getAttribute('id');\r
-\r
-                                                       var handler = this._handlers[id];\r
-                                                       if (handler != null) {\r
-                                                               for (var j = 0; j < responseElement.childNodes.length; j++) {\r
-                                                                       handler(responseElement.childNodes[j]);\r
-                                                               }\r
-                                                       }\r
-                                               }\r
-                                       }\r
+               _messageHandler : function(response) {\r
+                       var doc = response.getContent();\r
+                       var NSMap = {slc:"http://argeo.org/projects/slc/schemas"};\r
+                       var messages = org.argeo.ria.util.Element.selectNodes(doc, "//response", NSMap);\r
+                       //this.info("Received " + messages.length + " messages");\r
+                       for(var i=0;i<messages.length;i++){\r
+                               var id = messages[i].getAttribute("id");\r
+                               var slcExec = org.argeo.ria.util.Element.selectSingleNode(messages[i], "slc:slc-execution", NSMap);                             \r
+                               if(id && this._handlers[id] && slcExec){\r
+                                       this._handlers[id](slcExec);\r
                                }\r
-                       } catch (e) {\r
-                               alert(e);\r
                        }\r
                },\r
 \r
@@ -58,22 +43,22 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                },\r
 \r
                endBatch : function() {\r
-                       this._queueMessages--;\r
+                       this._queueMessages--;                  \r
                        if (this._queueMessages == 0 && this._messages > 0) {\r
                                var body = this._messageQueue;\r
                                this._messageQueue = '';\r
                                this._messages = 0;\r
                                this._queueMessages++;\r
-                               var request = new qx.io.remote.Request(this.uri, "post", "text/plain");\r
+                               var request = new qx.io.remote.Request(this.uri, "POST", "text/plain");\r
                                request.addListener("completed", this.endBatch, this);\r
                                request.send();\r
                        }\r
                },\r
 \r
-               _pollHandler : function(request) {\r
+               _pollHandler : function(response) {\r
                        this.startBatch();\r
                        try {\r
-                               this._messageHandler(request);\r
+                               this._messageHandler(response);\r
                                this._pollEvent(this._first);\r
                                this._first = false;\r
                        } catch (e) {\r
@@ -88,7 +73,7 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                },\r
 \r
                _sendPoll : function(request) {\r
-                       var request = new qx.io.remote.Request(this.uri, "get", "application/xml");\r
+                       var request = new qx.io.remote.Request(this.uri, "GET", "application/xml");\r
                        request.addListener("completed", this._pollHandler, this);\r
                        request.send();\r
                },\r
@@ -111,8 +96,8 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                },\r
 \r
                // Listen on a channel or topic.   handler must be a function taking a message arguement\r
-               addListener : function(id, destination, handler) {\r
-                       this._handlers[id] = handler;\r
+               addListener : function(id, destination, handler, context) {\r
+                       this._handlers[id] = qx.lang.Function.bind(handler, context);\r
                        this._sendMessage(destination, id, 'listen');\r
                },\r
 \r
@@ -135,7 +120,7 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                                this._messages++;\r
                        } else {\r
                                this.startBatch();\r
-                               var req = new qx.io.remote.Request(this.uri, "post", "application/xml");\r
+                               var req = new qx.io.remote.Request(this.uri, "POST", "text/plain");\r
                                req.setParameter("destination", destination);\r
                                req.setParameter("message", message);\r
                                req.setParameter("type", type);\r
@@ -144,9 +129,9 @@ qx.Class.define("org.argeo.ria.remote.AmqClient", {
                        }\r
                },\r
 \r
-               _startPolling : function() {\r
+               startPolling : function() {\r
                        if (this.poll){\r
-                               var req = new qx.io.remote.Request(this.uri, "get", "application/xml");\r
+                               var req = new qx.io.remote.Request(this.uri, "GET", "application/xml");\r
                                req.setParameter("timeout", "10");\r
                                req.addListener("completed", this._pollHandler, this);\r
                                req.send();\r