1 package org
.argeo
.slc
.server
.client
.impl
;
3 import java
.util
.ArrayList
;
4 import java
.util
.HashMap
;
9 import org
.apache
.commons
.logging
.Log
;
10 import org
.apache
.commons
.logging
.LogFactory
;
11 import org
.argeo
.slc
.Condition
;
12 import org
.argeo
.slc
.SlcException
;
13 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
14 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
15 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
16 import org
.argeo
.slc
.msg
.MsgConstants
;
17 import org
.argeo
.slc
.msg
.ObjectList
;
18 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
19 import org
.argeo
.slc
.process
.RealizedFlow
;
20 import org
.argeo
.slc
.process
.SlcExecution
;
21 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
22 import org
.argeo
.slc
.server
.client
.SlcServerHttpClient
;
24 public class SlcServerHttpClientImpl
extends AbstractHttpServicesClient
25 implements SlcServerHttpClient
{
27 protected final static String PARAM_AGENT_ID
= "agentId";
29 private final static Log log
= LogFactory
30 .getLog(SlcServerHttpClientImpl
.class);
32 private Long serverReadyTimeout
= 120 * 1000l;
/**
 * Waits for the given SLC execution to be reported as finished by polling
 * events with {@link #pollEvent(Long)}.
 * <p>
 * For every polled event the SLC execution id and status headers are read and
 * compared with the execution's UUID and
 * {@code SlcExecution.STATUS_FINISHED}. The loop is bounded by
 * {@code timeout(timeout)} milliseconds measured with
 * {@code System.currentTimeMillis()}; after the loop an {@link SlcException}
 * ("SLC Execution not completed after timeout ...") is thrown.
 * <p>
 * NOTE(review): the end of the parameter list, the statements guarded by the
 * two {@code if}s and the closing braces are not visible in this view;
 * presumably both guards return early - confirm against the full file.
 */
34 public void waitForSlcExecutionFinished(SlcExecution slcExecution
,
// Guard on an execution whose status already equals STATUS_FINISHED.
36 if (slcExecution
.getStatus().equals(SlcExecution
.STATUS_FINISHED
))
39 long begin
= System
.currentTimeMillis();
40 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
// pollEvent applies the same timeout itself and throws if no event arrives.
41 SlcEvent event
= pollEvent(timeout
);
42 String slcExecutionId
= event
.getHeaders().get(
43 MsgConstants
.PROPERTY_SLC_EXECUTION_ID
);
44 String status
= event
.getHeaders().get(
45 MsgConstants
.PROPERTY_SLC_EXECUTION_STATUS
);
// Both header values are dereferenced without a null check.
46 if (slcExecutionId
.equals(slcExecution
.getUuid())
47 && status
.equals(SlcExecution
.STATUS_FINISHED
)) {
51 throw new SlcException("SLC Execution not completed after timeout "
52 + timeout(timeout
) + " elapsed.");
/**
 * Polls the server for an event by calling the {@code POLL_EVENT} service in
 * a loop bounded by {@code timeout(timeout)} milliseconds.
 * <p>
 * A result that is an {@code ExecutionAnswer} is cast and can lead to an
 * {@link SlcException} carrying the answer's message; a result is otherwise
 * returned cast to {@code SlcEvent}. When the loop ends without a result an
 * {@link SlcException} ("No event received after timeout ...") is thrown.
 * <p>
 * NOTE(review): the condition guarding the first {@code throw} and the
 * {@code else} structure are not visible in this view; presumably only error
 * answers throw and other answers cause another poll - confirm.
 *
 * @param timeout passed to {@code timeout(Long)} to obtain the bound in ms
 * @return the polled object cast to {@code SlcEvent}
 */
55 public SlcEvent
pollEvent(Long timeout
) {
56 long begin
= System
.currentTimeMillis();
57 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
// The service is called without parameters.
58 Object obj
= callService(POLL_EVENT
, null);
59 if (obj
instanceof ExecutionAnswer
) {
60 ExecutionAnswer answer
= (ExecutionAnswer
) obj
;
62 throw new SlcException(
63 "Unexpected exception when polling event: "
64 + answer
.getMessage());
66 return (SlcEvent
) obj
;
69 throw new SlcException("No event received after timeout "
70 + timeout(timeout
) + " elapsed.");
73 public ExecutionAnswer
addEventListener(String eventType
, String eventFilter
) {
74 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
75 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
76 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
77 return callService(ADD_EVENT_LISTENER
, parameters
);
80 public ExecutionAnswer
removeEventListener(String eventType
,
82 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
83 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
84 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
85 return callService(REMOVE_EVENT_LISTENER
, parameters
);
/**
 * Creates a new {@code SlcExecution} holding the given realized flow and
 * submits it through the {@code NEW_SLC_EXECUTION} service for the given
 * agent.
 * <p>
 * The execution gets a random UUID, and the agent id is sent under
 * {@code MsgConstants.PROPERTY_SLC_AGENT_ID}. An {@link SlcException}
 * ("Could not start flow on agent ...") carrying the answer's message can be
 * thrown after the call.
 * <p>
 * NOTE(review): the last argument of {@code callService}, the condition
 * guarding the {@code throw} and the return statement are not visible in this
 * view; presumably the created execution is sent and returned - confirm.
 *
 * @param agentId id of the agent, sent as a service parameter
 * @param realizedFlow flow added to the new execution's realized flows
 */
88 public SlcExecution
startFlow(String agentId
, RealizedFlow realizedFlow
) {
89 SlcExecution slcExecution
= new SlcExecution();
// The UUID is generated on the client side.
90 slcExecution
.setUuid(UUID
.randomUUID().toString());
92 slcExecution
.getRealizedFlows().add(realizedFlow
);
94 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
95 parameters
.put(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentId
);
96 ExecutionAnswer answer
= callService(NEW_SLC_EXECUTION
, parameters
,
99 throw new SlcException("Could not start flow on agent " + agentId
100 + ": " + answer
.getMessage());
/**
 * Starts the named flow of the named module on the agent returned by
 * {@link #waitForOneAgent()}.
 * <p>
 * Steps visible here: list the agent's module descriptors, locate the module
 * with {@code findModule} (an {@link SlcException} "Cannot find module ..."
 * is thrown when it returns {@code null}), fetch the descriptor for that
 * module's version with {@code getModuleDescriptor}, locate the flow with
 * {@code findFlow}, copy from {@code args} every value whose key already
 * exists in the flow descriptor's values, and delegate to {@code startFlow}.
 * <p>
 * NOTE(review): {@code args} and the result of {@code findFlow} are
 * dereferenced without a null check - confirm callers never pass
 * {@code null} and that the flow always exists.
 * <p>
 * NOTE(review): the tails of the {@code listModuleDescriptors} and
 * {@code findModule} calls and some closing braces are not visible in this
 * view; the commented-out code after the {@code return} is also only partly
 * visible.
 *
 * @param moduleName name of the module to look up
 * @param flowName name of the flow to look up in the module descriptor
 * @param args values overriding the flow descriptor's values for matching keys
 * @return the result of {@code startFlow}
 */
104 public SlcExecution
startFlowDefault(String moduleName
, String flowName
,
105 Map
<String
, Object
> args
) {
106 SlcAgentDescriptor agentDescriptor
= waitForOneAgent();
107 List
<ExecutionModuleDescriptor
> lst
= listModuleDescriptors(agentDescriptor
109 ExecutionModuleDescriptor moduleDescMinimal
= findModule(lst
,
111 if (moduleDescMinimal
== null)
112 throw new SlcException("Cannot find module " + moduleName
);
// The listed descriptor is only used for its version; the descriptor used
// below is fetched separately.
113 String moduleVersion
= moduleDescMinimal
.getVersion();
115 ExecutionModuleDescriptor moduleDesc
= getModuleDescriptor(
116 agentDescriptor
.getUuid(), moduleName
, moduleVersion
);
118 RealizedFlow realizedFlow
= new RealizedFlow();
119 realizedFlow
.setModuleName(moduleName
);
120 realizedFlow
.setModuleVersion(moduleDesc
.getVersion());
122 ExecutionFlowDescriptor flowDescriptor
= findFlow(moduleDesc
, flowName
);
// Only keys already present in the flow descriptor's values are overridden.
124 for (String key
: args
.keySet()) {
125 if (flowDescriptor
.getValues().containsKey(key
)) {
126 flowDescriptor
.getValues().put(key
, args
.get(key
));
130 realizedFlow
.setFlowDescriptor(flowDescriptor
);
132 return startFlow(agentDescriptor
.getUuid(), realizedFlow
);
// The commented-out code below is a disabled variant that waited for the
// execution to finish; see the FIXME for why it is disabled.
134 // FIXME: polling not working when called from test: no unique
135 // session is created on server side
136 // SlcExecution slcExecutionFinished = null;
139 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
140 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
143 // waitForSlcExecutionFinished(slcExecution, null);
145 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
146 // for (Serializable sr : ol.getObjects()) {
147 // SlcExecution se = (SlcExecution) sr;
148 // if (se.getUuid().equals(slcExecution.getUuid())) {
149 // slcExecutionFinished = se;
155 // removeEventListener(
156 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
159 // if (slcExecutionFinished == null)
160 // throw new SlcException("No finished SLC Execution.");
161 // return slcExecutionFinished;
164 public static ExecutionModuleDescriptor
findModule(
165 List
<ExecutionModuleDescriptor
> lst
, String moduleName
) {
166 ExecutionModuleDescriptor moduleDesc
= null;
167 for (ExecutionModuleDescriptor desc
: lst
) {
168 if (desc
.getName().equals(moduleName
)) {
169 if (moduleDesc
!= null)
170 throw new SlcException(
171 "There is more than one module named " + moduleName
172 + " (versions: " + moduleDesc
+ " and "
173 + desc
.getVersion() + ")");
/**
 * Searches the execution flows of a module descriptor for a flow whose
 * {@code getName()} equals the given name.
 * <p>
 * NOTE(review): the handling of a match and the return statement are not
 * visible in this view; presumably the matching descriptor (or {@code null}
 * when none matches) is returned - confirm against the full file.
 *
 * @param moduleDesc descriptor whose {@code getExecutionFlows()} are searched
 * @param flowName name of the flow to look for
 */
180 public static ExecutionFlowDescriptor
findFlow(
181 ExecutionModuleDescriptor moduleDesc
, String flowName
) {
182 ExecutionFlowDescriptor flowDesc
= null;
183 for (ExecutionFlowDescriptor desc
: moduleDesc
.getExecutionFlows()) {
184 if (desc
.getName().equals(flowName
)) {
191 public List
<ExecutionModuleDescriptor
> listModuleDescriptors(String agentId
) {
192 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
193 parameters
.put(PARAM_AGENT_ID
, agentId
);
195 List
<ExecutionModuleDescriptor
> moduleDescriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
196 ObjectList ol
= callService(LIST_MODULE_DESCRIPTORS
, parameters
);
197 ol
.fill(moduleDescriptors
);
198 return moduleDescriptors
;
201 public ExecutionModuleDescriptor
getModuleDescriptor(String agentId
,
202 String moduleName
, String version
) {
203 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
204 parameters
.put(PARAM_AGENT_ID
, agentId
);
205 parameters
.put("moduleName", moduleName
);
206 parameters
.put("version", version
);
207 ExecutionModuleDescriptor moduleDescriptor
= callService(
208 GET_MODULE_DESCRIPTOR
, parameters
);
209 return moduleDescriptor
;
/**
 * Calls the {@code LIST_AGENTS} service through {@code callServiceSafe} with
 * a {@code Condition} on the returned {@code ObjectList}, then returns the
 * first object of that list cast to {@code SlcAgentDescriptor}.
 * <p>
 * The condition reads the size of the list and logs it at trace level.
 * <p>
 * NOTE(review): the condition's return statement and the remaining
 * {@code callServiceSafe} arguments are not visible in this view; presumably
 * the condition requires at least one agent to be listed - confirm.
 *
 * @return the first element of the returned object list
 */
212 public SlcAgentDescriptor
waitForOneAgent() {
213 ObjectList objectList
= callServiceSafe(LIST_AGENTS
, null,
214 new Condition
<ObjectList
>() {
215 public Boolean
check(ObjectList obj
) {
216 int size
= obj
.getObjects().size();
217 if (log
.isTraceEnabled())
218 log
.trace("Object list size: " + size
);
// Only the first listed agent is used.
222 return (SlcAgentDescriptor
) objectList
.getObjects().get(0);
/**
 * Calls the {@code IS_SERVER_READY} service through {@code callServiceSafe}
 * and can throw an {@link SlcException} ("Server is not ready: ...") built
 * from the answer.
 * <p>
 * NOTE(review): the last {@code callServiceSafe} argument and the condition
 * guarding the {@code throw} are not visible in this view; presumably the
 * server-ready timeout is passed and only a non-OK answer throws - confirm.
 */
225 public void waitForServerToBeReady() {
226 ExecutionAnswer answer
= callServiceSafe(IS_SERVER_READY
, null, null,
229 throw new SlcException("Server is not ready: " + answer
);
233 * Timeout in ms after which the client will stop waiting for the server to
234 * be ready and throw an exception. Default is 120s.
236 public void setServerReadyTimeout(Long serverReadyTimeout
) {
237 this.serverReadyTimeout
= serverReadyTimeout
;