1
2
3
4
5
6
7
8 package net.kwfgrid.gwes;
9
10 import net.kwfgrid.gwes.exception.DatabaseException;
11 import net.kwfgrid.gwes.exception.NoResultForDatabaseQueryException;
12 import net.kwfgrid.gwes.exception.NoSuchWorkflowException;
13 import net.kwfgrid.gwes.xmldbclient.*;
14 import net.kwfgrid.gwes.prorater.ResourceQualityCalculator;
15 import net.kwfgrid.gwes.workflowanalyzer.DistributionSet;
16 import net.kwfgrid.gwes.workflowanalyzer.LongDistribution;
17 import net.kwfgrid.gwes.util.StringUtils;
18 import net.kwfgrid.gworkflowdl.protocol.server.ServerWorkflow;
19 import net.kwfgrid.gworkflowdl.structure.Token;
20 import net.kwfgrid.gworkflowdl.structure.Workflow;
21 import net.kwfgrid.gworkflowdl.structure.Property;
22 import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
23 import org.antlr.stringtemplate.*;
24 import org.antlr.stringtemplate.language.DefaultTemplateLexer;
25 import org.apache.log4j.Logger;
26 import org.apache.axiom.attachments.ByteArrayDataSource;
27
28 import javax.activation.DataHandler;
29 import java.net.URL;
30 import java.rmi.RemoteException;
31 import java.text.DecimalFormat;
32 import java.text.NumberFormat;
33 import java.util.*;
34
35
36
37
38
39
40
41
42
43 public class XMLDB {
44
45
46
47
48 final static Logger logger = Logger.getLogger(XMLDB.class);
49
50 private static XMLDB _i;
51 private static String gworkflowdlBaseCollection = "/db/gworkflowdl";
52 private static String dgrdlBaseCollection = System.getProperty(Constants.PROP_SYSTEM_RESOURCE_REPOSITORY_DGRDL_COLLECTION, "/db/dgrdl");
53 private static String xsdPath = System.getProperty(Constants.PROP_SYSTEM_GWORKFLOWDL_XSD_PATH, "http://www.gridworkflow.org/kwfgrid/src/xsd");
54
55 private static String[] STATUS_ARRAY_NAMES = {
56 "ID",
57 Constants.PROP_STATUS,
58 Constants.PROP_BIRTHDAY_MS,
59 Constants.PROP_DURATION_UNDEFINED_MS,
60 Constants.PROP_DURATION_INITIATED_MS,
61 Constants.PROP_DURATION_RUNNING_MS,
62 Constants.PROP_DURATION_ACTIVE_MS,
63 Constants.PROP_DURATION_SUSPENDED_MS,
64 Constants.PROP_DURATION_TOTAL_MS,
65 Constants.PROP_END_TIME_MS,
66 Constants.PROP_LEVEL,
67 "description",
68 Constants.PROP_USER_ID
69 };
70
71 private String user;
72 private String password;
73
74
75
76
77
78
79
80
81
82
83 private QueryInterface query;
84 private AdminInterface admin;
85
86 private DecimalFormat intformat;
87
88 private XMLDBSessionManager sessionManager;
89
90 private StringTemplateGroup xqueryWorkflowGroup;
91
92
93
94
95
96 private XMLDB() throws Exception {
97 this(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_URL), System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_USER), System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_PASSWORD));
98 }
99
100 private XMLDB(String xmldbUrl, String user, String password) throws Exception {
101 this.user = user;
102 this.password = password;
103 String queryServiceStr = xmldbUrl + "/services/Query";
104 String adminServiceStr = xmldbUrl + "/services/Admin";
105 if (xmldbUrl == null || user == null || password == null) {
106 throw new DatabaseException("The XMLDB client has not been configured correctly. Please set the properties "
107 + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_URL + ", "
108 + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_USER + ", and "
109 + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_PASSWORD
110 + " in the file gwes.properties");
111 }
112 URL adminServiceUrl = new URL(adminServiceStr);
113
114
115 intformat = (DecimalFormat) NumberFormat.getIntegerInstance();
116 intformat.applyPattern("0000000000");
117
118 query = new RemoteQuery(queryServiceStr);
119 logger.debug("connected to " + queryServiceStr);
120
121 admin = new RemoteAdmin(adminServiceStr);
122 logger.debug("connected to " + adminServiceUrl);
123
124
125 sessionManager = new XMLDBSessionManager(this);
126
127
128 xqueryWorkflowGroup = loadSTGFileFromClasspath("xquery_workflow");
129
130 }
131
132
133
134
135
136
137 public synchronized static XMLDB getInstance() throws DatabaseException {
138 if (_i == null) try {
139 _i = new XMLDB();
140 } catch (Exception e) {
141 throw new DatabaseException("Could not connect to XML database: " + e, e);
142 }
143 return _i;
144 }
145
146
147
148
149
150
151 public synchronized static XMLDB getFirstInstance(String xmldbUrl, String user, String password) throws DatabaseException {
152 if (_i == null) try {
153 _i = new XMLDB(xmldbUrl, user, password);
154 return _i;
155 } catch (Exception e) {
156 throw new DatabaseException("Could not connect to XML database: " + e, e);
157 } else {
158 throw new DatabaseException("XML database already instantiated! Use getInstance() to connect to this instance.");
159 }
160 }
161
162
163
164
165
166
167
168 protected String connectQuery() throws DatabaseException {
169 try {
170 return query.connect(user, password);
171 } catch (RemoteException e) {
172 throw new DatabaseException("Could not connect to database: " + e, e);
173 }
174 }
175
176
177
178
179
180
181
182 protected void disconnectQuery(String sessionId) throws DatabaseException {
183 try {
184 query.disconnect(sessionId);
185 } catch (RemoteException e) {
186 throw new DatabaseException("Could not disconnect from database: " + e, e);
187 }
188 }
189
190
191
192
193
194
195
196 protected String connectAdmin() throws DatabaseException {
197 try {
198 return admin.connect(user, password);
199 } catch (RemoteException e) {
200 throw new DatabaseException("Could not connect to database: " + e, e);
201 }
202 }
203
204
205
206
207
208
209
210 protected void disconnectAdmin(String sessionId) throws DatabaseException {
211 try {
212 admin.disconnect(sessionId);
213 } catch (RemoteException e) {
214 throw new DatabaseException("Could not disconnect from database: " + e, e);
215 }
216 }
217
218 public String[] xQueryResult(String xquery) throws DatabaseException {
219 logger.debug(xquery);
220 try {
221 synchronized (sessionManager.queryLock) {
222 String sessionid = sessionManager.getQuerySessionId();
223 QueryServiceStub.QueryResponse r = query.query(sessionid, xquery);
224 if (r.getHits() > 0) {
225 String[] results = query.retrieve(sessionid, 1, 0, false, false, "none");
226 if (results == null) throw new NoResultForDatabaseQueryException("Result is null for database XQuery \"" + xquery +"\"");
227 if(logger.isDebugEnabled()) {
228 logger.debug("XQuery response:");
229 for (String result : results) {
230 logger.debug(result);
231 }
232 }
233 return results;
234 } else {
235 throw new NoResultForDatabaseQueryException("No result for database XQuery \"" + xquery +"\"");
236 }
237 }
238 } catch (RemoteException e) {
239 sessionManager.resetSession();
240 throw new DatabaseException("Could not access remote database: " + e, e);
241 }
242 }
243
244 public String xQueryResult0(String xquery) throws DatabaseException {
245 logger.debug(xquery);
246 try {
247 synchronized (sessionManager.queryLock) {
248 String sessionid = sessionManager.getQuerySessionId();
249 QueryServiceStub.QueryResponse r = query.query(sessionid, xquery);
250 if (r.getHits() > 0) {
251 String[] results = query.retrieve(sessionid, 1, 1, false, false, "none");
252 if (results == null) throw new NoResultForDatabaseQueryException("Result is null for database XQuery \"" + xquery +"\"");
253 if(logger.isDebugEnabled()) {
254 logger.debug("XQuery response:"+ results[0]);
255 }
256 return results[0];
257 } else {
258 throw new NoResultForDatabaseQueryException("No result for database XQuery \"" + xquery +"\"");
259 }
260 }
261 } catch (RemoteException e) {
262 sessionManager.resetSession();
263 throw new DatabaseException("Could not access remote database: " + e, e);
264 }
265 }
266
267 public QueryServiceStub.QueryResponse xQueryResponse(String xquery) throws DatabaseException {
268 logger.debug(xquery);
269 QueryServiceStub.QueryResponse r;
270 try {
271 synchronized (sessionManager.queryLock) {
272 String sessionid = sessionManager.getQuerySessionId();
273 r = query.query(sessionid, xquery);
274 }
275 } catch (RemoteException e) {
276 sessionManager.resetSession();
277 throw new DatabaseException("Could not access remote database: " + e, e);
278 }
279 return r;
280 }
281
282 public String storeWorkflow(Workflow workflow) throws DatabaseException {
283 String path;
284 try {
285 String collection = gworkflowdlBaseCollection + "/" + workflow.getID();
286 byte[] wf;
287 ServerWorkflow serverWorkflow = (ServerWorkflow) workflow;
288
289 synchronized (serverWorkflow.getStructureLock()) {
290 path = new StringBuffer()
291 .append(collection)
292 .append("/")
293 .append(intformat.format(serverWorkflow.getVersionNumber()))
294 .append(".xml").toString();
295 wf = serverWorkflow.getXML().getBytes();
296 }
297
298 synchronized (sessionManager.adminLock) {
299 String sessionid = sessionManager.getAdminSessionId();
300 admin.createCollection(sessionid, collection);
301 DataHandler dh = new DataHandler(new ByteArrayDataSource(wf));
302 admin.store(sessionid, dh, "UTF-8", path, true);
303 }
304 logger.debug("stored " + path);
305 } catch (ClassCastException e) {
306 logger.error("ERROR in GWorkflowDL library: " + e, e);
307 throw new DatabaseException("Could not store workflow: " + e, e);
308 } catch (Exception e) {
309 sessionManager.resetSession();
310 throw new DatabaseException("Could not store workflow: " + e, e);
311 }
312 return path;
313 }
314
315 public int getStatus(String workflowID) throws DatabaseException, NoSuchWorkflowException {
316 logger.debug("Retrieving workflow status from XML database ...");
317 String collection = gworkflowdlBaseCollection + "/" + workflowID;
318 String document = getLatestWorkflowDocument(collection);
319 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getStatus");
320 t.setAttribute("document", document);
321 String r = xQueryResult0(t.toString());
322 return WorkflowStatus.getStatusAsInt(r);
323 }
324
325 public String[] getCheckpoints(String workflowID) throws DatabaseException {
326 logger.debug("Retrieving checkpoints for workflow \"" + workflowID + "\" from XML database ...");
327 String[] workflowCheckpoints;
328 try {
329 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
330 logger.debug("collection = " + collectionStr);
331 synchronized (sessionManager.queryLock) {
332 String sessionid = sessionManager.getQuerySessionId();
333 workflowCheckpoints = query.listCollection(sessionid, collectionStr).getResources().getElements();
334 }
335
336 for (int i = 0; i < workflowCheckpoints.length; i++) {
337 workflowCheckpoints[i] = collectionStr + "/" + workflowCheckpoints[i];
338 }
339 } catch (Exception e) {
340 sessionManager.resetSession();
341 throw new DatabaseException("Could not retrieve workflow checkpoints: " + e, e);
342 }
343
344 Arrays.sort(workflowCheckpoints);
345 return workflowCheckpoints;
346 }
347
348 public String getSpecificWorkflowXML(String resourcePath) throws DatabaseException {
349 logger.debug("Retrieving workflow \"" + resourcePath + "\" from XML database ...");
350 String xml;
351 try {
352 synchronized (sessionManager.queryLock) {
353 String sessionid = sessionManager.getQuerySessionId();
354 xml = query.getResource(sessionid, resourcePath, true, true);
355 }
356 } catch (Exception e) {
357 sessionManager.resetSession();
358 throw new DatabaseException("Could not retrieve workflow: " + e, e);
359 }
360 return xml;
361 }
362
363 public String getLatestWorkflowXML(String workflowID) throws DatabaseException, NoSuchWorkflowException {
364 logger.debug("Retrieving workflow \"" + workflowID + "\" from XML database ...");
365 String xml;
366 try {
367 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
368 logger.debug("collection = " + collectionStr);
369 String path = getLatestWorkflowDocument(collectionStr);
370 synchronized (sessionManager.queryLock) {
371 String sessionid = sessionManager.getQuerySessionId();
372 logger.debug("Loading " + path + " ...");
373 xml = query.getResource(sessionid, path, true, true);
374 }
375 } catch (RemoteException e) {
376 sessionManager.resetSession();
377 throw new DatabaseException("Could not retrieve workflow: " + e, e);
378 }
379 return xml;
380 }
381
382 public String getEarliestWorkflowXML(String workflowID) throws DatabaseException {
383 logger.debug("Retrieving workflow \"" + workflowID + "\" from XML database ...");
384 String xml;
385 try {
386 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
387 logger.debug("collection = " + collectionStr);
388 synchronized (sessionManager.queryLock) {
389 String sessionid = sessionManager.getQuerySessionId();
390 String[] workflowVersions = query.listCollection(sessionid, collectionStr).getResources().getElements();
391 for (String storedWorkflow : workflowVersions) {
392 logger.debug("Available workflow version: " + storedWorkflow);
393 }
394
395 String earliestVersion = workflowVersions[0];
396 for (String workflowVersion : workflowVersions) {
397 if (workflowVersion.compareTo(earliestVersion) < 0) {
398 earliestVersion = workflowVersion;
399 }
400 }
401 String path = collectionStr + "/" + earliestVersion;
402 logger.debug("Loading " + path + " ...");
403 xml = query.getResource(sessionid, path, true, true);
404 }
405 } catch (RemoteException e) {
406 sessionManager.resetSession();
407 throw new DatabaseException("Could not retrieve workflow: " + e, e);
408 }
409 return xml;
410 }
411
412
413
414
415
416
417
418
419
420
421 public void storeTokenData(String workflowID, String placeID, Token token, String[] owls) throws DatabaseException {
422
423 if (token.getData()==null) return;
424
425 if (logger.isDebugEnabled()) {
426 logger.debug("Store token data for placeID " + placeID);
427 }
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448 String resourceName = placeID + ".xml";
449 String collectionPath = gworkflowdlBaseCollection + "/" + workflowID + "/data";
450 String documentName = collectionPath + "/" + resourceName;
451
452 boolean resourceIsAvailable = false;
453 boolean collectionIsAvailable = false;
454
455 try {
456
457 synchronized (sessionManager.queryLock) {
458 String sessionid = sessionManager.getQuerySessionId();
459 QueryServiceStub.Collection collection = query.listCollection(sessionid, collectionPath);
460 collectionIsAvailable = true;
461 QueryServiceStub.StringArray resources = collection.getResources();
462 if (resources != null) {
463 String[] elements = resources.getElements();
464 if (elements != null) {
465 for (String resource : resources.getElements()) {
466 if (resource.equals(resourceName)) {
467 resourceIsAvailable = true;
468 break;
469 }
470 }
471 }
472 }
473 }
474 } catch (RemoteException e) {
475
476 }
477
478 try {
479
480 if (!resourceIsAvailable) {
481 StringBuffer xml = new StringBuffer("<workflow xmlns=\"http://www.gridworkflow.org/gworkflowdl\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xsi:schemaLocation=\"http://www.gridworkflow.org/gworkflowdl " + xsdPath + "/gworkflowdl_1_0.xsd\" ID=\"");
482 xml.append(workflowID).append("\">\n");
483 xml.append(" <place ID=\"").append(placeID).append("\">\n");
484 if (owls != null) {
485 for (String owl : owls) {
486 xml.append(" <tokenClass type=\"data\"><owl>").append(owl).append("</owl></tokenClass>\n");
487 }
488 }
489 xml.append(" <token>\n");
490 Property[] props = token.getProperties().getProperties();
491 for (Property prop : props) {
492 xml.append(" <property name=\"").append(prop.getKey()).append("\">").append(prop.getValue()).append("</property>\n");
493 }
494 xml.append(" ").append(token.getData().toXML()).append("\n");
495 xml.append(" </token>\n");
496 xml.append(" </place>\n");
497 xml.append("</workflow>");
498 synchronized (sessionManager.adminLock) {
499 String sessionid = sessionManager.getAdminSessionId();
500 if (!collectionIsAvailable) admin.createCollection(sessionid, collectionPath);
501 DataHandler dh = new DataHandler(new ByteArrayDataSource(xml.toString().getBytes()));
502 admin.store(sessionid, dh, "UTF-8", documentName, true);
503 if (logger.isDebugEnabled()) {
504 logger.debug("stored " + documentName);
505 }
506 }
507 }
508
509
510 else {
511 StringBuffer xupdate = new StringBuffer("<xupdate:modifications version=\"1.0\" xmlns:xupdate=\"http://www.xmldb.org/xupdate\">\n");
512 xupdate.append(" <xupdate:append select=\"declare namespace wf='http://www.gridworkflow.org/gworkflowdl';/wf:workflow/wf:place\" child=\"last()\">\n");
513 xupdate.append(" <xupdate:element name=\"token\">");
514 Property[] props = token.getProperties().getProperties();
515 for (Property prop : props) {
516 xupdate.append(" <property name=\"").append(prop.getKey()).append("\">").append(prop.getValue()).append("</property>\n");
517 }
518 xupdate.append(" ").append(token.getData().toXML()).append("\n");
519 xupdate.append(" </xupdate:element>\n");
520 xupdate.append(" </xupdate:append>\n");
521 xupdate.append("</xupdate:modifications>");
522 synchronized (sessionManager.adminLock) {
523 String sessionid = sessionManager.getAdminSessionId();
524 int modifications = admin.xupdateResource(sessionid, documentName, xupdate.toString());
525 if (logger.isDebugEnabled()) {
526 logger.debug("updated " + documentName);
527 }
528 }
529 }
530 } catch (RemoteException e) {
531 sessionManager.resetSession();
532 throw new DatabaseException("Could not store token data to database: " + e, e);
533 }
534 }
535
536
537
538
539
540
541
542 public String getWSActivityDescription(OperationCandidate op) throws DatabaseException {
543 logger.debug("getWSActivityDescription ...");
544
545
546 String collection = dgrdlBaseCollection+"/resources";
547
548 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getWSActivityDescription");
549 t.setAttribute("collection", collection);
550 t.setAttribute("serviceUri", op.getOperationName());
551 return xQueryResult0(t.toString());
552 }
553
554
555
556
557
558
559
560 public ActivityDescription getGRAMActivityDescription(OperationCandidate op) throws DatabaseException {
561 logger.debug("getGRAMActivtyDescription ...");
562
563
564 String collection = dgrdlBaseCollection+"/resources";
565 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getGRAMActivityDescription");
566 t.setAttribute("collection", collection);
567 t.setAttribute("hardwareUri", op.getResourceName());
568 t.setAttribute("softwareUri", op.getOperationName());
569 String ret = xQueryResult0(t.toString());
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587 ActivityDescription ad = new ActivityDescription(op.getResourceName(),op.getOperationName());
588 ad.put("factoryEndpoint", StringUtils.extractStringFromXML(ret,"factoryEndpoint"));
589 ad.put("executable", StringUtils.extractStringFromXML(ret,"executable"));
590 putXmlValueToActivityDescription(ret, ad, "factoryType");
591 putXmlValueToActivityDescription(ret, ad, "directory");
592 putXmlValueToActivityDescription(ret, ad, "localUserId");
593 putXmlValueToActivityDescription(ret, ad, "queue");
594 putXmlValueToActivityDescription(ret, ad, "maxTime");
595 putXmlValueToActivityDescription(ret, ad, "maxWallTime");
596 putXmlValueToActivityDescription(ret, ad, "maxCpuTime");
597 putXmlValueToActivityDescription(ret, ad, "maxMemory");
598 putXmlValueToActivityDescription(ret, ad, "minMemory");
599 putXmlValueToActivityDescription(ret, ad, "jobType");
600 putXmlValueToActivityDescription(ret, ad, "count");
601
602
603 if (ad.get("factoryEndpoint")==null) throw new DatabaseException("Could not retrieve factoryEndpoint for hardware '" + op.getResourceName() + "'");
604 if (ad.get("executable")==null) throw new DatabaseException("Could not retrieve executable for software '" + op.getOperationName() + "'");
605
606 return ad;
607 }
608
609 private void putXmlValueToActivityDescription(String ret, ActivityDescription ad, String name) {
610 String str = StringUtils.extractStringFromXML(ret,name);
611 if (str != null) {
612 ad.put(name, str);
613 }
614 }
615
616 public String lookupManagedJobFactoryService(String resourceUri) throws DatabaseException {
617 if (logger.isDebugEnabled()) {
618 logger.debug("Retrieving GRAM endpoint from XML database for resource '" + resourceUri + "'");
619 }
620 String collection = dgrdlBaseCollection+"/resources";
621 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getManagedJobFactoryService");
622 t.setAttribute("collection", collection);
623 t.setAttribute("resourceUri", resourceUri);
624 return xQueryResult0(t.toString());
625 }
626
627 public String[] getWorkflowStatistics() throws DatabaseException {
628 String xquery = "declare namespace wf=\"http://www.gridworkflow.org/gworkflowdl\";" +
629 "<workflowstats>\n" +
630 " <total>{count(/wf:workflow)}</total>\n" +
631 " <completed>{count(/wf:workflow[wf:property[@name='status']='COMPLETED'])}</completed>\n" +
632 " <terminated>{count(/wf:workflow[wf:property[@name='status']='TERMINATED'])}</terminated>\n" +
633 " <ctm>{count(/wf:workflow[wf:property[@name='domain']='CTM'])}</ctm>\n" +
634 " <erp>{count(/wf:workflow[wf:property[@name='domain']='ERP'])}</erp>\n" +
635 " <ffsc>{count(/wf:workflow[wf:property[@name='domain']='FFSC'])}</ffsc>\n" +
636 "</workflowstats>";
637
638 return xQueryResult(xquery);
639 }
640
641 public float getHardwareQuality(String resourceUri) throws DatabaseException {
642 float quality = 0;
643
644 if (resourceUri == null) throw new DatabaseException("Undefined Resource Uri");
645 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getHardwareQuality");
646 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
647 t.setAttribute("hardwareUri", resourceUri);
648
649
650
651
652
653
654
655
656
657 String[] res = xQueryResult(t.toString());
658 return calculateQuality(resourceUri, res);
659 }
660
661 private float calculateQuality(String resourceUri, String[] res) throws DatabaseException {
662 float quality;
663 String lrms = StringUtils.extractStringFromXML(res[0], "type");
664
665 if (lrms == null || lrms.length()==0 || lrms.equalsIgnoreCase("Fork")) {
666 String l = StringUtils.extractStringFromXML(res[0], "load");
667 String c = StringUtils.extractStringFromXML(res[0], "cpu");
668 if (l != null && c != null) {
669 quality = ResourceQualityCalculator.calculateForkQuality(Integer.parseInt(l),Integer.parseInt(c));
670 } else {
671 throw new DatabaseException("error [Prorater]: no information about load and number of CPUs available in database for resourceUri =" + resourceUri);
672 }
673 }
674
675 else if (lrms.equalsIgnoreCase("PBS") || lrms.equalsIgnoreCase("LSF")) {
676 float LRMSQuality, WaittimeQuality;
677 String run = StringUtils.extractStringFromXML(res[0], "running");
678 String wat = StringUtils.extractStringFromXML(res[0], "waiting");
679 String wtime = StringUtils.extractStringFromXML(res[0], "waittime");
680
681 if (run != null && wat != null) {
682 LRMSQuality = ResourceQualityCalculator.calculateBatchQueueQuality(Integer.parseInt(run), Integer.parseInt(wat));
683 } else {
684 throw new DatabaseException("error [Prorater]: no information about number of jobs in queue available in database for resourceUri =" + resourceUri);
685 }
686 if (wtime != null) {
687 WaittimeQuality = ResourceQualityCalculator.calculateWaittimeQuality(Float.parseFloat(wtime));
688 } else {
689 logger.warn("error [Prorater]: no information about estimated queue waittime available in database for resourceUri =" + resourceUri);
690 WaittimeQuality = 0.9f;
691 }
692 quality = LRMSQuality * WaittimeQuality;
693 }
694
695 else throw new DatabaseException("error [Prorater]: no type of local resource managementent system specified in database for resourceUri =" + resourceUri);
696 return quality;
697 }
698
699
700
701
702
703
704
705
706 public void incrementResourceScore(String resourceUri, int increment) throws DatabaseException {
707 if (resourceUri == null) return;
708 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("incrementResourceScore");
709 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
710 t.setAttribute("resourceUri", resourceUri);
711 t.setAttribute("increment", increment);
712 xQueryResponse(t.toString());
713 }
714
715 public synchronized DistributionSet getResourceDurationStatistics(String operationName, String resourceName) throws DatabaseException {
716 if (operationName == null || resourceName == null) return null;
717
718 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getResourceDurationStatistics");
719 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
720 t.setAttribute("resourceUri", operationName);
721 t.setAttribute("hardwareUri", resourceName);
722 String res = xQueryResult0(t.toString());
723
724
725
726
727
728
729
730
731
732
733
734
735
736 try {
737 long numberActivities = Long.parseLong(StringUtils.extractStringFromXML(res,"numberActivities"));
738 double totalMean = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.mean"));
739 double totalStdDeviation = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.stdDeviation"));
740 double totalExpSmooth = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.expSmooth"));
741 long totalMin = Long.parseLong(StringUtils.extractStringFromXML(res,"total.min"));
742 long totalMax = Long.parseLong(StringUtils.extractStringFromXML(res,"total.max"));
743 double activeMean = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.mean"));
744 double activeStdDeviation = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.stdDeviation"));
745 double activeExpSmooth = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.expSmooth"));
746 long activeMin = Long.parseLong(StringUtils.extractStringFromXML(res,"active.min"));
747 long activeMax = Long.parseLong(StringUtils.extractStringFromXML(res,"active.max"));
748 LongDistribution totalDist = new LongDistribution(numberActivities,totalMean,totalStdDeviation,totalExpSmooth,totalMin,totalMax);
749 LongDistribution activeDist = new LongDistribution(numberActivities,activeMean,activeStdDeviation,activeExpSmooth,activeMin,activeMax);
750 LongDistribution e = new LongDistribution();
751 return new DistributionSet(e, e, e, activeDist, e, totalDist);
752 } catch (NumberFormatException e) {
753 logger.warn("Could not retrieve duration distribution for operation '"+operationName+"' and resource '"+resourceName+"': "+e);
754 return null;
755 } catch (NullPointerException e) {
756 logger.warn("Could not retrieve duration distribution for operation '"+operationName+"' and resource '"+resourceName+"': "+e);
757 return null;
758 }
759 }
760
761
762 public synchronized void updateResourceDurationStatistics(String operationName, String resourceName, DistributionSet distSet) throws DatabaseException {
763 if (operationName == null || resourceName == null || distSet == null) return;
764
765 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("updateResourceDurationStatistics");
766 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
767 t.setAttribute("resourceUri", operationName);
768 t.setAttribute("hardwareUri", resourceName);
769 t.setAttribute("activeDist", distSet.durationActive);
770 t.setAttribute("totalDist", distSet.durationTotal);
771 xQueryResponse(t.toString());
772 }
773
774 public void storeTemporaryWorkflowDirectory(String resourceUri, String host, String directory, String workflowID, String activityID) throws DatabaseException {
775 if (resourceUri == null) throw new DatabaseException("Undefined Resource Uri");
776 String collection = gworkflowdlBaseCollection + "/" + workflowID + "/resources";
777 String document = collection + "/" + resourceUri.replace(':', '_').replace('/', '_') + ".xml";
778
779 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("insertTemporaryWorkflowDirectory");
780 t.setAttribute("document", document);
781 t.setAttribute("resourceUri", resourceUri);
782 t.setAttribute("host", host);
783 t.setAttribute("directory", directory);
784 t.setAttribute("activityID", activityID);
785 String res = xQueryResult0(t.toString());
786
787
788 if (res.equals("<notAvailable/>")) {
789 t = xqueryWorkflowGroup.getInstanceOf("temporaryWorkflowDirectory");
790 t.setAttribute("xsdPath", xsdPath);
791 t.setAttribute("resourceUri", resourceUri);
792 t.setAttribute("host", host);
793 t.setAttribute("directory", directory);
794 t.setAttribute("activityID", activityID);
795 String resourceXML = t.toString();
796 try {
797 synchronized (sessionManager.adminLock) {
798 String sessionid = sessionManager.getAdminSessionId();
799 admin.createCollection(sessionid, collection);
800 DataHandler dh = new DataHandler(new ByteArrayDataSource(resourceXML.getBytes()));
801 admin.store(sessionid, dh, "UTF-8", document, false);
802 }
803 if (logger.isDebugEnabled()) {
804 logger.debug("stored " + document);
805 }
806 } catch (Exception e) {
807 sessionManager.resetSession();
808 throw new DatabaseException("Could not store information to database: " + e, e);
809 }
810 }
811 }
812
813 public String[] getTemporaryWorkflowDirectories(String workflowID) throws DatabaseException {
814 String collection = gworkflowdlBaseCollection + "/" + workflowID + "/resources";
815
816 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getTemporaryWorkflowDirectories");
817 t.setAttribute("collection", collection);
818
819 return xQueryResult(t.toString());
820 }
821
822 public void removeWorkflow(String workflowID) throws DatabaseException, NoSuchWorkflowException {
823 String collection = gworkflowdlBaseCollection + "/" + workflowID;
824 try {
825 synchronized (sessionManager.adminLock) {
826 String sessionid = sessionManager.getAdminSessionId();
827 boolean sucess = admin.removeCollection(sessionid, collection);
828 if (!sucess) {
829 throw new NoSuchWorkflowException("Could not remove workflow with ID '"+workflowID+"' from database!");
830 }
831 }
832 logger.debug("removed " + collection);
833 } catch (RemoteException e) {
834 sessionManager.resetSession();
835 throw new DatabaseException("Could not remove workflow from database: " + e, e);
836 }
837
838 }
839
840 public String[] getWorkflowIDs() throws DatabaseException {
841 logger.debug("Retrieving workflowIDs from XML database ...");
842 String[] ret;
843 try {
844 synchronized (sessionManager.queryLock) {
845 String sessionid = sessionManager.getQuerySessionId();
846 ret = query.listCollection(sessionid, gworkflowdlBaseCollection).getCollections().getElements();
847 }
848 } catch (Exception e) {
849 sessionManager.resetSession();
850 throw new DatabaseException("Could not retrieve workflow: " + e, e);
851 }
852 return ret;
853 }
854
855 public String[] getData(String workflowID, String placeID) throws DatabaseException, NoSuchWorkflowException {
856 logger.debug("Retrieving data from place " + placeID + " of workflow " + workflowID + " from XML database ...");
857 String collection = gworkflowdlBaseCollection + "/" + workflowID;
858 String document = getLatestWorkflowDocument(collection);
859 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getData");
860 t.setAttribute("document", document);
861 t.setAttribute("placeID", placeID);
862 return xQueryResult(t.toString());
863 }
864
865 public Map<String, String[]> getWorkflowStatusMap() throws DatabaseException {
866 logger.debug("Retrieving workflowStatusArray from XML database ...");
867
868 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getWorkflowStatusArray");
869 t.setAttribute("collection", gworkflowdlBaseCollection);
870
871 String[] res = xQueryResult(t.toString());
872
873 Map<String, String[]> ret = null;
874 if (res.length > 0) {
875 ret = new HashMap<String, String[]>(res.length);
876 for (String xml : res) {
877
878 String[] array = new String[STATUS_ARRAY_NAMES.length];
879 int i = 0;
880 for (String name : STATUS_ARRAY_NAMES) {
881 String value = StringUtils.extractStringFromXML(xml, name);
882 array[i++] = name + "=" + ((value != null) ? value : "0");
883 }
884 ret.put(array[0].substring(array[0].indexOf("=") + 1), array);
885 }
886 }
887 return ret;
888 }
889
890 public String getDescription(String workflowID) throws DatabaseException, NoSuchWorkflowException {
891 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
892 String document = getLatestWorkflowDocument(collectionStr);
893 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getDescription");
894 t.setAttribute("document", document);
895
896 return xQueryResult0(t.toString());
897 }
898
899 public void setDescription(String workflowID, String description) throws DatabaseException, NoSuchWorkflowException {
900 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
901 String document = getLatestWorkflowDocument(collectionStr);
902 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("setDescription");
903 t.setAttribute("document", document);
904 t.setAttribute("description", description);
905
906 xQueryResponse(t.toString());
907 }
908
909 public void setProperty(String workflowID, String name, String value) throws DatabaseException, NoSuchWorkflowException {
910 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
911 String document = getLatestWorkflowDocument(collectionStr);
912 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("setProperty");
913 t.setAttribute("document", document);
914 t.setAttribute("name", name);
915 t.setAttribute("value", value);
916
917 xQueryResponse(t.toString());
918 }
919
920 public String getProperty(String workflowID, String name) throws DatabaseException, NoSuchWorkflowException {
921 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
922 String document = getLatestWorkflowDocument(collectionStr);
923 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getProperty");
924 t.setAttribute("document", document);
925 t.setAttribute("name", name);
926
927 return xQueryResult0(t.toString());
928 }
929
930 public String[][] getProperties(String workflowID) throws DatabaseException, NoSuchWorkflowException {
931 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
932 String document = getLatestWorkflowDocument(collectionStr);
933 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getProperties");
934 t.setAttribute("document", document);
935 String[] res = xQueryResult(t.toString());
936
937 String[][] ret;
938 ret = new String[res.length + 1][2];
939 for (int i = 0; i < res.length; i++) {
940 ret[i][0] = StringUtils.extractStringFromXML(res[i], "name");
941 ret[i][1] = StringUtils.extractStringFromXML(res[i], "value");
942 }
943 ret[res.length][0] = "level";
944 ret[res.length][1] = "DATABASE";
945
946 return ret;
947 }
948
949 public String[] getFaultToleranceStatistics() throws DatabaseException, NoSuchWorkflowException {
950 Map statusmap = getWorkflowStatusMap();
951 Iterator iter = statusmap.keySet().iterator();
952 String[] ret = new String[statusmap.size()];
953 int i=0;
954 while (iter.hasNext()) {
955 String workflowID = (String) iter.next();
956 logger.debug(workflowID);
957 String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
958 String document = getLatestWorkflowDocument(collectionStr);
959 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getFaultToleranceStatistics");
960 t.setAttribute("document", document);
961
962 ret[i++]=xQueryResult0(t.toString());
963 }
964 return ret;
965 }
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988 public String[] getAvailableResources(String ofClass) throws DatabaseException {
989 logger.debug("Retrieving available resources from XML database ...");
990 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getAvailableResources");
991 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
992 if (ofClass != null) t.setAttribute("ofClass", ofClass);
993
994 String[] res = xQueryResult(t.toString());
995 return res;
996 }
997
998 public String getResourceDescription(String resourceUri) throws DatabaseException {
999 logger.debug("Retrieving resource description from XML database ...");
1000 StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getResourceDescription");
1001 t.setAttribute("collection", dgrdlBaseCollection+"/resources");
1002 t.setAttribute("resourceUri", resourceUri);
1003
1004 return xQueryResult0(t.toString());
1005 }
1006
1007
1008
1009
1010
1011
1012 private StringTemplateGroup loadSTGFileFromClasspath
1013 (String
1014 templateFilename) {
1015 String location = "templates";
1016 StringTemplateGroupLoader loader = new CommonGroupLoader(location, new LogStringTemplateErrorListener());
1017 StringTemplateGroup.registerGroupLoader(loader);
1018 StringTemplateGroup.registerDefaultLexer(DefaultTemplateLexer.class);
1019 return StringTemplateGroup.loadGroup(templateFilename);
1020 }
1021
1022 private class LogStringTemplateErrorListener implements StringTemplateErrorListener {
1023 public void error(String msg, Throwable e) {
1024 logger.error(msg, e);
1025 }
1026
1027 public void warning(String msg) {
1028 logger.warn(msg);
1029 }
1030 }
1031
1032 public String getLatestWorkflowDocument(String collectionStr) throws DatabaseException, NoSuchWorkflowException {
1033 String document;
1034 String[] workflowVersions;
1035
1036 try {
1037 synchronized (sessionManager.queryLock) {
1038 String sessionid = sessionManager.getQuerySessionId();
1039 workflowVersions = query.listCollection(sessionid, collectionStr).getResources().getElements();
1040 }
1041 } catch (RemoteException e) {
1042 sessionManager.resetSession();
1043 if (e.getMessage().contains("collection "+collectionStr+" not found")) {
1044 throw new NoSuchWorkflowException(e.getMessage(),e);
1045 }
1046 else {
1047 throw new DatabaseException("Could not access remote database: " + e, e);
1048 }
1049 }
1050
1051 if (logger.isDebugEnabled()) {
1052 for (String storedWorkflow : workflowVersions) {
1053 logger.debug("Available workflow version: " + storedWorkflow);
1054 }
1055 }
1056
1057 String latestVersion = workflowVersions[0];
1058 for (int i = 0; i < workflowVersions.length; i++) {
1059 if (workflowVersions[i].compareTo(latestVersion) > 0) {
1060 latestVersion = workflowVersions[i];
1061 }
1062 }
1063 if (logger.isDebugEnabled()) {
1064 logger.debug("Latest workflow version: " + latestVersion);
1065 }
1066
1067 document = collectionStr + "/" + latestVersion;
1068 return document;
1069 }
1070
1071 }