View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import net.kwfgrid.gwes.exception.DatabaseException;
11  import net.kwfgrid.gwes.exception.NoResultForDatabaseQueryException;
12  import net.kwfgrid.gwes.exception.NoSuchWorkflowException;
13  import net.kwfgrid.gwes.xmldbclient.*;
14  import net.kwfgrid.gwes.prorater.ResourceQualityCalculator;
15  import net.kwfgrid.gwes.workflowanalyzer.DistributionSet;
16  import net.kwfgrid.gwes.workflowanalyzer.LongDistribution;
17  import net.kwfgrid.gwes.util.StringUtils;
18  import net.kwfgrid.gworkflowdl.protocol.server.ServerWorkflow;
19  import net.kwfgrid.gworkflowdl.structure.Token;
20  import net.kwfgrid.gworkflowdl.structure.Workflow;
21  import net.kwfgrid.gworkflowdl.structure.Property;
22  import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
23  import org.antlr.stringtemplate.*;
24  import org.antlr.stringtemplate.language.DefaultTemplateLexer;
25  import org.apache.log4j.Logger;
26  import org.apache.axiom.attachments.ByteArrayDataSource;
27  
28  import javax.activation.DataHandler;
29  import java.net.URL;
30  import java.rmi.RemoteException;
31  import java.text.DecimalFormat;
32  import java.text.NumberFormat;
33  import java.util.*;
34  
35  /**
36   * Singleton pattern.
37   * Assumes that workflows and resources are in the same eXist database.
38   * @author Andreas Hoheisel
39   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
40   * @author Dietmar Sommerfeld / GWDG (part of quality calculation)
41   * @version $Id: XMLDB.java 1540 2011-08-17 13:30:37Z hoheisel $
42   */
43  public class XMLDB {
44  
45      /**
46       * log4j logger.
47       */
48      final static Logger logger = Logger.getLogger(XMLDB.class);
49  
50      private static XMLDB _i;
51      private static String gworkflowdlBaseCollection = "/db/gworkflowdl";
52      private static String dgrdlBaseCollection = System.getProperty(Constants.PROP_SYSTEM_RESOURCE_REPOSITORY_DGRDL_COLLECTION, "/db/dgrdl");
53      private static String xsdPath = System.getProperty(Constants.PROP_SYSTEM_GWORKFLOWDL_XSD_PATH, "http://www.gridworkflow.org/kwfgrid/src/xsd");
54  
55      private static String[] STATUS_ARRAY_NAMES = {
56              "ID",
57              Constants.PROP_STATUS,
58              Constants.PROP_BIRTHDAY_MS,
59              Constants.PROP_DURATION_UNDEFINED_MS,
60              Constants.PROP_DURATION_INITIATED_MS,
61              Constants.PROP_DURATION_RUNNING_MS,
62              Constants.PROP_DURATION_ACTIVE_MS,
63              Constants.PROP_DURATION_SUSPENDED_MS,
64              Constants.PROP_DURATION_TOTAL_MS,
65              Constants.PROP_END_TIME_MS,
66              Constants.PROP_LEVEL,
67              "description",
68              Constants.PROP_USER_ID
69      };
70  
71      private String user;
72      private String password;
73  
74      // Axis 1.3
75  //    private Query_PortType query;
76  //    private Admin_PortType admin;
77  
78      // Axis 1.2RC2
79  //    private Query query;
80  //    private Admin admin;
81  
82      // Axis2 1.5.1
83      private QueryInterface query;
84      private AdminInterface admin;
85  
86      private DecimalFormat intformat;
87  
88      private XMLDBSessionManager sessionManager;
89  
90      private StringTemplateGroup xqueryWorkflowGroup;
91  
92      /**
93       * Constructor that connects to a database specified by system properties.
94       * @throws Exception
95       */
96      private XMLDB() throws Exception {
97          this(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_URL), System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_USER), System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_PASSWORD));
98      }
99  
100     private XMLDB(String xmldbUrl, String user, String password) throws Exception {
101         this.user = user;
102         this.password = password;
103         String queryServiceStr = xmldbUrl + "/services/Query";
104         String adminServiceStr = xmldbUrl + "/services/Admin";
105         if (xmldbUrl == null || user == null || password == null) {
106             throw new DatabaseException("The XMLDB client has not been configured correctly. Please set the properties "
107                     + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_URL + ", "
108                     + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_USER + ", and "
109                     + Constants.PROP_SYSTEM_WORKFLOW_REPOSITORY_PASSWORD
110                     + " in the file gwes.properties");
111         }
112         URL adminServiceUrl = new URL(adminServiceStr);
113 
114         // format for sequence number
115         intformat = (DecimalFormat) NumberFormat.getIntegerInstance();
116         intformat.applyPattern("0000000000");
117 
118         query = new RemoteQuery(queryServiceStr);
119         logger.debug("connected to " + queryServiceStr);
120 
121         admin = new RemoteAdmin(adminServiceStr);
122         logger.debug("connected to " + adminServiceUrl);
123 
124         // start XMLDBSession Manager
125         sessionManager = new XMLDBSessionManager(this);
126 
127         // read template group
128         xqueryWorkflowGroup = loadSTGFileFromClasspath("xquery_workflow");
129 
130     }
131 
132     /**
133      * Get the xmldb instance or create a new one if null.
134      *
135      * @return The current XMLDB instance or <code>null</code> if there an exception occurs.
136      */
137     public synchronized static XMLDB getInstance() throws DatabaseException {
138         if (_i == null) try {
139             _i = new XMLDB();
140         } catch (Exception e) {
141             throw new DatabaseException("Could not connect to XML database: " + e, e);
142         }
143         return _i;
144     }
145 
146     /**
147      * Create a new xmldb instance with url, username and password.
148      *
149      * @return The XMLDB instance.
150      */
151     public synchronized static XMLDB getFirstInstance(String xmldbUrl, String user, String password) throws DatabaseException {
152         if (_i == null) try {
153             _i = new XMLDB(xmldbUrl, user, password);
154             return _i;
155         } catch (Exception e) {
156             throw new DatabaseException("Could not connect to XML database: " + e, e);
157         } else {
158             throw new DatabaseException("XML database already instantiated! Use getInstance() to connect to this instance.");
159         }
160     }
161 
162     /**
163      * Used by the XMLDBSessionManager.
164      *
165      * @return query session ID.
166      * @throws DatabaseException
167      */
168     protected String connectQuery() throws DatabaseException {
169         try {
170             return query.connect(user, password);
171         } catch (RemoteException e) {
172             throw new DatabaseException("Could not connect to database: " + e, e);
173         }
174     }
175 
176     /**
177      * Used by the XMLDBSessionManager.
178      *
179      * @param sessionId
180      * @throws DatabaseException
181      */
182     protected void disconnectQuery(String sessionId) throws DatabaseException {
183         try {
184             query.disconnect(sessionId);
185         } catch (RemoteException e) {
186             throw new DatabaseException("Could not disconnect from database: " + e, e);
187         }
188     }
189 
190     /**
191      * Used by the XMLDBSessionManager.
192      *
193      * @return admin session ID.
194      * @throws DatabaseException
195      */
196     protected String connectAdmin() throws DatabaseException {
197         try {
198             return admin.connect(user, password);
199         } catch (RemoteException e) {
200             throw new DatabaseException("Could not connect to database: " + e, e);
201         }
202     }
203 
204     /**
205      * Used by the XMLDBSessionManager.
206      *
207      * @param sessionId
208      * @throws DatabaseException
209      */
210     protected void disconnectAdmin(String sessionId) throws DatabaseException {
211         try {
212             admin.disconnect(sessionId);
213         } catch (RemoteException e) {
214             throw new DatabaseException("Could not disconnect from database: " + e, e);
215         }
216     }
217 
218     public String[] xQueryResult(String xquery) throws DatabaseException {
219         logger.debug(xquery);
220         try {
221             synchronized (sessionManager.queryLock) {
222                 String sessionid = sessionManager.getQuerySessionId();
223                 QueryServiceStub.QueryResponse r = query.query(sessionid, xquery);
224                 if (r.getHits() > 0) {
225                     String[] results = query.retrieve(sessionid, 1, 0, false, false, "none");
226                     if (results == null) throw new NoResultForDatabaseQueryException("Result is null for database XQuery \"" + xquery +"\"");
227                     if(logger.isDebugEnabled()) {
228                         logger.debug("XQuery response:");
229                         for (String result : results) {
230                             logger.debug(result);
231                         }
232                     }
233                     return results;
234                 } else {
235                     throw new NoResultForDatabaseQueryException("No result for database XQuery \"" + xquery +"\"");
236                 }
237             }
238         } catch (RemoteException e) {
239             sessionManager.resetSession();
240             throw new DatabaseException("Could not access remote database: " + e, e);
241         }
242     }
243 
244     public String xQueryResult0(String xquery) throws DatabaseException {
245         logger.debug(xquery);
246         try {
247             synchronized (sessionManager.queryLock) {
248                 String sessionid = sessionManager.getQuerySessionId();
249                 QueryServiceStub.QueryResponse r = query.query(sessionid, xquery);
250                 if (r.getHits() > 0) {
251                     String[] results = query.retrieve(sessionid, 1, 1, false, false, "none");
252                     if (results == null) throw new NoResultForDatabaseQueryException("Result is null for database XQuery \"" + xquery +"\"");
253                     if(logger.isDebugEnabled()) {
254                         logger.debug("XQuery response:"+ results[0]);
255                     }
256                     return results[0];
257                 } else {
258                     throw new NoResultForDatabaseQueryException("No result for database XQuery \"" + xquery +"\"");
259                 }
260             }
261         } catch (RemoteException e) {
262             sessionManager.resetSession();
263             throw new DatabaseException("Could not access remote database: " + e, e);
264         }
265     }
266 
267     public QueryServiceStub.QueryResponse xQueryResponse(String xquery) throws DatabaseException {
268         logger.debug(xquery);
269         QueryServiceStub.QueryResponse r;
270         try {
271             synchronized (sessionManager.queryLock) {
272                 String sessionid = sessionManager.getQuerySessionId();
273                 r = query.query(sessionid, xquery);
274             }
275         } catch (RemoteException e) {
276             sessionManager.resetSession();
277             throw new DatabaseException("Could not access remote database: " + e, e);
278         }
279         return r;
280     }
281 
282     public String storeWorkflow(Workflow workflow) throws DatabaseException {
283         String path;
284         try {
285             String collection = gworkflowdlBaseCollection + "/" + workflow.getID();
286             byte[] wf;
287             ServerWorkflow serverWorkflow = (ServerWorkflow) workflow;
288 
289             synchronized (serverWorkflow.getStructureLock()) {
290                 path = new StringBuffer()
291                         .append(collection)
292                         .append("/")
293                         .append(intformat.format(serverWorkflow.getVersionNumber()))
294                         .append(".xml").toString();
295                 wf = serverWorkflow.getXML().getBytes();
296             }
297 
298             synchronized (sessionManager.adminLock) {
299                 String sessionid = sessionManager.getAdminSessionId();
300                 admin.createCollection(sessionid, collection);
301                 DataHandler dh = new DataHandler(new ByteArrayDataSource(wf));
302                 admin.store(sessionid, dh, "UTF-8", path, true);
303             }
304             logger.debug("stored " + path);
305         } catch (ClassCastException e) {
306             logger.error("ERROR in GWorkflowDL library: " + e, e);
307             throw new DatabaseException("Could not store workflow: " + e, e);
308         } catch (Exception e) {
309             sessionManager.resetSession();
310             throw new DatabaseException("Could not store workflow: " + e, e);
311         }
312         return path;
313     }
314 
315     public int getStatus(String workflowID) throws DatabaseException, NoSuchWorkflowException {
316         logger.debug("Retrieving workflow status from XML database ...");
317         String collection = gworkflowdlBaseCollection + "/" + workflowID;
318         String document = getLatestWorkflowDocument(collection);
319         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getStatus");
320         t.setAttribute("document", document);
321         String r = xQueryResult0(t.toString());
322         return WorkflowStatus.getStatusAsInt(r);
323     }
324 
325     public String[] getCheckpoints(String workflowID) throws DatabaseException {
326         logger.debug("Retrieving checkpoints for workflow \"" + workflowID + "\" from XML database ...");
327         String[] workflowCheckpoints;
328         try {
329             String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
330             logger.debug("collection = " + collectionStr);
331             synchronized (sessionManager.queryLock) {
332                 String sessionid = sessionManager.getQuerySessionId();
333                 workflowCheckpoints = query.listCollection(sessionid, collectionStr).getResources().getElements();
334             }
335             // include collection
336             for (int i = 0; i < workflowCheckpoints.length; i++) {
337                 workflowCheckpoints[i] = collectionStr + "/" + workflowCheckpoints[i];
338             }
339         } catch (Exception e) {
340             sessionManager.resetSession();
341             throw new DatabaseException("Could not retrieve workflow checkpoints: " + e, e);
342         }
343 
344         Arrays.sort(workflowCheckpoints);
345         return workflowCheckpoints;
346     }
347 
348     public String getSpecificWorkflowXML(String resourcePath) throws DatabaseException {
349         logger.debug("Retrieving workflow \"" + resourcePath + "\" from XML database ...");
350         String xml;
351         try {
352             synchronized (sessionManager.queryLock) {
353                 String sessionid = sessionManager.getQuerySessionId();
354                 xml = query.getResource(sessionid, resourcePath, true, true);
355             }
356         } catch (Exception e) {
357             sessionManager.resetSession();
358             throw new DatabaseException("Could not retrieve workflow: " + e, e);
359         }
360         return xml;
361     }
362 
363     public String getLatestWorkflowXML(String workflowID) throws DatabaseException, NoSuchWorkflowException {
364         logger.debug("Retrieving workflow \"" + workflowID + "\" from XML database ...");
365         String xml;
366         try {
367             String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
368             logger.debug("collection = " + collectionStr);
369             String path = getLatestWorkflowDocument(collectionStr);
370             synchronized (sessionManager.queryLock) {
371                 String sessionid = sessionManager.getQuerySessionId();
372                 logger.debug("Loading " + path + " ...");
373                 xml = query.getResource(sessionid, path, true, true);
374             }
375         } catch (RemoteException e) {
376             sessionManager.resetSession();
377             throw new DatabaseException("Could not retrieve workflow: " + e, e);
378         }
379         return xml;
380     }
381 
382     public String getEarliestWorkflowXML(String workflowID) throws DatabaseException {
383         logger.debug("Retrieving workflow \"" + workflowID + "\" from XML database ...");
384         String xml;
385         try {
386             String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
387             logger.debug("collection = " + collectionStr);
388             synchronized (sessionManager.queryLock) {
389                 String sessionid = sessionManager.getQuerySessionId();
390                 String[] workflowVersions = query.listCollection(sessionid, collectionStr).getResources().getElements();
391                 for (String storedWorkflow : workflowVersions) {
392                     logger.debug("Available workflow version: " + storedWorkflow);
393                 }
394 
395                 String earliestVersion = workflowVersions[0];
396                 for (String workflowVersion : workflowVersions) {
397                     if (workflowVersion.compareTo(earliestVersion) < 0) {
398                         earliestVersion = workflowVersion;
399                     }
400                 }
401                 String path = collectionStr + "/" + earliestVersion;
402                 logger.debug("Loading " + path + " ...");
403                 xml = query.getResource(sessionid, path, true, true);
404             }
405         } catch (RemoteException e) {
406             sessionManager.resetSession();
407             throw new DatabaseException("Could not retrieve workflow: " + e, e);
408         }
409         return xml;
410     }
411 
412     /**
413      * Store a data token in the XML database.
414      *
415      * @param workflowID
416      * @param placeID
417      * @param token
418      * @param owls
419      * @throws DatabaseException
420      */
421     public void storeTokenData(String workflowID, String placeID, Token token, String[] owls) throws DatabaseException {
422         // return if control token (data = null)
423         if (token.getData()==null) return;
424 
425         if (logger.isDebugEnabled()) {
426             logger.debug("Store token data for placeID " + placeID);
427         }
428 
429         //   /db/gworkflowdl/<workflowID>/data/place_<placeID>.xml
430         //
431         //   <workflow ID="...">
432         //     <place ID="...">
433         //       <tokenClass type="data">
434         //         <owl>...</owl>
435         //       </tokenClass>
436         //       <token>
437         //         ...
438         //       </token>
439         //       <token>
440         //         ...
441         //       </token>
442         //       <token>
443         //         ...
444         //       </token>
445         //     </place>
446         //   </workflow>
447 
448         String resourceName = placeID + ".xml";
449         String collectionPath = gworkflowdlBaseCollection + "/" + workflowID + "/data";
450         String documentName = collectionPath + "/" + resourceName;
451 
452         boolean resourceIsAvailable = false;
453         boolean collectionIsAvailable = false;
454 
455         try {
456             // check whether resource and collection already exists.
457             synchronized (sessionManager.queryLock) {
458                 String sessionid = sessionManager.getQuerySessionId();
459                 QueryServiceStub.Collection collection = query.listCollection(sessionid, collectionPath);
460                 collectionIsAvailable = true;
461                 QueryServiceStub.StringArray resources = collection.getResources();
462                 if (resources != null) {
463                     String[] elements = resources.getElements();
464                     if (elements != null) {
465                         for (String resource : resources.getElements()) {
466                             if (resource.equals(resourceName)) {
467                                 resourceIsAvailable = true;
468                                 break;
469                             }
470                         }
471                     }
472                 }
473             }
474         } catch (RemoteException e) {
475             ///collection is not available.
476         }
477 
478         try {
479             // create new resource if it does not exist yet
480             if (!resourceIsAvailable) {
481                 StringBuffer xml = new StringBuffer("<workflow xmlns=\"http://www.gridworkflow.org/gworkflowdl\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xsi:schemaLocation=\"http://www.gridworkflow.org/gworkflowdl " + xsdPath + "/gworkflowdl_1_0.xsd\" ID=\"");
482                 xml.append(workflowID).append("\">\n");
483                 xml.append("  <place ID=\"").append(placeID).append("\">\n");
484                 if (owls != null) {
485                     for (String owl : owls) {
486                         xml.append("    <tokenClass type=\"data\"><owl>").append(owl).append("</owl></tokenClass>\n");
487                     }
488                 }
489                 xml.append("    <token>\n");
490                 Property[] props = token.getProperties().getProperties();
491                 for (Property prop : props) {
492                     xml.append("      <property name=\"").append(prop.getKey()).append("\">").append(prop.getValue()).append("</property>\n");
493                 }
494                 xml.append("      ").append(token.getData().toXML()).append("\n");
495                 xml.append("    </token>\n");
496                 xml.append("  </place>\n");
497                 xml.append("</workflow>");
498                 synchronized (sessionManager.adminLock) {
499                     String sessionid = sessionManager.getAdminSessionId();
500                     if (!collectionIsAvailable) admin.createCollection(sessionid, collectionPath);
501                     DataHandler dh = new DataHandler(new ByteArrayDataSource(xml.toString().getBytes())); 
502                     admin.store(sessionid, dh, "UTF-8", documentName, true);
503                     if (logger.isDebugEnabled()) {
504                         logger.debug("stored " + documentName);
505                     }
506                 }
507             }
508 
509             // update resource if it already exists
510             else {
511                 StringBuffer xupdate = new StringBuffer("<xupdate:modifications version=\"1.0\" xmlns:xupdate=\"http://www.xmldb.org/xupdate\">\n");
512                 xupdate.append("  <xupdate:append select=\"declare namespace wf='http://www.gridworkflow.org/gworkflowdl';/wf:workflow/wf:place\" child=\"last()\">\n");
513                 xupdate.append("    <xupdate:element name=\"token\">");
514                 Property[] props = token.getProperties().getProperties();
515                 for (Property prop : props) {
516                     xupdate.append("      <property name=\"").append(prop.getKey()).append("\">").append(prop.getValue()).append("</property>\n");
517                 }
518                 xupdate.append("      ").append(token.getData().toXML()).append("\n");
519                 xupdate.append("    </xupdate:element>\n");
520                 xupdate.append("  </xupdate:append>\n");
521                 xupdate.append("</xupdate:modifications>");
522                 synchronized (sessionManager.adminLock) {
523                     String sessionid = sessionManager.getAdminSessionId();
524                     int modifications = admin.xupdateResource(sessionid, documentName, xupdate.toString());
525                     if (logger.isDebugEnabled()) {
526                         logger.debug("updated " + documentName);
527                     }
528                 }
529             }
530         } catch (RemoteException e) {
531             sessionManager.resetSession();
532             throw new DatabaseException("Could not store token data to database: " + e, e);
533         }
534     }
535 
536     /**
537      * Lookup WS activity description for specific WSOperation.
538      * @param op
539      * @return
540      * @throws DatabaseException
541      */
542     public String getWSActivityDescription(OperationCandidate op) throws DatabaseException {
543         logger.debug("getWSActivityDescription ...");
544 
545         //retrieve information from database
546         String collection = dgrdlBaseCollection+"/resources";
547         // getWSActivityDescription(collection,serviceUri)
548         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getWSActivityDescription");
549         t.setAttribute("collection", collection);
550         t.setAttribute("serviceUri", op.getOperationName());
551         return xQueryResult0(t.toString());
552     }
553 
554     /**
555      * Lookup GRAM activity description for specific hardware/software.
556      * @param op
557      * @return
558      * @throws DatabaseException
559      */
560     public ActivityDescription getGRAMActivityDescription(OperationCandidate op) throws DatabaseException {
561         logger.debug("getGRAMActivtyDescription ...");
562 
563         //retrieve information from database
564         String collection = dgrdlBaseCollection+"/resources";
565         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getGRAMActivityDescription");
566         t.setAttribute("collection", collection);
567         t.setAttribute("hardwareUri", op.getResourceName());
568         t.setAttribute("softwareUri", op.getOperationName());
569         String ret = xQueryResult0(t.toString());
570 //        <job>
571 //          <factoryEndpoint></factoryEndpoint>
572 //          <localUserId></localUserId>
573 //          <executable></executable>
574 //          <queue></queue>
575 //          <maxTime></maxTime>
576 //          <maxWallTime></maxWallTime>
577 //          <maxCpuTime></maxCpuTime>
578 //          <maxMemory></maxMemory>
579 //          <minMemory></minMemory>
580 //          <jobType></jobType>
581 //          <count></count>
582 //          <software>$softwareUri$</software>
583 //          <directory></directory>
584 //          <factoryType></factoryType>
585 //        </job>
586 
587         ActivityDescription ad = new ActivityDescription(op.getResourceName(),op.getOperationName());
588         ad.put("factoryEndpoint", StringUtils.extractStringFromXML(ret,"factoryEndpoint"));
589         ad.put("executable", StringUtils.extractStringFromXML(ret,"executable"));
590         putXmlValueToActivityDescription(ret, ad, "factoryType");
591         putXmlValueToActivityDescription(ret, ad, "directory");
592         putXmlValueToActivityDescription(ret, ad, "localUserId");
593         putXmlValueToActivityDescription(ret, ad, "queue");
594         putXmlValueToActivityDescription(ret, ad, "maxTime");
595         putXmlValueToActivityDescription(ret, ad, "maxWallTime");
596         putXmlValueToActivityDescription(ret, ad, "maxCpuTime");
597         putXmlValueToActivityDescription(ret, ad, "maxMemory");
598         putXmlValueToActivityDescription(ret, ad, "minMemory");
599         putXmlValueToActivityDescription(ret, ad, "jobType");
600         putXmlValueToActivityDescription(ret, ad, "count");
601 
602         // check return value
603         if (ad.get("factoryEndpoint")==null) throw new DatabaseException("Could not retrieve factoryEndpoint for hardware '" + op.getResourceName() + "'");
604         if (ad.get("executable")==null) throw new DatabaseException("Could not retrieve executable for software '" + op.getOperationName() + "'");
605         // return GRAM activity description
606         return ad;
607     }
608 
609     private void putXmlValueToActivityDescription(String ret, ActivityDescription ad, String name) {
610         String str = StringUtils.extractStringFromXML(ret,name);
611         if (str != null) {
612             ad.put(name, str);
613         }
614     }
615 
616     public String lookupManagedJobFactoryService(String resourceUri) throws DatabaseException {
617         if (logger.isDebugEnabled()) {
618             logger.debug("Retrieving GRAM endpoint from XML database for resource '" + resourceUri + "'");
619         }
620         String collection = dgrdlBaseCollection+"/resources";
621         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getManagedJobFactoryService");
622         t.setAttribute("collection", collection);
623         t.setAttribute("resourceUri", resourceUri);
624         return xQueryResult0(t.toString());
625     }
626 
627     public String[] getWorkflowStatistics() throws DatabaseException {
628         String xquery = "declare namespace wf=\"http://www.gridworkflow.org/gworkflowdl\";" +
629                 "<workflowstats>\n" +
630                 "  <total>{count(/wf:workflow)}</total>\n" +
631                 "  <completed>{count(/wf:workflow[wf:property[@name='status']='COMPLETED'])}</completed>\n" +
632                 "  <terminated>{count(/wf:workflow[wf:property[@name='status']='TERMINATED'])}</terminated>\n" +
633                 "  <ctm>{count(/wf:workflow[wf:property[@name='domain']='CTM'])}</ctm>\n" +
634                 "  <erp>{count(/wf:workflow[wf:property[@name='domain']='ERP'])}</erp>\n" +
635                 "  <ffsc>{count(/wf:workflow[wf:property[@name='domain']='FFSC'])}</ffsc>\n" +
636                 "</workflowstats>";
637 
638         return xQueryResult(xquery);
639     }
640 
641     public float getHardwareQuality(String resourceUri) throws DatabaseException {
642         float quality = 0;
643 
644         if (resourceUri == null) throw new DatabaseException("Undefined Resource Uri");
645         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getHardwareQuality");
646         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
647         t.setAttribute("hardwareUri", resourceUri);
648 //        <hardware>
649 //            <type>{data(\$hardware/simpleProperty[@ident="Info.LRMSType"])}</type>
650 //            <load>{data(\$hardware/simpleProperty[@ident="Load"]/field[@ident="Last1Min"])}</load>
651 //            <cpu>{data(\$hardware/simpleProperty[@ident="Architecture"]/field[@ident="SMPSize"])}</cpu>
652 //            <running>{data(\$hardware/simpleProperty[@ident="State.RunningJobs"])}</running>
653 //            <waiting>{data(\$hardware/simpleProperty[@ident="State.WaitingJobs"])}</waiting>
654 //            <total>{data(\$hardware/simpleProperty[@ident="State.TotalJobs"])}</total>
655 //            <waittime>{data(\$hardware/simpleProperty[@ident="State.QueueWaittime"])}</waittime>
656 //        </hardware>
657         String[] res = xQueryResult(t.toString());
658         return calculateQuality(resourceUri, res);
659     }
660 
661     private float calculateQuality(String resourceUri, String[] res) throws DatabaseException {
662         float quality;
663         String lrms = StringUtils.extractStringFromXML(res[0], "type");
664         // Fork
665         if (lrms == null || lrms.length()==0 || lrms.equalsIgnoreCase("Fork")) {
666             String l = StringUtils.extractStringFromXML(res[0], "load");
667             String c = StringUtils.extractStringFromXML(res[0], "cpu");
668             if (l != null && c != null) {
669                 quality = ResourceQualityCalculator.calculateForkQuality(Integer.parseInt(l),Integer.parseInt(c));
670             } else {
671                 throw new DatabaseException("error [Prorater]: no information about load and number of CPUs available in database for resourceUri =" + resourceUri);
672             }
673         }
674         // PBS or LSF
675         else if (lrms.equalsIgnoreCase("PBS") || lrms.equalsIgnoreCase("LSF")) {
676             float LRMSQuality, WaittimeQuality;
677             String run = StringUtils.extractStringFromXML(res[0], "running");
678             String wat = StringUtils.extractStringFromXML(res[0], "waiting");
679             String wtime = StringUtils.extractStringFromXML(res[0], "waittime");
680             // String tot = extractStringFromXML(res[0], "total");
681             if (run != null && wat != null) {
682                 LRMSQuality = ResourceQualityCalculator.calculateBatchQueueQuality(Integer.parseInt(run), Integer.parseInt(wat));
683             } else {
684                 throw new DatabaseException("error [Prorater]: no information about number of jobs in queue available in database for resourceUri =" + resourceUri);
685             }
686             if (wtime != null) {
687             	WaittimeQuality = ResourceQualityCalculator.calculateWaittimeQuality(Float.parseFloat(wtime));
688             } else {
689             	logger.warn("error [Prorater]: no information about estimated queue waittime available in database for resourceUri =" + resourceUri);
690             	WaittimeQuality = 0.9f;
691             }
692             quality = LRMSQuality * WaittimeQuality;
693         }
694         // no lrms defined.
695         else throw new DatabaseException("error [Prorater]: no type of local resource managementent system specified in database for resourceUri =" + resourceUri);
696         return quality;
697     }
698 
699     /**
700      * Increment or decrement the score of a resource.
701      *
702      * @param resourceUri The URI of the resource.
703      * @param increment   can be positive to increase or negative to decrease the score.
704      * @throws DatabaseException
705      */
706     public void incrementResourceScore(String resourceUri, int increment) throws DatabaseException {
707         if (resourceUri == null) return;
708         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("incrementResourceScore");
709         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
710         t.setAttribute("resourceUri", resourceUri);
711         t.setAttribute("increment", increment);
712         xQueryResponse(t.toString());
713     }
714 
715     public synchronized DistributionSet getResourceDurationStatistics(String operationName, String resourceName) throws DatabaseException {
716         if (operationName == null || resourceName == null) return null;
717         // getResourceDurationStatistics(collection,resourceUri,hardwareUri)
718         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getResourceDurationStatistics");
719         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
720         t.setAttribute("resourceUri", operationName);
721         t.setAttribute("hardwareUri", resourceName);
722         String res = xQueryResult0(t.toString());
723         // <duration>
724         //   <numberActivities>5</numberActivities>
725         //   <total.mean>50.2</total.mean>
726         //   <total.stdDeviation>16.85823241030921</total.stdDeviation>
727         //   <total.expSmooth>52.0</total.expSmooth>
728         //   <total.min>30</total.min>
729         //   <total.max>71</total.max>
730         //   <active.mean>33.6</active.mean>
731         //   <active.stdDeviation>9.044335243676008</active.stdDeviation>
732         //   <active.expSmooth>35.0</active.expSmooth>
733         //   <active.min>27</active.min>
734         //   <active.max>49</active.max>
735         // </duration>
736         try {
737             long numberActivities = Long.parseLong(StringUtils.extractStringFromXML(res,"numberActivities"));
738             double totalMean = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.mean"));
739             double totalStdDeviation = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.stdDeviation"));
740             double totalExpSmooth = Double.parseDouble(StringUtils.extractStringFromXML(res,"total.expSmooth"));
741             long totalMin = Long.parseLong(StringUtils.extractStringFromXML(res,"total.min"));
742             long totalMax = Long.parseLong(StringUtils.extractStringFromXML(res,"total.max"));
743             double activeMean = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.mean"));
744             double activeStdDeviation = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.stdDeviation"));
745             double activeExpSmooth = Double.parseDouble(StringUtils.extractStringFromXML(res,"active.expSmooth"));
746             long activeMin = Long.parseLong(StringUtils.extractStringFromXML(res,"active.min"));
747             long activeMax = Long.parseLong(StringUtils.extractStringFromXML(res,"active.max"));
748             LongDistribution totalDist = new LongDistribution(numberActivities,totalMean,totalStdDeviation,totalExpSmooth,totalMin,totalMax);
749             LongDistribution activeDist = new LongDistribution(numberActivities,activeMean,activeStdDeviation,activeExpSmooth,activeMin,activeMax);
750             LongDistribution e = new LongDistribution();
751             return new DistributionSet(e, e, e, activeDist, e, totalDist);
752         } catch (NumberFormatException e) {
753             logger.warn("Could not retrieve duration distribution for operation '"+operationName+"' and resource '"+resourceName+"': "+e);
754             return null;
755         } catch (NullPointerException e) {
756             logger.warn("Could not retrieve duration distribution for operation '"+operationName+"' and resource '"+resourceName+"': "+e);
757             return null;
758         }
759     }
760 
761 
762     public synchronized void updateResourceDurationStatistics(String operationName, String resourceName, DistributionSet distSet) throws DatabaseException {
763         if (operationName == null || resourceName == null || distSet == null) return;
764         // updateResourceDurationStatistics(collection,resourceUri,hardwareUri,activeDist,totalDist)
765         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("updateResourceDurationStatistics");
766         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
767         t.setAttribute("resourceUri", operationName);
768         t.setAttribute("hardwareUri", resourceName);
769         t.setAttribute("activeDist", distSet.durationActive);
770         t.setAttribute("totalDist", distSet.durationTotal);
771         xQueryResponse(t.toString());
772     }
773 
774     public void storeTemporaryWorkflowDirectory(String resourceUri, String host, String directory, String workflowID, String activityID) throws DatabaseException {
775         if (resourceUri == null) throw new DatabaseException("Undefined Resource Uri");
776         String collection = gworkflowdlBaseCollection + "/" + workflowID + "/resources";
777         String document = collection + "/" + resourceUri.replace(':', '_').replace('/', '_') + ".xml";
778         // try to insert temporary workflow directory
779         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("insertTemporaryWorkflowDirectory");
780         t.setAttribute("document", document);
781         t.setAttribute("resourceUri", resourceUri);
782         t.setAttribute("host", host);
783         t.setAttribute("directory", directory);
784         t.setAttribute("activityID", activityID);
785         String res = xQueryResult0(t.toString());
786 
787         // if resource is not available add new resource document
788         if (res.equals("<notAvailable/>")) {
789             t = xqueryWorkflowGroup.getInstanceOf("temporaryWorkflowDirectory");
790             t.setAttribute("xsdPath", xsdPath);
791             t.setAttribute("resourceUri", resourceUri);
792             t.setAttribute("host", host);
793             t.setAttribute("directory", directory);
794             t.setAttribute("activityID", activityID);
795             String resourceXML = t.toString();
796             try {
797                 synchronized (sessionManager.adminLock) {
798                     String sessionid = sessionManager.getAdminSessionId();
799                     admin.createCollection(sessionid, collection);
800                     DataHandler dh = new DataHandler(new ByteArrayDataSource(resourceXML.getBytes()));
801                     admin.store(sessionid, dh, "UTF-8", document, false);
802                 }
803                 if (logger.isDebugEnabled()) {
804                     logger.debug("stored " + document);
805                 }
806             } catch (Exception e) {
807                 sessionManager.resetSession();
808                 throw new DatabaseException("Could not store information to database: " + e, e);
809             }
810         }
811     }
812 
813     public String[] getTemporaryWorkflowDirectories(String workflowID) throws DatabaseException {
814         String collection = gworkflowdlBaseCollection + "/" + workflowID + "/resources";
815         // try to insert temporary workflow directory
816         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getTemporaryWorkflowDirectories");
817         t.setAttribute("collection", collection);
818         // logger.info(t.toString());
819         return xQueryResult(t.toString());
820     }
821 
822     public void removeWorkflow(String workflowID) throws DatabaseException, NoSuchWorkflowException {
823         String collection = gworkflowdlBaseCollection + "/" + workflowID;
824         try {
825             synchronized (sessionManager.adminLock) {
826                 String sessionid = sessionManager.getAdminSessionId();
827                 boolean sucess = admin.removeCollection(sessionid, collection);
828                 if (!sucess) {
829                     throw new NoSuchWorkflowException("Could not remove workflow with ID '"+workflowID+"' from database!");
830                 }
831             }
832             logger.debug("removed " + collection);
833         } catch (RemoteException e) {
834             sessionManager.resetSession();
835             throw new DatabaseException("Could not remove workflow from database: " + e, e);
836         }
837 
838     }
839 
840     public String[] getWorkflowIDs() throws DatabaseException {
841         logger.debug("Retrieving workflowIDs from XML database ...");
842         String[] ret;
843         try {
844             synchronized (sessionManager.queryLock) {
845                 String sessionid = sessionManager.getQuerySessionId();
846                 ret = query.listCollection(sessionid, gworkflowdlBaseCollection).getCollections().getElements();
847             }
848         } catch (Exception e) {
849             sessionManager.resetSession();
850             throw new DatabaseException("Could not retrieve workflow: " + e, e);
851         }
852         return ret;
853     }
854 
855     public String[] getData(String workflowID, String placeID) throws DatabaseException, NoSuchWorkflowException {
856         logger.debug("Retrieving data from place " + placeID + " of workflow " + workflowID + " from XML database ...");
857         String collection = gworkflowdlBaseCollection + "/" + workflowID;
858         String document = getLatestWorkflowDocument(collection);
859         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getData");
860         t.setAttribute("document", document);
861         t.setAttribute("placeID", placeID);
862         return xQueryResult(t.toString());
863     }
864 
865     public Map<String, String[]> getWorkflowStatusMap() throws DatabaseException {
866         logger.debug("Retrieving workflowStatusArray from XML database ...");
867 
868         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getWorkflowStatusArray");
869         t.setAttribute("collection", gworkflowdlBaseCollection);
870 //        logger.info(t.toString());
871         String[] res = xQueryResult(t.toString());
872 
873         Map<String, String[]> ret = null;
874         if (res.length > 0) {
875             ret = new HashMap<String, String[]>(res.length);
876             for (String xml : res) {
877 //                    logger.info(xml);
878                 String[] array = new String[STATUS_ARRAY_NAMES.length];
879                 int i = 0;
880                 for (String name : STATUS_ARRAY_NAMES) {
881                     String value = StringUtils.extractStringFromXML(xml, name);
882                     array[i++] = name + "=" + ((value != null) ? value : "0");
883                 }
884                 ret.put(array[0].substring(array[0].indexOf("=") + 1), array);
885             }
886         }
887         return ret;
888     }
889 
890     public String getDescription(String workflowID) throws DatabaseException, NoSuchWorkflowException {
891         String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
892         String document = getLatestWorkflowDocument(collectionStr);
893         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getDescription");
894         t.setAttribute("document", document);
895         // logger.info("XQuery:\n"+t.toString());
896         return xQueryResult0(t.toString());
897     }
898 
899     public void setDescription(String workflowID, String description) throws DatabaseException, NoSuchWorkflowException {
900         String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
901         String document = getLatestWorkflowDocument(collectionStr);
902         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("setDescription");
903         t.setAttribute("document", document);
904         t.setAttribute("description", description);
905         // logger.info("XQuery:\n"+t.toString());
906         xQueryResponse(t.toString());
907     }
908 
909     public void setProperty(String workflowID, String name, String value) throws DatabaseException, NoSuchWorkflowException {
910         String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
911         String document = getLatestWorkflowDocument(collectionStr);
912         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("setProperty");
913         t.setAttribute("document", document);
914         t.setAttribute("name", name);
915         t.setAttribute("value", value);
916         // logger.info("XQuery:\n"+t.toString());
917         xQueryResponse(t.toString());
918     }
919 
920     public String getProperty(String workflowID, String name) throws DatabaseException, NoSuchWorkflowException {
921         String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
922         String document = getLatestWorkflowDocument(collectionStr);
923         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getProperty");
924         t.setAttribute("document", document);
925         t.setAttribute("name", name);
926         // logger.info("XQuery:\n"+t.toString());
927         return xQueryResult0(t.toString());
928     }
929 
930     public String[][] getProperties(String workflowID) throws DatabaseException, NoSuchWorkflowException {
931         String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
932         String document = getLatestWorkflowDocument(collectionStr);
933         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getProperties");
934         t.setAttribute("document", document);
935         String[] res = xQueryResult(t.toString());
936 
937         String[][] ret;
938         ret = new String[res.length + 1][2];
939         for (int i = 0; i < res.length; i++) {
940             ret[i][0] = StringUtils.extractStringFromXML(res[i], "name");
941             ret[i][1] = StringUtils.extractStringFromXML(res[i], "value");
942         }
943         ret[res.length][0] = "level";
944         ret[res.length][1] = "DATABASE";
945 
946         return ret;
947     }
948 
949     public String[] getFaultToleranceStatistics() throws DatabaseException, NoSuchWorkflowException {
950         Map statusmap = getWorkflowStatusMap();
951         Iterator iter = statusmap.keySet().iterator();
952         String[] ret = new String[statusmap.size()];
953         int i=0;
954         while (iter.hasNext()) {
955             String workflowID = (String) iter.next();
956             logger.debug(workflowID);
957             String collectionStr = gworkflowdlBaseCollection + "/" + workflowID;
958             String document = getLatestWorkflowDocument(collectionStr);
959             StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getFaultToleranceStatistics");
960             t.setAttribute("document", document);
961             // logger.info("XQuery:\n"+t.toString());
962             ret[i++]=xQueryResult0(t.toString());
963         }
964         return ret;
965     }
966 
967     /**
968      * Get a list of all available resources.
969      * The information is returned in an Array of XML Strings:
970      * Example:
971      * <pre>
972      * String[] resources = getAvailableResources("urn:dgrdl:software");
973      * resources[0]=
974      * &lt;resource&gt;
975      *   &lt;uri&gt;software:cat-fhrg&lt;/uri&gt;                     // Operation URI
976      *   &lt;classUri&gt;urn:dgrdl:software:cat&lt;/classUri&gt;      // Operation Class URI (workflow: operationClass name="...")
977      *   &lt;name&gt;cat&lt;/name&gt;                                 // Operation Name
978      *   &lt;description&gt;Program that concatenates two files&lt;description&gt; // Operation Description
979      * &lt;/resource&gt;
980      * </pre>
981      * @param ofClass The class which this resource should be part of. This method only checkes whether the class starts
982      * with the given ofClass parameter, e.g., "urn:dgrdl:software" would match both classes: "urn:dgrdl:software:A"
983      * as well as "urn:dgrdl:software:B". If <code>ofClass</code> is <code>null</code> then all classes of resources are
984      * returned.
985      * @return The list of resources of a certain class as array of XML strings.
986      * @throws DatabaseException
987      */
988     public String[] getAvailableResources(String ofClass) throws DatabaseException {
989         logger.debug("Retrieving available resources from XML database ...");
990         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getAvailableResources");
991         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
992         if (ofClass != null) t.setAttribute("ofClass", ofClass);
993         //logger.info(t.toString());
994         String[] res = xQueryResult(t.toString());
995         return res;
996     }
997 
998     public String getResourceDescription(String resourceUri) throws DatabaseException {
999         logger.debug("Retrieving resource description from XML database ...");
1000         StringTemplate t = xqueryWorkflowGroup.getInstanceOf("getResourceDescription");
1001         t.setAttribute("collection", dgrdlBaseCollection+"/resources");
1002         t.setAttribute("resourceUri", resourceUri);
1003 //        logger.info(t.toString());
1004         return xQueryResult0(t.toString());
1005     }
1006 
1007     /**
1008      * ************************************************************
1009      * private methods
1010      * *************************************************************
1011      */
1012     private StringTemplateGroup loadSTGFileFromClasspath
1013             (String
1014                     templateFilename) {
1015         String location = "templates";
1016         StringTemplateGroupLoader loader = new CommonGroupLoader(location, new LogStringTemplateErrorListener());
1017         StringTemplateGroup.registerGroupLoader(loader);
1018         StringTemplateGroup.registerDefaultLexer(DefaultTemplateLexer.class);
1019         return StringTemplateGroup.loadGroup(templateFilename);
1020     }
1021 
1022     private class LogStringTemplateErrorListener implements StringTemplateErrorListener {
1023         public void error(String msg, Throwable e) {
1024             logger.error(msg, e);
1025         }
1026 
1027         public void warning(String msg) {
1028             logger.warn(msg);
1029         }
1030     }
1031 
1032     public String getLatestWorkflowDocument(String collectionStr) throws DatabaseException, NoSuchWorkflowException {
1033         String document;
1034         String[] workflowVersions;
1035 
1036         try {
1037             synchronized (sessionManager.queryLock) {
1038                 String sessionid = sessionManager.getQuerySessionId();
1039                 workflowVersions = query.listCollection(sessionid, collectionStr).getResources().getElements();
1040             }
1041         } catch (RemoteException e) {
1042             sessionManager.resetSession();
1043             if (e.getMessage().contains("collection "+collectionStr+" not found")) {
1044                 throw new NoSuchWorkflowException(e.getMessage(),e);
1045             }
1046             else {
1047                 throw new DatabaseException("Could not access remote database: " + e, e);
1048             }
1049         }
1050 
1051         if (logger.isDebugEnabled()) {
1052             for (String storedWorkflow : workflowVersions) {
1053                 logger.debug("Available workflow version: " + storedWorkflow);
1054             }
1055         }
1056 
1057         String latestVersion = workflowVersions[0];
1058         for (int i = 0; i < workflowVersions.length; i++) {
1059             if (workflowVersions[i].compareTo(latestVersion) > 0) {
1060                 latestVersion = workflowVersions[i];
1061             }
1062         }
1063         if (logger.isDebugEnabled()) {
1064             logger.debug("Latest workflow version: " + latestVersion);
1065         }
1066 
1067         document = collectionStr + "/" + latestVersion;
1068         return document;
1069     }
1070 
1071 }