]> git.argeo.org Git - gpl/argeo-slc.git/blob - runtime/org.argeo.slc.launcher/src/main/java/org/argeo/slc/server/client/impl/SlcServerHttpClientImpl.java
Restructure HTTP service client
[gpl/argeo-slc.git] / runtime / org.argeo.slc.launcher / src / main / java / org / argeo / slc / server / client / impl / SlcServerHttpClientImpl.java
1 package org.argeo.slc.server.client.impl;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.UUID;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.argeo.slc.Condition;
12 import org.argeo.slc.SlcException;
13 import org.argeo.slc.execution.ExecutionFlowDescriptor;
14 import org.argeo.slc.execution.ExecutionModuleDescriptor;
15 import org.argeo.slc.msg.ExecutionAnswer;
16 import org.argeo.slc.msg.MsgConstants;
17 import org.argeo.slc.msg.ObjectList;
18 import org.argeo.slc.msg.event.SlcEvent;
19 import org.argeo.slc.process.RealizedFlow;
20 import org.argeo.slc.process.SlcExecution;
21 import org.argeo.slc.runtime.SlcAgentDescriptor;
22 import org.argeo.slc.server.client.SlcServerHttpClient;
23
24 public class SlcServerHttpClientImpl extends AbstractHttpServicesClient
25 implements SlcServerHttpClient {
26
27 protected final static String PARAM_AGENT_ID = "agentId";
28
29 private final static Log log = LogFactory
30 .getLog(SlcServerHttpClientImpl.class);
31
32 private Long serverReadyTimeout = 120 * 1000l;
33
34 public void waitForSlcExecutionFinished(SlcExecution slcExecution,
35 Long timeout) {
36 if (slcExecution.getStatus().equals(SlcExecution.STATUS_FINISHED))
37 return;
38
39 long begin = System.currentTimeMillis();
40 while (System.currentTimeMillis() - begin < timeout(timeout)) {
41 SlcEvent event = pollEvent(timeout);
42 String slcExecutionId = event.getHeaders().get(
43 MsgConstants.PROPERTY_SLC_EXECUTION_ID);
44 String status = event.getHeaders().get(
45 MsgConstants.PROPERTY_SLC_EXECUTION_STATUS);
46 if (slcExecutionId.equals(slcExecution.getUuid())
47 && status.equals(SlcExecution.STATUS_FINISHED)) {
48 return;
49 }
50 }
51 throw new SlcException("SLC Execution not completed after timeout "
52 + timeout(timeout) + " elapsed.");
53 }
54
55 public SlcEvent pollEvent(Long timeout) {
56 long begin = System.currentTimeMillis();
57 while (System.currentTimeMillis() - begin < timeout(timeout)) {
58 Object obj = callService(POLL_EVENT, null);
59 if (obj instanceof ExecutionAnswer) {
60 ExecutionAnswer answer = (ExecutionAnswer) obj;
61 if (answer.isError())
62 throw new SlcException(
63 "Unexpected exception when polling event: "
64 + answer.getMessage());
65 } else {
66 return (SlcEvent) obj;
67 }
68 }
69 throw new SlcException("No event received after timeout "
70 + timeout(timeout) + " elapsed.");
71 }
72
73 public ExecutionAnswer addEventListener(String eventType, String eventFilter) {
74 Map<String, String> parameters = new HashMap<String, String>();
75 parameters.put(SlcEvent.EVENT_TYPE, eventType);
76 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
77 return callService(ADD_EVENT_LISTENER, parameters);
78 }
79
80 public ExecutionAnswer removeEventListener(String eventType,
81 String eventFilter) {
82 Map<String, String> parameters = new HashMap<String, String>();
83 parameters.put(SlcEvent.EVENT_TYPE, eventType);
84 parameters.put(SlcEvent.EVENT_FILTER, eventFilter);
85 return callService(REMOVE_EVENT_LISTENER, parameters);
86 }
87
88 public SlcExecution startFlow(String agentId, RealizedFlow realizedFlow) {
89 SlcExecution slcExecution = new SlcExecution();
90 slcExecution.setUuid(UUID.randomUUID().toString());
91
92 slcExecution.getRealizedFlows().add(realizedFlow);
93
94 Map<String, String> parameters = new HashMap<String, String>();
95 parameters.put(MsgConstants.PROPERTY_SLC_AGENT_ID, agentId);
96 ExecutionAnswer answer = callService(NEW_SLC_EXECUTION, parameters,
97 slcExecution);
98 if (!answer.isOk())
99 throw new SlcException("Could not start flow on agent " + agentId
100 + ": " + answer.getMessage());
101 return slcExecution;
102 }
103
104 public SlcExecution startFlowDefault(String moduleName, String flowName,
105 Map<String, Object> args) {
106 SlcAgentDescriptor agentDescriptor = waitForOneAgent();
107 List<ExecutionModuleDescriptor> lst = listModuleDescriptors(agentDescriptor
108 .getUuid());
109 ExecutionModuleDescriptor moduleDescMinimal = findModule(lst,
110 moduleName);
111 if (moduleDescMinimal == null)
112 throw new SlcException("Cannot find module " + moduleName);
113 String moduleVersion = moduleDescMinimal.getVersion();
114
115 ExecutionModuleDescriptor moduleDesc = getModuleDescriptor(
116 agentDescriptor.getUuid(), moduleName, moduleVersion);
117
118 RealizedFlow realizedFlow = new RealizedFlow();
119 realizedFlow.setModuleName(moduleName);
120 realizedFlow.setModuleVersion(moduleDesc.getVersion());
121
122 ExecutionFlowDescriptor flowDescriptor = findFlow(moduleDesc, flowName);
123 if (args != null) {
124 for (String key : args.keySet()) {
125 if (flowDescriptor.getValues().containsKey(key)) {
126 flowDescriptor.getValues().put(key, args.get(key));
127 }
128 }
129 }
130 realizedFlow.setFlowDescriptor(flowDescriptor);
131
132 return startFlow(agentDescriptor.getUuid(), realizedFlow);
133
134 // FIXME: polling not working when called from test: no unique
135 // session is created on server side
136 // SlcExecution slcExecutionFinished = null;
137 // try {
138 // addEventListener(
139 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
140 // SlcExecution slcExecution = startFlow(agentDescriptor.getUuid(),
141 // realizedFlow);
142 //
143 // waitForSlcExecutionFinished(slcExecution, null);
144 //
145 // ObjectList ol = callService(LIST_SLC_EXECUTIONS, null);
146 // for (Serializable sr : ol.getObjects()) {
147 // SlcExecution se = (SlcExecution) sr;
148 // if (se.getUuid().equals(slcExecution.getUuid())) {
149 // slcExecutionFinished = se;
150 // break;
151 // }
152 // }
153 //
154 // } finally {
155 // removeEventListener(
156 // EventPublisherAspect.EVT_UPDATE_SLC_EXECUTION_STATUS, null);
157 // }
158 //
159 // if (slcExecutionFinished == null)
160 // throw new SlcException("No finished SLC Execution.");
161 // return slcExecutionFinished;
162 }
163
164 public static ExecutionModuleDescriptor findModule(
165 List<ExecutionModuleDescriptor> lst, String moduleName) {
166 ExecutionModuleDescriptor moduleDesc = null;
167 for (ExecutionModuleDescriptor desc : lst) {
168 if (desc.getName().equals(moduleName)) {
169 if (moduleDesc != null)
170 throw new SlcException(
171 "There is more than one module named " + moduleName
172 + " (versions: " + moduleDesc + " and "
173 + desc.getVersion() + ")");
174 moduleDesc = desc;
175 }
176 }
177 return moduleDesc;
178 }
179
180 public static ExecutionFlowDescriptor findFlow(
181 ExecutionModuleDescriptor moduleDesc, String flowName) {
182 ExecutionFlowDescriptor flowDesc = null;
183 for (ExecutionFlowDescriptor desc : moduleDesc.getExecutionFlows()) {
184 if (desc.getName().equals(flowName)) {
185 flowDesc = desc;
186 }
187 }
188 return flowDesc;
189 }
190
191 public List<ExecutionModuleDescriptor> listModuleDescriptors(String agentId) {
192 Map<String, String> parameters = new HashMap<String, String>();
193 parameters.put(PARAM_AGENT_ID, agentId);
194
195 List<ExecutionModuleDescriptor> moduleDescriptors = new ArrayList<ExecutionModuleDescriptor>();
196 ObjectList ol = callService(LIST_MODULE_DESCRIPTORS, parameters);
197 ol.fill(moduleDescriptors);
198 return moduleDescriptors;
199 }
200
201 public ExecutionModuleDescriptor getModuleDescriptor(String agentId,
202 String moduleName, String version) {
203 Map<String, String> parameters = new HashMap<String, String>();
204 parameters.put(PARAM_AGENT_ID, agentId);
205 parameters.put("moduleName", moduleName);
206 parameters.put("version", version);
207 ExecutionModuleDescriptor moduleDescriptor = callService(
208 GET_MODULE_DESCRIPTOR, parameters);
209 return moduleDescriptor;
210 }
211
212 public SlcAgentDescriptor waitForOneAgent() {
213 ObjectList objectList = callServiceSafe(LIST_AGENTS, null,
214 new Condition<ObjectList>() {
215 public Boolean check(ObjectList obj) {
216 int size = obj.getObjects().size();
217 if (log.isTraceEnabled())
218 log.trace("Object list size: " + size);
219 return size == 1;
220 }
221 }, null);
222 return (SlcAgentDescriptor) objectList.getObjects().get(0);
223 }
224
225 public void waitForServerToBeReady() {
226 ExecutionAnswer answer = callServiceSafe(IS_SERVER_READY, null, null,
227 serverReadyTimeout);
228 if (!answer.isOk())
229 throw new SlcException("Server is not ready: " + answer);
230 }
231
232 /**
233 * Timeout in ms after which the client will stop waiting for the server to
234 * be ready and throw an exception. Default is 120s.
235 */
236 public void setServerReadyTimeout(Long serverReadyTimeout) {
237 this.serverReadyTimeout = serverReadyTimeout;
238 }
239
240 }