1 package org
.argeo
.slc
.server
.client
.impl
;
3 import java
.io
.Serializable
;
4 import java
.util
.ArrayList
;
5 import java
.util
.HashMap
;
10 import org
.apache
.commons
.logging
.Log
;
11 import org
.apache
.commons
.logging
.LogFactory
;
12 import org
.argeo
.slc
.Condition
;
13 import org
.argeo
.slc
.SlcException
;
14 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
15 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
16 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
17 import org
.argeo
.slc
.msg
.MsgConstants
;
18 import org
.argeo
.slc
.msg
.ObjectList
;
19 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
20 import org
.argeo
.slc
.process
.RealizedFlow
;
21 import org
.argeo
.slc
.process
.SlcExecution
;
22 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
23 import org
.argeo
.slc
.server
.client
.SlcServerHttpClient
;
24 import org
.argeo
.slc
.services
.EventPublisherAspect
;
26 public class SlcServerHttpClientImpl
extends AbstractHttpServicesClient
27 implements SlcServerHttpClient
{
29 protected final static String PARAM_AGENT_ID
= "agentId";
31 private final static Log log
= LogFactory
32 .getLog(SlcServerHttpClientImpl
.class);
34 private Long serverReadyTimeout
= 120 * 1000l;
/**
 * Waits for the given SLC execution to reach
 * SlcExecution.STATUS_FINISHED.
 *
 * The status already carried by slcExecution is tested first. Then, as long
 * as fewer than timeout(timeout) ms have elapsed since entry, an event is
 * fetched with pollEvent(timeout) and its PROPERTY_SLC_EXECUTION_ID and
 * PROPERTY_SLC_EXECUTION_STATUS headers are compared with the UUID of
 * slcExecution and with STATUS_FINISHED.
 *
 * NOTE(review): pollEvent runs its own loop bounded by timeout(timeout), so
 * the total time spent here can presumably exceed one timeout - confirm.
 * NOTE(review): both header values are dereferenced without a null check;
 * presumably an event lacking these headers would cause a
 * NullPointerException - confirm against the type returned by getHeaders().
 *
 * @param slcExecution
 *            the execution whose completion is awaited
 * @throws SlcException
 *             when the loop ends without having seen the expected event
 *             (pollEvent also throws SlcException when no event arrives)
 */
36 public void waitForSlcExecutionFinished(SlcExecution slcExecution
,
// Check on the status the caller already holds, before any polling.
38 if (slcExecution
.getStatus().equals(SlcExecution
.STATUS_FINISHED
))
41 long begin
= System
.currentTimeMillis();
42 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
43 SlcEvent event
= pollEvent(timeout
);
44 String slcExecutionId
= event
.getHeaders().get(
45 MsgConstants
.PROPERTY_SLC_EXECUTION_ID
);
46 String status
= event
.getHeaders().get(
47 MsgConstants
.PROPERTY_SLC_EXECUTION_STATUS
);
// Only an event for this very execution AND with the finished status matches.
48 if (slcExecutionId
.equals(slcExecution
.getUuid())
49 && status
.equals(SlcExecution
.STATUS_FINISHED
)) {
// Reached once the time budget is used up without a matching event.
53 throw new SlcException("SLC Execution not completed after timeout "
54 + timeout(timeout
) + " elapsed.");
/**
 * Polls the server for an event by calling the POLL_EVENT service (with no
 * parameters) repeatedly while fewer than timeout(timeout) ms have elapsed
 * since entry.
 *
 * A result that is an ExecutionAnswer is used to build an SlcException
 * carrying the answer's message; a result is otherwise cast to SlcEvent and
 * returned.
 *
 * NOTE(review): confirm under which condition an ExecutionAnswer leads to
 * the exception rather than to another polling round.
 * NOTE(review): the exception text contains the typo "pollign"; it is a
 * runtime string and is left untouched here.
 *
 * @param timeout
 *            value handed to timeout(...) to obtain the time budget in ms;
 *            presumably null selects a default - confirm
 * @return the polled event
 * @throws SlcException
 *             on an unexpected answer, or when no event was received before
 *             the time budget was used up
 */
57 public SlcEvent
pollEvent(Long timeout
) {
58 long begin
= System
.currentTimeMillis();
59 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
60 Object obj
= callService(POLL_EVENT
, null);
// The service may return either an ExecutionAnswer or the event itself.
61 if (obj
instanceof ExecutionAnswer
) {
62 ExecutionAnswer answer
= (ExecutionAnswer
) obj
;
64 throw new SlcException(
65 "Unexpected exception when pollign event: "
66 + answer
.getMessage());
68 return (SlcEvent
) obj
;
// Reached once the time budget is used up without an event.
71 throw new SlcException("No event received after timeout "
72 + timeout(timeout
) + " elapsed.");
75 public ExecutionAnswer
addEventListener(String eventType
, String eventFilter
) {
76 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
77 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
78 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
79 return callService(ADD_EVENT_LISTENER
, parameters
);
/**
 * Calls the REMOVE_EVENT_LISTENER service with the event type sent under
 * SlcEvent.EVENT_TYPE and the event filter sent under
 * SlcEvent.EVENT_FILTER.
 *
 * NOTE(review): eventFilter is presumably the second String parameter of
 * this method, mirroring addEventListener - confirm.
 *
 * @param eventType
 *            value sent under the SlcEvent.EVENT_TYPE parameter
 * @return the answer produced by the service call
 */
82 public ExecutionAnswer
removeEventListener(String eventType
,
84 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
85 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
86 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
87 return callService(REMOVE_EVENT_LISTENER
, parameters
);
/**
 * Builds a new SlcExecution holding the given realized flow and submits it
 * through the NEW_SLC_EXECUTION service.
 *
 * The execution gets a freshly generated random UUID, and the agent id is
 * sent under the MsgConstants.PROPERTY_SLC_AGENT_ID parameter.
 *
 * NOTE(review): presumably the execution object is passed as the body of
 * the service call and the exception is raised only when the answer is not
 * OK - confirm.
 *
 * @param agentId
 *            id of the agent which should run the flow
 * @param realizedFlow
 *            the flow added to the new execution
 * @return presumably the execution that was created - confirm
 * @throws SlcException
 *             with the agent id and the answer's message when the flow
 *             could not be started
 */
90 public SlcExecution
startFlow(String agentId
, RealizedFlow realizedFlow
) {
91 SlcExecution slcExecution
= new SlcExecution();
// Client-side generated identifier for the new execution.
92 slcExecution
.setUuid(UUID
.randomUUID().toString());
94 slcExecution
.getRealizedFlows().add(realizedFlow
);
96 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
97 parameters
.put(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentId
);
98 ExecutionAnswer answer
= callService(NEW_SLC_EXECUTION
, parameters
,
101 throw new SlcException("Could not start flow on agent " + agentId
102 + ": " + answer
.getMessage());
/**
 * Starts the named flow of the named module on the agent returned by
 * waitForOneAgent().
 *
 * Steps: list the module descriptors, locate the module with findModule to
 * learn its version, fetch the module descriptor for that version with
 * getModuleDescriptor, locate the flow with findFlow, override flow values
 * from args, then delegate to startFlow.
 *
 * Only keys of args that are already present in the flow descriptor's
 * values are copied; other keys are ignored.
 *
 * NOTE(review): args.keySet() is called without a null check, so a null
 * args presumably fails with a NullPointerException - confirm whether
 * callers may pass null.
 * NOTE(review): the results of findModule and findFlow are dereferenced
 * directly; confirm that neither can be null.
 *
 * @param moduleName
 *            name of the module containing the flow
 * @param flowName
 *            name of the flow to start
 * @param args
 *            values overriding those of the flow descriptor
 * @return the value returned by startFlow
 */
106 public SlcExecution
startFlowDefault(String moduleName
, String flowName
,
107 Map
<String
, Object
> args
) {
108 SlcAgentDescriptor agentDescriptor
= waitForOneAgent();
109 List
<ExecutionModuleDescriptor
> lst
= listModuleDescriptors(agentDescriptor
111 ExecutionModuleDescriptor moduleDescMinimal
= findModule(lst
,
// The listed descriptor is only used to learn the version to request.
113 String moduleVersion
= moduleDescMinimal
.getVersion();
115 ExecutionModuleDescriptor moduleDesc
= getModuleDescriptor(
116 agentDescriptor
.getUuid(), moduleName
, moduleVersion
);
118 RealizedFlow realizedFlow
= new RealizedFlow();
119 realizedFlow
.setModuleName(moduleName
);
120 realizedFlow
.setModuleVersion(moduleDesc
.getVersion());
122 ExecutionFlowDescriptor flowDescriptor
= findFlow(moduleDesc
, flowName
);
// Override only the values the flow descriptor already declares.
124 for (String key
: args
.keySet()) {
125 if (flowDescriptor
.getValues().containsKey(key
)) {
126 flowDescriptor
.getValues().put(key
, args
.get(key
));
130 realizedFlow
.setFlowDescriptor(flowDescriptor
);
132 return startFlow(agentDescriptor
.getUuid(), realizedFlow
);
134 // FIXME: polling not working when called from test: no unique
135 // session is created on server side
136 // SlcExecution slcExecutionFinished = null;
139 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
140 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
143 // waitForSlcExecutionFinished(slcExecution, null);
145 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
146 // for (Serializable sr : ol.getObjects()) {
147 // SlcExecution se = (SlcExecution) sr;
148 // if (se.getUuid().equals(slcExecution.getUuid())) {
149 // slcExecutionFinished = se;
155 // removeEventListener(
156 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
159 // if (slcExecutionFinished == null)
160 // throw new SlcException("No finished SLC Execution.");
161 // return slcExecutionFinished;
/**
 * Looks in lst for the module descriptor whose name equals moduleName and
 * rejects ambiguous lists: when a matching descriptor is met while one is
 * already recorded, an SlcException is thrown.
 *
 * NOTE(review): the exception message concatenates moduleDesc itself (its
 * toString()) for the first version but desc.getVersion() for the second;
 * moduleDesc.getVersion() was probably intended - confirm.
 * NOTE(review): confirm what is returned when no descriptor matches.
 *
 * @param lst
 *            the descriptors to search
 * @param moduleName
 *            the module name to look for
 * @return presumably the single matching descriptor - confirm
 * @throws SlcException
 *             when more than one descriptor has the requested name
 */
164 public static ExecutionModuleDescriptor
findModule(
165 List
<ExecutionModuleDescriptor
> lst
, String moduleName
) {
166 ExecutionModuleDescriptor moduleDesc
= null;
167 for (ExecutionModuleDescriptor desc
: lst
) {
168 if (desc
.getName().equals(moduleName
)) {
// A match was already recorded: the name is ambiguous.
169 if (moduleDesc
!= null)
170 throw new SlcException(
171 "There is more than one module named " + moduleName
172 + " (versions: " + moduleDesc
+ " and "
173 + desc
.getVersion() + ")");
/**
 * Looks through the execution flows of moduleDesc for a flow descriptor
 * whose name equals flowName.
 *
 * NOTE(review): confirm how several matches, or no match at all, are
 * handled, and what is returned in each case.
 *
 * @param moduleDesc
 *            the module descriptor whose flows are searched
 * @param flowName
 *            the flow name to look for
 * @return presumably the matching flow descriptor - confirm
 */
180 public static ExecutionFlowDescriptor
findFlow(
181 ExecutionModuleDescriptor moduleDesc
, String flowName
) {
182 ExecutionFlowDescriptor flowDesc
= null;
183 for (ExecutionFlowDescriptor desc
: moduleDesc
.getExecutionFlows()) {
184 if (desc
.getName().equals(flowName
)) {
191 public List
<ExecutionModuleDescriptor
> listModuleDescriptors(String agentId
) {
192 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
193 parameters
.put(PARAM_AGENT_ID
, agentId
);
195 List
<ExecutionModuleDescriptor
> moduleDescriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
196 ObjectList ol
= callService(LIST_MODULE_DESCRIPTORS
, parameters
);
197 ol
.fill(moduleDescriptors
);
198 return moduleDescriptors
;
201 public ExecutionModuleDescriptor
getModuleDescriptor(String agentId
,
202 String moduleName
, String version
) {
203 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
204 parameters
.put(PARAM_AGENT_ID
, agentId
);
205 parameters
.put("moduleName", moduleName
);
206 parameters
.put("version", version
);
207 ExecutionModuleDescriptor moduleDescriptor
= callService(
208 GET_MODULE_DESCRIPTOR
, parameters
);
209 return moduleDescriptor
;
/**
 * Calls the LIST_AGENTS service through callServiceSafe, passing a
 * Condition over the returned ObjectList, and returns the first object of
 * the resulting list cast to SlcAgentDescriptor.
 *
 * The condition computes the size of the object list and logs it at trace
 * level.
 *
 * NOTE(review): confirm which size the condition accepts, and that
 * callServiceSafe presumably retries until the condition holds.
 *
 * @return the first agent descriptor of the list obtained from the server
 */
212 public SlcAgentDescriptor
waitForOneAgent() {
213 ObjectList objectList
= callServiceSafe(LIST_AGENTS
, null,
214 new Condition
<ObjectList
>() {
215 public Boolean
check(ObjectList obj
) {
216 int size
= obj
.getObjects().size();
// Guarded so the message is only built when trace logging is on.
217 if (log
.isTraceEnabled())
218 log
.trace("Object list size: " + size
);
222 return (SlcAgentDescriptor
) objectList
.getObjects().get(0);
/**
 * Calls the IS_SERVER_READY service through callServiceSafe and reports a
 * server that is not ready with an SlcException whose message includes the
 * answer.
 *
 * NOTE(review): presumably serverReadyTimeout bounds the wait and the
 * exception is raised only when the answer is not OK - confirm.
 *
 * @throws SlcException
 *             when the server is not ready
 */
225 public void waitForServerToBeReady() {
226 ExecutionAnswer answer
= callServiceSafe(IS_SERVER_READY
, null, null,
229 throw new SlcException("Server is not ready: " + answer
);
233 * Timeout in ms after which the client will stop waiting for the server to
234 * be ready and throw an exception. Default is 120s.
236 public void setServerReadyTimeout(Long serverReadyTimeout
) {
237 this.serverReadyTimeout
= serverReadyTimeout
;