]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.launcher/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
Update headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.launcher / src / main / java / org / argeo / slc / server / client / impl / SlcServerHttpClientImpl.java
1 /*
2 * Copyright (C) 2007-2012 Mathieu Baudier
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.argeo.slc.server.client.impl;
17
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.UUID;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.argeo.slc.Condition;
27 import org.argeo.slc.SlcException;
28 import org.argeo.slc.execution.ExecutionFlowDescriptor;
29 import org.argeo.slc.execution.ExecutionModuleDescriptor;
30 import org.argeo.slc.msg.ExecutionAnswer;
31 import org.argeo.slc.msg.MsgConstants;
32 import org.argeo.slc.msg.ObjectList;
33 import org.argeo.slc.msg.event.SlcEvent;
34 import org.argeo.slc.process.RealizedFlow;
35 import org.argeo.slc.process.SlcExecution;
36 import org.argeo.slc.runtime.SlcAgentDescriptor;
37 import org.argeo.slc.server.client.SlcServerHttpClient;
38
39 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
40 implements SlcServerHttpClient {
41
42 protected final static String PARAM_AGENT_ID = "agentId";
43
44 private final static Log log = LogFactory
45 .getLog(SlcServerHttpClientImpl.class);
46
47 private Long serverReadyTimeout = 120 * 1000l;
48
49 public void waitForSlcExecutionFinished(SlcExecution slcExecution,
50 Long timeout) {
51 if (slcExecution.getStatus().equals(SlcExecution.COMPLETED))
52 return;
53
54 long begin = System.currentTimeMillis();
55 while (System.currentTimeMillis() - begin < timeout(timeout)) {
56 SlcEvent event = pollEvent(timeout);
57 String slcExecutionId = event.getHeaders().get(
58 MsgConstants.PROPERTY_SLC_EXECUTION_ID);
59 String status = event.getHeaders().get(
60 MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
61 if (slcExecutionId.equals(slcExecution.getUuid())
62 && status.equals(SlcExecution.COMPLETED)) {
63 return;
64 }
65 }
66 throw new SlcException("SLC Execution not completed after timeout "
67 + timeout(timeout) + " elapsed.");
68 }
69
70 public SlcEvent pollEvent(Long timeout) {
71 long begin = System.currentTimeMillis();
72 while (System.currentTimeMillis() - begin < timeout(timeout)) {
73 Object obj = callService(POLL_EVENT, null);
74 if (obj instanceof ExecutionAnswer) {
75 ExecutionAnswer answer = (ExecutionAnswer) obj;
76 if (answer.isError())
77 throw new SlcException(
78 "Unexpected exception when polling event: "
79 + answer.getMessage());
80 } else {
81 return (SlcEvent) obj;
82 }
83 }
84 throw new SlcException("No event received after timeout "
85 + timeout(timeout) + " elapsed.");
86 }
87
88 public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
89 Map<String, String> parameters = new HashMap<String, String>();
90 parameters.put(SlcEvent.EVENT_TYPE, eventType);
91 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
92 return callService(ADD_EVENT_LISTENER, parameters);
93 }
94
95 public ExecutionAnswer removeEventListener(String eventType,
96 String eventFilter) {
97 Map<String, String> parameters = new HashMap<String, String>();
98 parameters.put(SlcEvent.EVENT_TYPE, eventType);
99 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
100 return callService(REMOVE_EVENT_LISTENER, parameters);
101 }
102
103 public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
104 SlcExecution slcExecution = new SlcExecution();
105 slcExecution.setUuid(UUID.randomUUID().toString());
106
107 slcExecution.getRealizedFlows().add(realizedFlow);
108
109 Map<String, String> parameters = new HashMap<String, String>();
110 parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
111 ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
112 slcExecution);
113 if (!answer.isOk())
114 throw new SlcException("Could not start flow on agent " + agentId
115 + ": " + answer.getMessage());
116 return slcExecution;
117 }
118
119 public SlcExecution startFlowDefault(String moduleName, String flowName,
120 Map<String, Object> args) {
121 SlcAgentDescriptor agentDescriptor = waitForOneAgent();
122 List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
123 .getUuid());
124 ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
125 moduleName);
126 if (moduleDescMinimal == null)
127 throw new SlcException("Cannot find module " + moduleName);
128 String moduleVersion = moduleDescMinimal.getVersion();
129
130 ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
131 agentDescriptor.getUuid(), moduleName, moduleVersion);
132
133 RealizedFlow realizedFlow = new RealizedFlow();
134 realizedFlow.setModuleName(moduleName);
135 realizedFlow.setModuleVersion(moduleDesc.getVersion());
136
137 ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
138 if (args != null) {
139 for (String key : args.keySet()) {
140 if (flowDescriptor.getValues().containsKey(key)) {
141 flowDescriptor.getValues().put(key, args.get(key));
142 }
143 }
144 }
145 realizedFlow.setFlowDescriptor(flowDescriptor);
146
147 return startFlow(agentDescriptor.getUuid(), realizedFlow);
148
149 // FIXME: polling not working when called from test: no unique
150 // session is created on server side
151 // SlcExecution slcExecutionFinished = null;
152 // try {
153 // addEventListener(
154 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
155 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
156 // realizedFlow);
157 //
158 // waitForSlcExecutionFinished(slcExecution, null);
159 //
160 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
161 // for (Serializable sr : ol.getObjects()) {
162 // SlcExecution se = (SlcExecution) sr;
163 // if (se.getUuid().equals(slcExecution.getUuid())) {
164 // slcExecutionFinished = se;
165 // break;
166 // }
167 // }
168 //
169 // } finally {
170 // removeEventListener(
171 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
172 // }
173 //
174 // if (slcExecutionFinished == null)
175 // throw new SlcException("No finished SLC Execution.");
176 // return slcExecutionFinished;
177 }
178
179 public static ExecutionModuleDescriptor findModule(
180 List<ExecutionModuleDescriptor> lst, String moduleName) {
181 ExecutionModuleDescriptor moduleDesc = null;
182 for (ExecutionModuleDescriptor desc : lst) {
183 if (desc.getName().equals(moduleName)) {
184 if (moduleDesc != null)
185 throw new SlcException(
186 "There is more than one module named " + moduleName
187 + " (versions: " + moduleDesc + " and "
188 + desc.getVersion() + ")");
189 moduleDesc = desc;
190 }
191 }
192 return moduleDesc;
193 }
194
195 public static ExecutionFlowDescriptor findFlow(
196 ExecutionModuleDescriptor moduleDesc, String flowName) {
197 ExecutionFlowDescriptor flowDesc = null;
198 for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
199 if (desc.getName().equals(flowName)) {
200 flowDesc = desc;
201 }
202 }
203 return flowDesc;
204 }
205
206 public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
207 Map<String, String> parameters = new HashMap<String, String>();
208 parameters.put(PARAM_AGENT_ID, agentId);
209
210 List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
211 ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
212 ol.fill(moduleDescriptors);
213 return moduleDescriptors;
214 }
215
216 public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
217 String moduleName, String version) {
218 Map<String, String> parameters = new HashMap<String, String>();
219 parameters.put(PARAM_AGENT_ID, agentId);
220 parameters.put("moduleName", moduleName);
221 parameters.put("version", version);
222 ExecutionModuleDescriptor moduleDescriptor = callService(
223 GET_MODULE_DESCRIPTOR, parameters);
224 return moduleDescriptor;
225 }
226
227 public SlcAgentDescriptor waitForOneAgent() {
228 ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
229 new Condition<ObjectList>() {
230 public Boolean check(ObjectList obj) {
231 int size = obj.getObjects().size();
232 if (log.isTraceEnabled())
233 log.trace("Object list size: " + size);
234 return size == 1;
235 }
236 }, null);
237 return (SlcAgentDescriptor) objectList.getObjects().get(0);
238 }
239
240 public void waitForServerToBeReady() {
241 ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
242 serverReadyTimeout);
243 if (!answer.isOk())
244 throw new SlcException("Server is not ready: " + answer);
245 }
246
247 /**
248 * Timeout in ms after which the client will stop waiting for the server to
249 * be ready and throw an exception. Default is 120s.
250 */
251 public void setServerReadyTimeout(Long serverReadyTimeout) {
252 this.serverReadyTimeout = serverReadyTimeout;
253 }
254
255 }