]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.server/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
301afb5b1007327d1f440bc07d2e30ec586e4f31
[gpl/argeo-slc.git] / runtime / org.argeo.slc.server / src / main / java / org / argeo / slc / server / client / impl / SlcServerHttpClientImpl.java
1 package org.argeo.slc.server.client.impl;
2
3 import java.io.Serializable;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.UUID;
9
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12 import org.argeo.slc.Condition;
13 import org.argeo.slc.SlcException;
14 import org.argeo.slc.execution.ExecutionFlowDescriptor;
15 import org.argeo.slc.execution.ExecutionModuleDescriptor;
16 import org.argeo.slc.msg.ExecutionAnswer;
17 import org.argeo.slc.msg.MsgConstants;
18 import org.argeo.slc.msg.ObjectList;
19 import org.argeo.slc.msg.event.SlcEvent;
20 import org.argeo.slc.process.RealizedFlow;
21 import org.argeo.slc.process.SlcExecution;
22 import org.argeo.slc.runtime.SlcAgentDescriptor;
23 import org.argeo.slc.server.client.SlcServerHttpClient;
24 import org.argeo.slc.services.EventPublisherAspect;
25
26 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
27 implements SlcServerHttpClient {
28
29 protected final static String PARAM_AGENT_ID = "agentId";
30
31 private final static Log log = LogFactory
32 .getLog(SlcServerHttpClientImpl.class);
33
34 private Long serverReadyTimeout = 120 * 1000l;
35
36 public void waitForSlcExecutionFinished(SlcExecution slcExecution,
37 Long timeout) {
38 if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
39 return;
40
41 long begin = System.currentTimeMillis();
42 while (System.currentTimeMillis() - begin < timeout(timeout)) {
43 SlcEvent event = pollEvent(timeout);
44 String slcExecutionId = event.getHeaders().get(
45 MsgConstants.PROPERTY_SLC_EXECUTION_ID);
46 String status = event.getHeaders().get(
47 MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
48 if (slcExecutionId.equals(slcExecution.getUuid())
49 && status.equals(SlcExecution.STATUS_FINISHED)) {
50 return;
51 }
52 }
53 throw new SlcException("SLC Execution not completed after timeout "
54 + timeout(timeout) + " elapsed.");
55 }
56
57 public SlcEvent pollEvent(Long timeout) {
58 long begin = System.currentTimeMillis();
59 while (System.currentTimeMillis() - begin < timeout(timeout)) {
60 Object obj = callService(POLL_EVENT, null);
61 if (obj instanceof ExecutionAnswer) {
62 ExecutionAnswer answer = (ExecutionAnswer) obj;
63 if (answer.isError())
64 throw new SlcException(
65 "Unexpected exception when pollign event: "
66 + answer.getMessage());
67 } else {
68 return (SlcEvent) obj;
69 }
70 }
71 throw new SlcException("No event received after timeout "
72 + timeout(timeout) + " elapsed.");
73 }
74
75 public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
76 Map<String, String> parameters = new HashMap<String, String>();
77 parameters.put(SlcEvent.EVENT_TYPE, eventType);
78 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
79 return callService(ADD_EVENT_LISTENER, parameters);
80 }
81
82 public ExecutionAnswer removeEventListener(String eventType,
83 String eventFilter) {
84 Map<String, String> parameters = new HashMap<String, String>();
85 parameters.put(SlcEvent.EVENT_TYPE, eventType);
86 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
87 return callService(REMOVE_EVENT_LISTENER, parameters);
88 }
89
90 public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
91 SlcExecution slcExecution = new SlcExecution();
92 slcExecution.setUuid(UUID.randomUUID().toString());
93
94 slcExecution.getRealizedFlows().add(realizedFlow);
95
96 Map<String, String> parameters = new HashMap<String, String>();
97 parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
98 ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
99 slcExecution);
100 if (!answer.isOk())
101 throw new SlcException("Could not start flow on agent " + agentId
102 + ": " + answer.getMessage());
103 return slcExecution;
104 }
105
106 public SlcExecution startFlowDefault(String moduleName, String flowName,
107 Map<String, Object> args) {
108 SlcAgentDescriptor agentDescriptor = waitForOneAgent();
109 List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
110 .getUuid());
111 ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
112 moduleName);
113 String moduleVersion = moduleDescMinimal.getVersion();
114
115 ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
116 agentDescriptor.getUuid(), moduleName, moduleVersion);
117
118 RealizedFlow realizedFlow = new RealizedFlow();
119 realizedFlow.setModuleName(moduleName);
120 realizedFlow.setModuleVersion(moduleDesc.getVersion());
121
122 ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
123 if (args != null) {
124 for (String key : args.keySet()) {
125 if (flowDescriptor.getValues().containsKey(key)) {
126 flowDescriptor.getValues().put(key, args.get(key));
127 }
128 }
129 }
130 realizedFlow.setFlowDescriptor(flowDescriptor);
131
132 return startFlow(agentDescriptor.getUuid(), realizedFlow);
133
134 // FIXME: polling not working when called from test: no unique
135 // session is created on server side
136 // SlcExecution slcExecutionFinished = null;
137 // try {
138 // addEventListener(
139 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
140 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
141 // realizedFlow);
142 //
143 // waitForSlcExecutionFinished(slcExecution, null);
144 //
145 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
146 // for (Serializable sr : ol.getObjects()) {
147 // SlcExecution se = (SlcExecution) sr;
148 // if (se.getUuid().equals(slcExecution.getUuid())) {
149 // slcExecutionFinished = se;
150 // break;
151 // }
152 // }
153 //
154 // } finally {
155 // removeEventListener(
156 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
157 // }
158 //
159 // if (slcExecutionFinished == null)
160 // throw new SlcException("No finished SLC Execution.");
161 // return slcExecutionFinished;
162 }
163
164 public static ExecutionModuleDescriptor findModule(
165 List<ExecutionModuleDescriptor> lst, String moduleName) {
166 ExecutionModuleDescriptor moduleDesc = null;
167 for (ExecutionModuleDescriptor desc : lst) {
168 if (desc.getName().equals(moduleName)) {
169 if (moduleDesc != null)
170 throw new SlcException(
171 "There is more than one module named " + moduleName
172 + " (versions: " + moduleDesc + " and "
173 + desc.getVersion() + ")");
174 moduleDesc = desc;
175 }
176 }
177 return moduleDesc;
178 }
179
180 public static ExecutionFlowDescriptor findFlow(
181 ExecutionModuleDescriptor moduleDesc, String flowName) {
182 ExecutionFlowDescriptor flowDesc = null;
183 for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
184 if (desc.getName().equals(flowName)) {
185 flowDesc = desc;
186 }
187 }
188 return flowDesc;
189 }
190
191 public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
192 Map<String, String> parameters = new HashMap<String, String>();
193 parameters.put(PARAM_AGENT_ID, agentId);
194
195 List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
196 ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
197 ol.fill(moduleDescriptors);
198 return moduleDescriptors;
199 }
200
201 public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
202 String moduleName, String version) {
203 Map<String, String> parameters = new HashMap<String, String>();
204 parameters.put(PARAM_AGENT_ID, agentId);
205 parameters.put("moduleName", moduleName);
206 parameters.put("version", version);
207 ExecutionModuleDescriptor moduleDescriptor = callService(
208 GET_MODULE_DESCRIPTOR, parameters);
209 return moduleDescriptor;
210 }
211
212 public SlcAgentDescriptor waitForOneAgent() {
213 ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
214 new Condition<ObjectList>() {
215 public Boolean check(ObjectList obj) {
216 int size = obj.getObjects().size();
217 if (log.isTraceEnabled())
218 log.trace("Object list size: " + size);
219 return size == 1;
220 }
221 }, null);
222 return (SlcAgentDescriptor) objectList.getObjects().get(0);
223 }
224
225 public void waitForServerToBeReady() {
226 ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
227 serverReadyTimeout);
228 if (!answer.isOk())
229 throw new SlcException("Server is not ready: " + answer);
230 }
231
232 /**
233 * Timeout in ms after which the client will stop waiting for the server to
234 * be ready and throw an exception. Default is 120s.
235 */
236 public void setServerReadyTimeout(Long serverReadyTimeout) {
237 this.serverReadyTimeout = serverReadyTimeout;
238 }
239
240 }