2 * Copyright (C) 2007-2012 Mathieu Baudier
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org
.argeo
.slc
.server
.client
.impl
;
18 import java
.util
.ArrayList
;
19 import java
.util
.HashMap
;
20 import java
.util
.List
;
22 import java
.util
.UUID
;
24 import org
.apache
.commons
.logging
.Log
;
25 import org
.apache
.commons
.logging
.LogFactory
;
26 import org
.argeo
.slc
.Condition
;
27 import org
.argeo
.slc
.SlcException
;
28 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
29 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
30 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
31 import org
.argeo
.slc
.msg
.MsgConstants
;
32 import org
.argeo
.slc
.msg
.ObjectList
;
33 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
34 import org
.argeo
.slc
.process
.RealizedFlow
;
35 import org
.argeo
.slc
.process
.SlcExecution
;
36 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
37 import org
.argeo
.slc
.server
.client
.SlcServerHttpClient
;
39 public class SlcServerHttpClientImpl
extends AbstractHttpServicesClient
40 implements SlcServerHttpClient
{
42 protected final static String PARAM_AGENT_ID
= "agentId";
44 private final static Log log
= LogFactory
45 .getLog(SlcServerHttpClientImpl
.class);
47 private Long serverReadyTimeout
= 120 * 1000l;
49 public void waitForSlcExecutionFinished(SlcExecution slcExecution
,
51 if (slcExecution
.getStatus().equals(SlcExecution
.COMPLETED
))
54 long begin
= System
.currentTimeMillis();
55 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
56 SlcEvent event
= pollEvent(timeout
);
57 String slcExecutionId
= event
.getHeaders().get(
58 MsgConstants
.PROPERTY_SLC_EXECUTION_ID
);
59 String status
= event
.getHeaders().get(
60 MsgConstants
.PROPERTY_SLC_EXECUTION_STATUS
);
61 if (slcExecutionId
.equals(slcExecution
.getUuid())
62 && status
.equals(SlcExecution
.COMPLETED
)) {
66 throw new SlcException("SLC Execution not completed after timeout "
67 + timeout(timeout
) + " elapsed.");
70 public SlcEvent
pollEvent(Long timeout
) {
71 long begin
= System
.currentTimeMillis();
72 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
73 Object obj
= callService(POLL_EVENT
, null);
74 if (obj
instanceof ExecutionAnswer
) {
75 ExecutionAnswer answer
= (ExecutionAnswer
) obj
;
77 throw new SlcException(
78 "Unexpected exception when polling event: "
79 + answer
.getMessage());
81 return (SlcEvent
) obj
;
84 throw new SlcException("No event received after timeout "
85 + timeout(timeout
) + " elapsed.");
88 public ExecutionAnswer
addEventListener(String eventType
, String eventFilter
) {
89 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
90 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
91 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
92 return callService(ADD_EVENT_LISTENER
, parameters
);
95 public ExecutionAnswer
removeEventListener(String eventType
,
97 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
98 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
99 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
100 return callService(REMOVE_EVENT_LISTENER
, parameters
);
103 public SlcExecution
startFlow(String agentId
, RealizedFlow realizedFlow
) {
104 SlcExecution slcExecution
= new SlcExecution();
105 slcExecution
.setUuid(UUID
.randomUUID().toString());
107 slcExecution
.getRealizedFlows().add(realizedFlow
);
109 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
110 parameters
.put(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentId
);
111 ExecutionAnswer answer
= callService(NEW_SLC_EXECUTION
, parameters
,
114 throw new SlcException("Could not start flow on agent " + agentId
115 + ": " + answer
.getMessage());
119 public SlcExecution
startFlowDefault(String moduleName
, String flowName
,
120 Map
<String
, Object
> args
) {
121 SlcAgentDescriptor agentDescriptor
= waitForOneAgent();
122 List
<ExecutionModuleDescriptor
> lst
= listModuleDescriptors(agentDescriptor
124 ExecutionModuleDescriptor moduleDescMinimal
= findModule(lst
,
126 if (moduleDescMinimal
== null)
127 throw new SlcException("Cannot find module " + moduleName
);
128 String moduleVersion
= moduleDescMinimal
.getVersion();
130 ExecutionModuleDescriptor moduleDesc
= getModuleDescriptor(
131 agentDescriptor
.getUuid(), moduleName
, moduleVersion
);
133 RealizedFlow realizedFlow
= new RealizedFlow();
134 realizedFlow
.setModuleName(moduleName
);
135 realizedFlow
.setModuleVersion(moduleDesc
.getVersion());
137 ExecutionFlowDescriptor flowDescriptor
= findFlow(moduleDesc
, flowName
);
139 for (String key
: args
.keySet()) {
140 if (flowDescriptor
.getValues().containsKey(key
)) {
141 flowDescriptor
.getValues().put(key
, args
.get(key
));
145 realizedFlow
.setFlowDescriptor(flowDescriptor
);
147 return startFlow(agentDescriptor
.getUuid(), realizedFlow
);
149 // FIXME: polling not working when called from test: no unique
150 // session is created on server side
151 // SlcExecution slcExecutionFinished = null;
154 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
155 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
158 // waitForSlcExecutionFinished(slcExecution, null);
160 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
161 // for (Serializable sr : ol.getObjects()) {
162 // SlcExecution se = (SlcExecution) sr;
163 // if (se.getUuid().equals(slcExecution.getUuid())) {
164 // slcExecutionFinished = se;
170 // removeEventListener(
171 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
174 // if (slcExecutionFinished == null)
175 // throw new SlcException("No finished SLC Execution.");
176 // return slcExecutionFinished;
179 public static ExecutionModuleDescriptor
findModule(
180 List
<ExecutionModuleDescriptor
> lst
, String moduleName
) {
181 ExecutionModuleDescriptor moduleDesc
= null;
182 for (ExecutionModuleDescriptor desc
: lst
) {
183 if (desc
.getName().equals(moduleName
)) {
184 if (moduleDesc
!= null)
185 throw new SlcException(
186 "There is more than one module named " + moduleName
187 + " (versions: " + moduleDesc
+ " and "
188 + desc
.getVersion() + ")");
195 public static ExecutionFlowDescriptor
findFlow(
196 ExecutionModuleDescriptor moduleDesc
, String flowName
) {
197 ExecutionFlowDescriptor flowDesc
= null;
198 for (ExecutionFlowDescriptor desc
: moduleDesc
.getExecutionFlows()) {
199 if (desc
.getName().equals(flowName
)) {
206 public List
<ExecutionModuleDescriptor
> listModuleDescriptors(String agentId
) {
207 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
208 parameters
.put(PARAM_AGENT_ID
, agentId
);
210 List
<ExecutionModuleDescriptor
> moduleDescriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
211 ObjectList ol
= callService(LIST_MODULE_DESCRIPTORS
, parameters
);
212 ol
.fill(moduleDescriptors
);
213 return moduleDescriptors
;
216 public ExecutionModuleDescriptor
getModuleDescriptor(String agentId
,
217 String moduleName
, String version
) {
218 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
219 parameters
.put(PARAM_AGENT_ID
, agentId
);
220 parameters
.put("moduleName", moduleName
);
221 parameters
.put("version", version
);
222 ExecutionModuleDescriptor moduleDescriptor
= callService(
223 GET_MODULE_DESCRIPTOR
, parameters
);
224 return moduleDescriptor
;
227 public SlcAgentDescriptor
waitForOneAgent() {
228 ObjectList objectList
= callServiceSafe(LIST_AGENTS
, null,
229 new Condition
<ObjectList
>() {
230 public Boolean
check(ObjectList obj
) {
231 int size
= obj
.getObjects().size();
232 if (log
.isTraceEnabled())
233 log
.trace("Object list size: " + size
);
237 return (SlcAgentDescriptor
) objectList
.getObjects().get(0);
240 public void waitForServerToBeReady() {
241 ExecutionAnswer answer
= callServiceSafe(IS_SERVER_READY
, null, null,
244 throw new SlcException("Server is not ready: " + answer
);
248 * Timeout in ms after which the client will stop waiting for the server to
249 * be ready and throw an exception. Default is 120s.
251 public void setServerReadyTimeout(Long serverReadyTimeout
) {
252 this.serverReadyTimeout
= serverReadyTimeout
;