]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.launcher/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
Add license headers
[gpl/argeo-slc.git] / runtime / org.argeo.slc.launcher / src / main / java / org / argeo / slc / server / client / impl / SlcServerHttpClientImpl.java
1 /*
2 * Copyright (C) 2010 Mathieu Baudier <mbaudier@argeo.org>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.argeo.slc.server.client.impl;
18
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.UUID;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.argeo.slc.Condition;
28 import org.argeo.slc.SlcException;
29 import org.argeo.slc.execution.ExecutionFlowDescriptor;
30 import org.argeo.slc.execution.ExecutionModuleDescriptor;
31 import org.argeo.slc.msg.ExecutionAnswer;
32 import org.argeo.slc.msg.MsgConstants;
33 import org.argeo.slc.msg.ObjectList;
34 import org.argeo.slc.msg.event.SlcEvent;
35 import org.argeo.slc.process.RealizedFlow;
36 import org.argeo.slc.process.SlcExecution;
37 import org.argeo.slc.runtime.SlcAgentDescriptor;
38 import org.argeo.slc.server.client.SlcServerHttpClient;
39
40 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
41 implements SlcServerHttpClient {
42
43 protected final static String PARAM_AGENT_ID = "agentId";
44
45 private final static Log log = LogFactory
46 .getLog(SlcServerHttpClientImpl.class);
47
48 private Long serverReadyTimeout = 120 * 1000l;
49
50 public void waitForSlcExecutionFinished(SlcExecution slcExecution,
51 Long timeout) {
52 if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
53 return;
54
55 long begin = System.currentTimeMillis();
56 while (System.currentTimeMillis() - begin < timeout(timeout)) {
57 SlcEvent event = pollEvent(timeout);
58 String slcExecutionId = event.getHeaders().get(
59 MsgConstants.PROPERTY_SLC_EXECUTION_ID);
60 String status = event.getHeaders().get(
61 MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
62 if (slcExecutionId.equals(slcExecution.getUuid())
63 && status.equals(SlcExecution.STATUS_FINISHED)) {
64 return;
65 }
66 }
67 throw new SlcException("SLC Execution not completed after timeout "
68 + timeout(timeout) + " elapsed.");
69 }
70
71 public SlcEvent pollEvent(Long timeout) {
72 long begin = System.currentTimeMillis();
73 while (System.currentTimeMillis() - begin < timeout(timeout)) {
74 Object obj = callService(POLL_EVENT, null);
75 if (obj instanceof ExecutionAnswer) {
76 ExecutionAnswer answer = (ExecutionAnswer) obj;
77 if (answer.isError())
78 throw new SlcException(
79 "Unexpected exception when polling event: "
80 + answer.getMessage());
81 } else {
82 return (SlcEvent) obj;
83 }
84 }
85 throw new SlcException("No event received after timeout "
86 + timeout(timeout) + " elapsed.");
87 }
88
89 public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
90 Map<String, String> parameters = new HashMap<String, String>();
91 parameters.put(SlcEvent.EVENT_TYPE, eventType);
92 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
93 return callService(ADD_EVENT_LISTENER, parameters);
94 }
95
96 public ExecutionAnswer removeEventListener(String eventType,
97 String eventFilter) {
98 Map<String, String> parameters = new HashMap<String, String>();
99 parameters.put(SlcEvent.EVENT_TYPE, eventType);
100 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
101 return callService(REMOVE_EVENT_LISTENER, parameters);
102 }
103
104 public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
105 SlcExecution slcExecution = new SlcExecution();
106 slcExecution.setUuid(UUID.randomUUID().toString());
107
108 slcExecution.getRealizedFlows().add(realizedFlow);
109
110 Map<String, String> parameters = new HashMap<String, String>();
111 parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
112 ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
113 slcExecution);
114 if (!answer.isOk())
115 throw new SlcException("Could not start flow on agent " + agentId
116 + ": " + answer.getMessage());
117 return slcExecution;
118 }
119
120 public SlcExecution startFlowDefault(String moduleName, String flowName,
121 Map<String, Object> args) {
122 SlcAgentDescriptor agentDescriptor = waitForOneAgent();
123 List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
124 .getUuid());
125 ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
126 moduleName);
127 if (moduleDescMinimal == null)
128 throw new SlcException("Cannot find module " + moduleName);
129 String moduleVersion = moduleDescMinimal.getVersion();
130
131 ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
132 agentDescriptor.getUuid(), moduleName, moduleVersion);
133
134 RealizedFlow realizedFlow = new RealizedFlow();
135 realizedFlow.setModuleName(moduleName);
136 realizedFlow.setModuleVersion(moduleDesc.getVersion());
137
138 ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
139 if (args != null) {
140 for (String key : args.keySet()) {
141 if (flowDescriptor.getValues().containsKey(key)) {
142 flowDescriptor.getValues().put(key, args.get(key));
143 }
144 }
145 }
146 realizedFlow.setFlowDescriptor(flowDescriptor);
147
148 return startFlow(agentDescriptor.getUuid(), realizedFlow);
149
150 // FIXME: polling not working when called from test: no unique
151 // session is created on server side
152 // SlcExecution slcExecutionFinished = null;
153 // try {
154 // addEventListener(
155 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
156 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
157 // realizedFlow);
158 //
159 // waitForSlcExecutionFinished(slcExecution, null);
160 //
161 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
162 // for (Serializable sr : ol.getObjects()) {
163 // SlcExecution se = (SlcExecution) sr;
164 // if (se.getUuid().equals(slcExecution.getUuid())) {
165 // slcExecutionFinished = se;
166 // break;
167 // }
168 // }
169 //
170 // } finally {
171 // removeEventListener(
172 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
173 // }
174 //
175 // if (slcExecutionFinished == null)
176 // throw new SlcException("No finished SLC Execution.");
177 // return slcExecutionFinished;
178 }
179
180 public static ExecutionModuleDescriptor findModule(
181 List<ExecutionModuleDescriptor> lst, String moduleName) {
182 ExecutionModuleDescriptor moduleDesc = null;
183 for (ExecutionModuleDescriptor desc : lst) {
184 if (desc.getName().equals(moduleName)) {
185 if (moduleDesc != null)
186 throw new SlcException(
187 "There is more than one module named " + moduleName
188 + " (versions: " + moduleDesc + " and "
189 + desc.getVersion() + ")");
190 moduleDesc = desc;
191 }
192 }
193 return moduleDesc;
194 }
195
196 public static ExecutionFlowDescriptor findFlow(
197 ExecutionModuleDescriptor moduleDesc, String flowName) {
198 ExecutionFlowDescriptor flowDesc = null;
199 for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
200 if (desc.getName().equals(flowName)) {
201 flowDesc = desc;
202 }
203 }
204 return flowDesc;
205 }
206
207 public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
208 Map<String, String> parameters = new HashMap<String, String>();
209 parameters.put(PARAM_AGENT_ID, agentId);
210
211 List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
212 ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
213 ol.fill(moduleDescriptors);
214 return moduleDescriptors;
215 }
216
217 public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
218 String moduleName, String version) {
219 Map<String, String> parameters = new HashMap<String, String>();
220 parameters.put(PARAM_AGENT_ID, agentId);
221 parameters.put("moduleName", moduleName);
222 parameters.put("version", version);
223 ExecutionModuleDescriptor moduleDescriptor = callService(
224 GET_MODULE_DESCRIPTOR, parameters);
225 return moduleDescriptor;
226 }
227
228 public SlcAgentDescriptor waitForOneAgent() {
229 ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
230 new Condition<ObjectList>() {
231 public Boolean check(ObjectList obj) {
232 int size = obj.getObjects().size();
233 if (log.isTraceEnabled())
234 log.trace("Object list size: " + size);
235 return size == 1;
236 }
237 }, null);
238 return (SlcAgentDescriptor) objectList.getObjects().get(0);
239 }
240
241 public void waitForServerToBeReady() {
242 ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
243 serverReadyTimeout);
244 if (!answer.isOk())
245 throw new SlcException("Server is not ready: " + answer);
246 }
247
248 /**
249 * Timeout in ms after which the client will stop waiting for the server to
250 * be ready and throw an exception. Default is 120s.
251 */
252 public void setServerReadyTimeout(Long serverReadyTimeout) {
253 this.serverReadyTimeout = serverReadyTimeout;
254 }
255
256 }