2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org
.argeo
.slc
.server
.client
.impl
;
19 import java
.util
.ArrayList
;
20 import java
.util
.HashMap
;
21 import java
.util
.List
;
23 import java
.util
.UUID
;
25 import org
.apache
.commons
.logging
.Log
;
26 import org
.apache
.commons
.logging
.LogFactory
;
27 import org
.argeo
.slc
.Condition
;
28 import org
.argeo
.slc
.SlcException
;
29 import org
.argeo
.slc
.execution
.ExecutionFlowDescriptor
;
30 import org
.argeo
.slc
.execution
.ExecutionModuleDescriptor
;
31 import org
.argeo
.slc
.msg
.ExecutionAnswer
;
32 import org
.argeo
.slc
.msg
.MsgConstants
;
33 import org
.argeo
.slc
.msg
.ObjectList
;
34 import org
.argeo
.slc
.msg
.event
.SlcEvent
;
35 import org
.argeo
.slc
.process
.RealizedFlow
;
36 import org
.argeo
.slc
.process
.SlcExecution
;
37 import org
.argeo
.slc
.runtime
.SlcAgentDescriptor
;
38 import org
.argeo
.slc
.server
.client
.SlcServerHttpClient
;
40 public class SlcServerHttpClientImpl
extends AbstractHttpServicesClient
41 implements SlcServerHttpClient
{
43 protected final static String PARAM_AGENT_ID
= "agentId";
45 private final static Log log
= LogFactory
46 .getLog(SlcServerHttpClientImpl
.class);
48 private Long serverReadyTimeout
= 120 * 1000l;
50 public void waitForSlcExecutionFinished(SlcExecution slcExecution
,
52 if (slcExecution
.getStatus().equals(SlcExecution
.COMPLETED
))
55 long begin
= System
.currentTimeMillis();
56 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
57 SlcEvent event
= pollEvent(timeout
);
58 String slcExecutionId
= event
.getHeaders().get(
59 MsgConstants
.PROPERTY_SLC_EXECUTION_ID
);
60 String status
= event
.getHeaders().get(
61 MsgConstants
.PROPERTY_SLC_EXECUTION_STATUS
);
62 if (slcExecutionId
.equals(slcExecution
.getUuid())
63 && status
.equals(SlcExecution
.COMPLETED
)) {
67 throw new SlcException("SLC Execution not completed after timeout "
68 + timeout(timeout
) + " elapsed.");
71 public SlcEvent
pollEvent(Long timeout
) {
72 long begin
= System
.currentTimeMillis();
73 while (System
.currentTimeMillis() - begin
< timeout(timeout
)) {
74 Object obj
= callService(POLL_EVENT
, null);
75 if (obj
instanceof ExecutionAnswer
) {
76 ExecutionAnswer answer
= (ExecutionAnswer
) obj
;
78 throw new SlcException(
79 "Unexpected exception when polling event: "
80 + answer
.getMessage());
82 return (SlcEvent
) obj
;
85 throw new SlcException("No event received after timeout "
86 + timeout(timeout
) + " elapsed.");
89 public ExecutionAnswer
addEventListener(String eventType
, String eventFilter
) {
90 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
91 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
92 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
93 return callService(ADD_EVENT_LISTENER
, parameters
);
96 public ExecutionAnswer
removeEventListener(String eventType
,
98 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
99 parameters
.put(SlcEvent
.EVENT_TYPE
, eventType
);
100 parameters
.put(SlcEvent
.EVENT_FILTER
, eventFilter
);
101 return callService(REMOVE_EVENT_LISTENER
, parameters
);
104 public SlcExecution
startFlow(String agentId
, RealizedFlow realizedFlow
) {
105 SlcExecution slcExecution
= new SlcExecution();
106 slcExecution
.setUuid(UUID
.randomUUID().toString());
108 slcExecution
.getRealizedFlows().add(realizedFlow
);
110 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
111 parameters
.put(MsgConstants
.PROPERTY_SLC_AGENT_ID
, agentId
);
112 ExecutionAnswer answer
= callService(NEW_SLC_EXECUTION
, parameters
,
115 throw new SlcException("Could not start flow on agent " + agentId
116 + ": " + answer
.getMessage());
120 public SlcExecution
startFlowDefault(String moduleName
, String flowName
,
121 Map
<String
, Object
> args
) {
122 SlcAgentDescriptor agentDescriptor
= waitForOneAgent();
123 List
<ExecutionModuleDescriptor
> lst
= listModuleDescriptors(agentDescriptor
125 ExecutionModuleDescriptor moduleDescMinimal
= findModule(lst
,
127 if (moduleDescMinimal
== null)
128 throw new SlcException("Cannot find module " + moduleName
);
129 String moduleVersion
= moduleDescMinimal
.getVersion();
131 ExecutionModuleDescriptor moduleDesc
= getModuleDescriptor(
132 agentDescriptor
.getUuid(), moduleName
, moduleVersion
);
134 RealizedFlow realizedFlow
= new RealizedFlow();
135 realizedFlow
.setModuleName(moduleName
);
136 realizedFlow
.setModuleVersion(moduleDesc
.getVersion());
138 ExecutionFlowDescriptor flowDescriptor
= findFlow(moduleDesc
, flowName
);
140 for (String key
: args
.keySet()) {
141 if (flowDescriptor
.getValues().containsKey(key
)) {
142 flowDescriptor
.getValues().put(key
, args
.get(key
));
146 realizedFlow
.setFlowDescriptor(flowDescriptor
);
148 return startFlow(agentDescriptor
.getUuid(), realizedFlow
);
150 // FIXME: polling not working when called from test: no unique
151 // session is created on server side
152 // SlcExecution slcExecutionFinished = null;
155 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
156 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
159 // waitForSlcExecutionFinished(slcExecution, null);
161 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
162 // for (Serializable sr : ol.getObjects()) {
163 // SlcExecution se = (SlcExecution) sr;
164 // if (se.getUuid().equals(slcExecution.getUuid())) {
165 // slcExecutionFinished = se;
171 // removeEventListener(
172 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
175 // if (slcExecutionFinished == null)
176 // throw new SlcException("No finished SLC Execution.");
177 // return slcExecutionFinished;
180 public static ExecutionModuleDescriptor
findModule(
181 List
<ExecutionModuleDescriptor
> lst
, String moduleName
) {
182 ExecutionModuleDescriptor moduleDesc
= null;
183 for (ExecutionModuleDescriptor desc
: lst
) {
184 if (desc
.getName().equals(moduleName
)) {
185 if (moduleDesc
!= null)
186 throw new SlcException(
187 "There is more than one module named " + moduleName
188 + " (versions: " + moduleDesc
+ " and "
189 + desc
.getVersion() + ")");
196 public static ExecutionFlowDescriptor
findFlow(
197 ExecutionModuleDescriptor moduleDesc
, String flowName
) {
198 ExecutionFlowDescriptor flowDesc
= null;
199 for (ExecutionFlowDescriptor desc
: moduleDesc
.getExecutionFlows()) {
200 if (desc
.getName().equals(flowName
)) {
207 public List
<ExecutionModuleDescriptor
> listModuleDescriptors(String agentId
) {
208 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
209 parameters
.put(PARAM_AGENT_ID
, agentId
);
211 List
<ExecutionModuleDescriptor
> moduleDescriptors
= new ArrayList
<ExecutionModuleDescriptor
>();
212 ObjectList ol
= callService(LIST_MODULE_DESCRIPTORS
, parameters
);
213 ol
.fill(moduleDescriptors
);
214 return moduleDescriptors
;
217 public ExecutionModuleDescriptor
getModuleDescriptor(String agentId
,
218 String moduleName
, String version
) {
219 Map
<String
, String
> parameters
= new HashMap
<String
, String
>();
220 parameters
.put(PARAM_AGENT_ID
, agentId
);
221 parameters
.put("moduleName", moduleName
);
222 parameters
.put("version", version
);
223 ExecutionModuleDescriptor moduleDescriptor
= callService(
224 GET_MODULE_DESCRIPTOR
, parameters
);
225 return moduleDescriptor
;
228 public SlcAgentDescriptor
waitForOneAgent() {
229 ObjectList objectList
= callServiceSafe(LIST_AGENTS
, null,
230 new Condition
<ObjectList
>() {
231 public Boolean
check(ObjectList obj
) {
232 int size
= obj
.getObjects().size();
233 if (log
.isTraceEnabled())
234 log
.trace("Object list size: " + size
);
238 return (SlcAgentDescriptor
) objectList
.getObjects().get(0);
241 public void waitForServerToBeReady() {
242 ExecutionAnswer answer
= callServiceSafe(IS_SERVER_READY
, null, null,
245 throw new SlcException("Server is not ready: " + answer
);
249 * Timeout in ms after which the client will stop waiting for the server to
250 * be ready and throw an exception. Default is 120s.
252 public void setServerReadyTimeout(Long serverReadyTimeout
) {
253 this.serverReadyTimeout
= serverReadyTimeout
;