View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import net.kwfgrid.gwes.exception.*;
11  import net.kwfgrid.gwes.operationmapper.OperationMapper;
12  import net.kwfgrid.gwes.prorater.ResourceCoAllocator;
13  import net.kwfgrid.gwes.prorater.WeightAndRank;
14  import net.kwfgrid.gwes.workflowanalyzer.WorkflowStatistics;
15  import net.kwfgrid.gwes.workflowanalyzer.DoubleDistribution;
16  import net.kwfgrid.gworkflowdl.analysis.Conflict;
17  import net.kwfgrid.gworkflowdl.analysis.Decision;
18  import net.kwfgrid.gworkflowdl.analysis.WorkflowAnalyzer;
19  import net.kwfgrid.gworkflowdl.protocol.server.ServerWorkflow;
20  import net.kwfgrid.gworkflowdl.structure.*;
21  import org.apache.log4j.Logger;
22  import org.ietf.jgss.GSSCredential;
23  import org.ietf.jgss.GSSException;
24  
25  import java.io.IOException;
26  import java.util.*;
27  
28  /**
29   * The GWorkflowDLHandler is the dynamic and interactive workflow enactment machine for workflows described with
30   * the Generic Workflow Description Language (GWorkflowDL).
31   * @author Andreas Hoheisel
32   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
33   * @author Dietmar Sommerfeld / GWDG (weight and rank part)
34   * @version $Id: GWorkflowDLHandler.java 1540 2011-08-17 13:30:37Z hoheisel $
35   */
36  public class GWorkflowDLHandler extends GenericWorkflowHandler {
37  
38      /**
39       * log4j logger.
40       */
41      private final static Logger logger = Logger.getLogger(GWorkflowDLHandler.class);
42  
43      /**
44       * net.kwfgrid.gworkflowdl.protocol.server.ServerWorkflow.
45       * (net.kwfgrid.gworkflowdl.structure.Workflow)
46       */
47      private ServerWorkflow workflow;
48  
49      /**
50       * GSS credential manager used to invoke activities.
51       */
52      private CredentialManager credentialManager;
53  
54      /**
55       * Operation mapper Red -> Yellow
56       */
57      private static OperationMapper red2Yellow;
58  
59      /**
60       * Operation mapper Yellow -> Blue
61       */
62      private static OperationMapper yellow2Blue;
63  
64      /**
65       * Operation mapper Blue -> Green
66       */
67      private static OperationMapper blue2Green;
68  
69      /**
70       * ProvenanceHandler
71       */
72      private static ProvenanceHandler provenanceHandler;
73  
74      /**
75       * Maps operation types to activity implementation classes, e.g.,
76       * "soap" to "de.fraunhofer.de.plugin.wsactivity.WSActivity.class"
77       */
78      private static final Map<String,Class> activityClassMap = new HashMap<String,Class>();
79  
80      /**
81       * Condition checker for conditions of transitions.
82       */
83      private final ConditionChecker conditionChecker;
84  
85      /**
86       * Workflow statistics
87       */
88      private WorkflowStatistics workflowStatistics;
89  
90      /**
91       * Minimum time period in ms for sleeping between two cycles if the workflow was not modified.
92       * Default is 20ms
93       */
94      private final long SLEEP_TIME_MIN;
95  
96      /**
97       * Maximum time period in ms for sleeping between two cycles if the workflow was not modified.
98       * Default is 60000ms = 1Minute
99       */
100     private final long SLEEP_TIME_MAX;
101 
102     /**
103      * Abort this workflow.
104      */
105     private boolean abort = false;
106 
107     /**
108      * Suspend (pause) this workflow.
109      */
110     private boolean suspend = false;
111 
112     /**
113      * Map to store the transition IDs (keys) and their corresponding TransitionStatus.
114      */
115     private final Map<String, TransitionStatus> transtionStatusTable;
116 
117     /**
118      * Map of current Decisions. The keys are the decision IDs containing the hash code of the decision toString().
119      */
120     private Map<String, Decision> currentDecisions;
121 
122     /**
123      * List of transitions, that are part of an conflict (not evaluating the conditions).
124      */
125     private final List<Transition> conflictTransitions;
126 
127     /**
128      * List of enabled Transitions with time-dependent false conditions.
129      */
130     private final ArrayList<Transition> enabledTimeDependendFalseTransitions;
131 
132     /**
133      * Policy for the fault management.
134      * Default policy is <code>FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED</code>.
135      * use
136      *
137      * @see #FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED
138      * @see #FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED
139      * @see #FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED
140      */
141     private int faultManagementPolicy;
142 
143     /**
144      * Abort the whole workflow if a single activity terminates (0).
145      */
146     public static final int FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED = 0;
147 
148     /**
149      * Continue to execute the workflow, even if one or more activities terminate (1).
150      */
151     public static final int FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED = 1;
152 
153     /**
154      * Suspend the workflow if an activity terminates (2).
155      */
156     public static final int FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED = 2;
157 
158     /**
159      * Policy for workflow persistence.
160      * Default is <code>WORKFLOW_PERSISTENCE_TRUE</code>
161      * use
162      *
163      * @see #WORKFLOW_PERSISTENCE_FALSE
164      * @see #WORKFLOW_PERSISTENCE_TRUE
165      */
166     private int workflowPersistence;
167 
168     /**
169      * Do not store workflows in database automatically (0).
170      */
171     public static final int WORKFLOW_PERSISTENCE_FALSE = 0;
172 
173     /**
174      * Store workflows in database automatically (1).
175      */
176     public static final int WORKFLOW_PERSISTENCE_TRUE = 1;
177 
178     /**
179      * Occurrence sequence stores the sequence of fired transitions.
180      * Format: transitionID space transitionID space ...
181      * String is null if occurrence sequence should not be logged.
182      */
183     private String occurrenceSequence = null;
184 
185     /**
186      * Constructor.
187      *
188      * @param engine              The parent GWES Engine
189      * @param originalWorkflowDescriptionType The original type of workflow description, e.g., "GWorkflowDL version 2.0"
190      * @param workflowDescription The workflow description in GWorkflowDL format
191      * @param userIdCredential              user identifier
192      */
193     public GWorkflowDLHandler(GWESEngine engine, String originalWorkflowDescriptionType, String workflowDescription, String userIdCredential) throws WorkflowSecurityException, LoggingException {
194         super(engine, originalWorkflowDescriptionType, workflowDescription, CredentialManager.extractUserId(userIdCredential));
195         transtionStatusTable = new HashMap<String, TransitionStatus>();
196         conflictTransitions = new ArrayList<Transition>();
197         currentDecisions = new HashMap<String, Decision>();
198         enabledTimeDependendFalseTransitions = new ArrayList<Transition>();
199         conditionChecker = new ConditionChecker();
200 
201         // set operation mapper red -> yellow
202         try {
203             if (red2Yellow == null)
204                 red2Yellow = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_RED2YELLOW_CLASS);
205         } catch (OperationMapperException e) {
206             logger.warn("exception during construction of mapper red -> yellow: " + e);
207         }
208 
209         // set operation mapper yellow -> blue
210         try {
211             if (yellow2Blue == null)
212                 yellow2Blue = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_YELLOW2BLUE_CLASS);
213         } catch (OperationMapperException e) {
214             logger.warn("exception during construction of mapper yellow -> blue: " + e);
215         }
216 
217         // set operation mapper blue -> green
218         try {
219             if (blue2Green == null)
220                 blue2Green = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_BLUE2GREEN_CLASS);
221         } catch (OperationMapperException e) {
222             logger.warn("exception during construction of mapper blue -> green: " + e);
223         }
224 
225         // set provenance handler
226         try {
227             if (provenanceHandler == null) {
228                 provenanceHandler = constructProvenanceHandler(Constants.PROP_SYSTEM_GWES_PROVENANCE_HANDLER_CLASS);
229             }
230         } catch (ProvenanceHandlerException e) {
231             logger.debug("exception during construction of provenance handler: "+e);
232         }
233 
234         // create credential manager
235         if (Boolean.getBoolean(Constants.PROP_SYSTEM_GWES_CREDENTIAL_WORKFLOW_CERTIFICATE)
236                 && CredentialManager.extractCredential(userIdCredential) != null) {
237             try {
238                 credentialManager = new CredentialManager(userIdCredential);
239             } catch (GSSException e) {
240                 throw new WorkflowSecurityException("Could not create credential manager from userIdCredential: "+e,e);
241             }
242         } else if (Boolean.getBoolean(Constants.PROP_SYSTEM_GWES_CREDENTIAL_DEFAULT_CERTIFICATE)) {
243             try {
244                 credentialManager = new CredentialManager();
245             } catch (GSSException e) {
246                 throw new WorkflowSecurityException("Could not load default Grid certificate: "+e,e);
247             } catch (IOException e) {
248                 throw new WorkflowSecurityException("Could not load default Grid certificate: "+e,e);
249             }
250         } else {
251             logger.info("Not using any credential manager.");
252         }
253 
254         SLEEP_TIME_MIN = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_SLEEPTIME_MIN, "20"));
255         SLEEP_TIME_MAX = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_SLEEPTIME_MAX, "60000"));
256     }
257 
258     private static OperationMapper constructOperationMapper(String propertyclassname) throws OperationMapperException {
259         String classname = System.getProperty(propertyclassname);
260         if (classname == null)
261             throw new OperationMapperException("Property \"" + propertyclassname + "\" not available. Please set this property in gwes.properties.");
262         if (classname.length() == 0)
263             throw new OperationMapperException("Property \"" + propertyclassname + "\" has no value");
264         OperationMapper mapper;
265         try {
266             Class c = Class.forName(classname);
267             mapper = (OperationMapper) c.getConstructor().newInstance();
268         } catch (Exception e) {
269             throw new OperationMapperException("Exception during constructing operation mapper with classname '"+classname+"':" + e, e);
270         }
271         return mapper;
272     }
273 
274     private static ProvenanceHandler constructProvenanceHandler(String propertyclassname) throws ProvenanceHandlerException {
275         String classname = System.getProperty(propertyclassname);
276         if (classname == null)
277             throw new ProvenanceHandlerException("Property \"" + propertyclassname + "\" not available. Please set this property in gwes.properties.");
278         if (classname.length() == 0)
279             throw new ProvenanceHandlerException("Property \"" + propertyclassname + "\" has no value");
280         ProvenanceHandler handler;
281         try {
282             Class c = Class.forName(classname);
283             handler = (ProvenanceHandler) c.getConstructor().newInstance();
284         } catch (Exception e) {
285             throw new ProvenanceHandlerException("Exception during constructing provenance handler with classname '"+classname+"':" + e, e);
286         }
287         return handler;
288     }
289 
290     private Activity constructActivity(String type, TransitionOccurrence to, OperationCandidate oc) throws ActivityException {
291         Activity activity = null;
292         String classname = null;
293         Class c;
294         try {
295             Throwable error =  null;
296             try {
297                 // search for cached mapping to class
298                 c = activityClassMap.get(type);
299                 if (c==null) {
300                     String propertyclassname = Constants.PROP_SYSTEM_GWES_ACTIVITY_+type+Constants.PROP_SYSTEM_GWES_ACTIVITY__CLASS;
301                     classname = System.getProperty(propertyclassname);
302                     if (classname == null) throw new ActivityException("Could not map operation type \""+type+"\" onto matching Activity implementation. Please define property \"" + propertyclassname + "\" in gwes.properties.");
303                     c = Class.forName(classname);
304                     // cache mapping
305                     activityClassMap.put(type,c);
306                 } else {
307                     classname = c.getName();
308                 }
309                 // construct instance of activity
310                 activity = (Activity) c
311                         .getConstructor(GenericWorkflowHandler.class, TransitionOccurrence.class, OperationCandidate.class)
312                         .newInstance(this,to,oc);
313                 if (activity == null) throw new ActivityException("Activity is null: Exception during constructing activity with classname '"+classname+"'");
314             } catch (ActivityException ae) {
315                 error = ae; throw ae;
316             } catch (ClassNotFoundException e) {
317                 ActivityException ae = new ActivityException("Exception during constructing activity with classname '"+classname+"': " + e, e);
318                 error = ae; throw ae;
319             } catch (Throwable e) {
320                 ActivityException ae = new ActivityException("Unexpected Exception during constructing activity with classname '"+classname+"': " + e, e);
321                 error = ae; throw ae;
322             } finally {
323                 if (error == null) { // SUCCESS
324                     if (getGlog().i()) getGlog().logEvent(
325                                     GWESLogger.Event.ACTIVITY_CONSTRUCT,
326                                     getUserID(),
327                                     this,
328                                     new String[]{"activityID","class","operationType"},
329                                     null,
330                                     new String[]{activity.getID(),classname,type});
331                 } else {             // ERROR
332                     getGlog().logEvent(
333                             GWESLogger.Event.ACTIVITY_CONSTRUCT,
334                             GWESLogger.EventOutcome.ERROR,
335                             getUserID(),
336                             this,
337                             new String[]{"activityID","class","operationType"},
338                             null,
339                             new String[]{(activity != null ? activity.getID() : null),classname,type},
340                             error);
341                 }
342             }
343         } catch (LoggingException e) {
344             logger.error("exception:\n" + e, e);
345             throw new ActivityException("Logging exception during constructActivity(): "+e,e);
346         }
347 
348         return activity;
349     }
350 
351     /****************************************************************************************************************
352      * Methods from abstract class GenericWorkflowHandler                                                              *
353      ****************************************************************************************************************/
354 
355     /**
356      * K-Wf Grid specific implementation of <CODE>GenericWorkflowHandler.initiateWorkflow()</CODE>
357      */
358     public synchronized void initiateWorkflow() throws WorkflowFormatException, StateTransitionException {
359         logger.debug("initiateWorkflow ...");
360 
361         //check current status
362         if (getStatus() != WorkflowStatus.STATUS_UNDEFINED) {
363             logger.error("invalid status " + getStatusAsString() + " for initiating the workflow");
364             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for initiating the workflow");
365         }
366 
367         //parse workflow description and create workflow object.
368         try {
369             workflow = (ServerWorkflow) Factory.newWorkflow(getID());
370             // include workflow listeners
371             workflow.addStructureListener(WorkflowModificationListener.getInstance());
372             // parse workflow description
373             workflow.fromXML(super.getWorkflowDescription());
374             if (logger.isDebugEnabled()) {
375                 logger.debug("workflow description: " + workflow.getDescription());
376             }
377         } catch (CapacityException e) {
378             logger.error("exception: " + e, e);
379             throw new WorkflowFormatException("Wrong number of tokens: " + e, e);
380         } catch (IllegalArgumentException e) {
381             logger.error("exception: " + e, e);
382             throw new WorkflowFormatException("Wrong workflow ID. Must begin with a letter or underscore: " + e, e);
383         }
384 
385         // set conversion property
386         if (!getOriginalWorkflowDescriptionType().equals(Constants.WORKFLOW_DESCRIPTION_TYPE_DEFAULT)) {
387             getWorkflow().getProperties().put(Constants.PROP_WORKFLOW_CONVERTED_FROM,getOriginalWorkflowDescriptionType());
388         }
389 
390         // construct workflow statistics
391         workflowStatistics = new WorkflowStatistics(workflow);
392 
393         //Analyze the workflow, default is Karp-Miller Tree analysis
394         analyzeWorkflow();
395 
396         //set fault management policy
397         String faultManagementString = workflow.getProperties().get(Constants.PROP_FAULT_MANAGEMENT_POLICY);
398         if (faultManagementString != null) {
399             if (faultManagementString.equalsIgnoreCase("abortonactivityterminated"))
400                 setFaultManagementPolicy(FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED);
401             else if (faultManagementString.equalsIgnoreCase("continueonactivityterminated"))
402                 setFaultManagementPolicy(FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED);
403             else if (faultManagementString.equalsIgnoreCase("suspendonactivityterminated"))
404                 setFaultManagementPolicy(FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED);
405                 // set default value
406             else {
407                 logger.warn("The fault management policy \"" + faultManagementString + "\" is not supported. Falling back to \"AbortOnActivityTerminated\".");
408                 workflow.getProperties().put(Constants.PROP_FAULT_MANAGEMENT_POLICY, "AbortOnActivityTerminated");
409                 workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(),
410                         "The fault management policy \"" + faultManagementString + "\" is not supported. Falling back to \"AbortOnActivityTerminated\".");
411             }
412         } else {
413             // set default value
414             workflow.getProperties().put(Constants.PROP_FAULT_MANAGEMENT_POLICY, "AbortOnActivityTerminated");
415         }
416 
417         //set workflow activity maxattempts if not set yet
418         String attemptsStr = workflow.getProperties().get(Constants.PROP_ACTIVITY_MAXATTEMPTS);
419         if (attemptsStr == null) {
420             // evaluate deprecated property PROP_REDISTRIBUTION_OF_FAILED_ACTIVITIES for backward compatibility
421             String dfaString = workflow.getProperties().get(Constants.PROP_REDISTRIBUTION_OF_FAILED_ACTIVITIES);
422             int attempts = 1;
423             // if PROP_REDISTRIBUTION_OF_FAILED_ACTIVITIES is not set or is TRUE then evaluate system property
424             if (dfaString == null || dfaString.equalsIgnoreCase(Constants.TRUE)) {
425                 attemptsStr = System.getProperty(Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS);
426                 if (attemptsStr != null) {
427                     try {
428                         attempts = Integer.parseInt(attemptsStr);
429                     } catch (NumberFormatException e) {
430                         logger.warn("The property value of "+Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS+" has wrong format: "+e+"\nSetting maxattempts to 1");
431                     }
432                 }
433             }
434             // write back to workflow document
435             workflow.getProperties().put(Constants.PROP_ACTIVITY_MAXATTEMPTS,""+attempts);
436         }
437 
438         //set workflow persistence policy
439         // workflow property overrides gwes.properties overrides default value "true".
440         try {
441             setWorkflowPersistence(workflow.getProperties().get(Constants.PROP_WORKFLOW_PERSISTENCE));
442         } catch (Exception e) {
443             logger.warn(e);
444             setWorkflowPersistence(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_PERSISTENCE, Constants.TRUE));
445         }
446 
447         //put userID to workflow properties.
448         if (getUserID() != null) {
449             workflow.getProperties().put(Constants.PROP_USER_ID, getUserID());
450         }
451 
452         //put credential DN to workflow properties.
453         if (credentialManager != null) {
454             try {
455                 GSSCredential credential = credentialManager.getCredential();
456                 if (credential != null) {
457                     workflow.getProperties().put(Constants.PROP_DN, credential.getName().toString());
458                 }
459             } catch (GSSException e) {
460                 logger.warn("WARNING: Could not load user credential: " + e);
461             } catch (IOException e) {
462                 logger.warn("WARNING: Could not load user credential: " + e);
463             }
464         }
465 
466         // set storage of occurrence sequence
467         occurrenceSequence = workflow.getProperties().get(Constants.PROP_OCCURRENCE_SEQUENCE);
468 
469         //set new status
470         setStatus(WorkflowStatus.STATUS_INITIATED);
471         logger.debug("initiateWorkflow ... done");
472 
473         // set birthday
474         workflow.getProperties().put(Constants.PROP_BIRTHDAY_MS, "" + getBirthday());
475 
476         //store workflow and data tokens in database
477         if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
478             try {
479                 storeWorkflow();
480                 storeDataTokens();
481             } catch (DatabaseException e) {
482                 logger.warn("WARNING Workflow not stored due to database problems: " + e);
483             }
484         }
485     }
486 
487     /**
488      * K-Wf Grid specific implementation of <CODE>GenericWorkflowHandler.startWorkflow()</CODE>. Calls
489      * <CODE>Thread.start()</CODE>.
490      */
491     public synchronized void startWorkflow() throws StateTransitionException {
492         logger.debug("startWorkflow() ...");
493 
494         //check status
495         if (getStatus() != WorkflowStatus.STATUS_INITIATED) {
496             logger.error("invalid status " + getStatusAsString() + " for starting the workflow");
497             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for starting the workflow");
498         }
499 
500         // initialize provenance handler if available
501         if (provenanceHandler != null) provenanceHandler.initialize(workflow);
502 
503         //start workflow
504         if (!isAlive()) {
505             start();
506             setStatus(WorkflowStatus.STATUS_RUNNING);
507         }
508 
509         logger.debug("startWorkflow() ... done");
510     }
511 
512     public synchronized void suspendWorkflow() throws StateTransitionException {
513         /* if status was ACTIVE it will first switch to RUNNING before going to SUSPENDED */
514         if (getStatus() == WorkflowStatus.STATUS_RUNNING || getStatus() == WorkflowStatus.STATUS_ACTIVE) {
515             suspend = true;
516             try {
517                 waitForStatusChangeTo(WorkflowStatus.STATUS_SUSPENDED);
518             } catch (InterruptedException e) {
519                 logger.error("exception: " + e, e);
520             }
521         } else {
522             logger.error("invalid status " + getStatusAsString() + " for suspending the workflow");
523             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for suspending the workflow");
524         }
525     }
526 
527     public synchronized void suspendWorkflowAsync() throws StateTransitionException {
528         /* if status was ACTIVE it will first switch to RUNNING before going to SUSPENDED */
529         if (getStatus() == WorkflowStatus.STATUS_RUNNING || getStatus() == WorkflowStatus.STATUS_ACTIVE) {
530             suspend = true;
531         } else {
532             logger.error("invalid status " + getStatusAsString() + " for suspending the workflow");
533             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for suspending the workflow");
534         }
535     }
536 
537     public synchronized void resumeWorkflow() throws StateTransitionException {
538         if (getStatus() == WorkflowStatus.STATUS_SUSPENDED && isAlive()) {
539             suspend = false;
540             setStatus(WorkflowStatus.STATUS_RUNNING);
541         } else {
542             logger.error("invalid status " + getStatusAsString() + " for resuming the workflow");
543             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for resuming the workflow");
544         }
545     }
546 
547     public synchronized void abortWorkflow() throws StateTransitionException {
548 
549         // you cannot abort an UNDEFINED, COMPLETED, or TERMINATED workflow
550         if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
551             logger.error("invalid status " + getStatusAsString() + " for aborting the workflow");
552             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for aborting the workflow");
553         }
554 
555         abort = true;
556 
557         // Workflow is NOT running nor active
558         if (getStatus() == WorkflowStatus.STATUS_INITIATED || getStatus() == WorkflowStatus.STATUS_SUSPENDED || getStatus() == WorkflowStatus.STATUS_UNDEFINED) {
559             setStatus(WorkflowStatus.STATUS_TERMINATED);
560         }
561 
562         // Workflow is running or active
563         else {
564             try {
565                 waitForStatusChangeToCompletedOrTerminated();
566             } catch (InterruptedException e) {
567                 logger.error("exception: " + e, e);
568             }
569         }
570     }
571 
572     public synchronized void abortWorkflowAsync() throws StateTransitionException {
573         // you cannot abort an UNDEFINED, COMPLETED, or TERMINATED workflow
574         if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
575             logger.error("invalid status " + getStatusAsString() + " for aborting the workflow");
576             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for aborting the workflow");
577         }
578 
579         abort = true;
580 
581         // Workflow is NOT running nor active
582         if (getStatus() == WorkflowStatus.STATUS_INITIATED || getStatus() == WorkflowStatus.STATUS_SUSPENDED || getStatus() == WorkflowStatus.STATUS_UNDEFINED) {
583             setStatus(WorkflowStatus.STATUS_TERMINATED);
584         }
585     }
586 
587     /**
588      * Stores this workflow. Implement this method in all derived classes!
589      *
590      * @return The location identifier of the storage location.
591      */
592     public String storeWorkflow() throws DatabaseException {
593         updateWorkflowRuntimeVersion();
594         return XMLDB.getInstance().storeWorkflow(workflow);
595     }
596 
597     /**
598      * Get specific data that is hold inside the workflow and that is referenced by a data place identifier.
599      *
600      * @param placeID The place identifier that refers to the data
601      * @return The data as an array of Strings or <code>null</code> if place ID is not valid.
602      */
603     public String[] getData(String placeID) {
604         Place place = workflow.getPlace(placeID);
605         if (place == null) {
606             logger.warn("getData(" + placeID + "): place is not available!");
607             return null;
608         }
609         Token[] tokens = place.getTokens();
610         String[] ret = new String[tokens.length];
611         for (int i = 0; i < tokens.length; i++) {
612             // data token
613             Data data = tokens[i].getData();
614             if (data != null) {
615                 ret[i] = tokens[i].getData().toXML();
616             } else {
617                 // control token
618                 Boolean control = tokens[i].getControl();
619                 if (control != null) {
620                     ret[i] = (control ? "<control>true</control>" : "<control>false</control>");
621                 } else ret[i] = null;
622             }
623         }
624         return ret;
625     }
626 
627     /**
628      * Get the human-readable description of the workflow.
629      *
630      * @return A String containing the description.
631      */
632     public String getDescription() {
633         return workflow.getDescription();
634     }
635 
636     /**
637      * Set the human-readable description of the workflow.
638      *
639      * @param description The description.
640      */
641     public void setDescription(String description) {
642         workflow.setDescription(description);
643     }
644 
645     /**
646      * Get the current workflow description of this workflow. If the workflow description changes during runtime,
647      * you should override this method by the concrete implementation.
648      *
649      * @return The workflow description
650      */
651     public String getWorkflowDescription() {
652         updateWorkflowRuntimeVersion();
653         try {
654             return JdomString.workflow2string(workflow);
655         } catch (IOException e) {
656             logger.error("exception: " + e, e);
657             return null;
658         }
659     }
660 
661     /**
662      * Set the current workflow description of this workflow.
663      *
664      * @param workflowDescription The new workflow description.
665      * @throws WorkflowFormatException If the workflow format is wrong.
666      * @throws StateTransitionException If status of workflow is in final status STATUS_COMPLETED or STATUS_TERMINATED.
667      */
668     public synchronized void setWorkflowDescription(String workflowDescription) throws WorkflowFormatException, StateTransitionException {
669         // check status
670         if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
671             throw new StateTransitionException("Invalid status " + getStatusAsString() + " for setting the workflow description."
672             + " Should NOT have final status COMPLETED or TERMINATED.");
673         }
674 
675         // suspend RUNNING or ACTIVE workflow.
676         /// ToDo: FixMe: Problem when calling setWorkflowDescription from within run() method!
677 //        boolean suspended = false;
678 //        if (getStatus() == STATUS_RUNNING || getStatus() == STATUS_ACTIVE) {
679 //            logger.info("Suspending workflow with status '" + getStatusAsString() + "' for setting new workflow description...");
680 //            // should NOT call suspendWorkflow() as it will block because of synchronized!?
681 //            suspend = true;
682 //            try {
683 //                waitForStatusChangeTo(STATUS_SUSPENDED);
684 //                logger.info("Suspending workflow with status '" + getStatusAsString() + "' for setting new workflow description... done.");
685 //            } catch (InterruptedException e) {
686 //                logger.error("exception: " + e, e);
687 //            }
688 //            suspended = true;
689 //        }
690 
691         // checkpoint old workflow
692         if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
693             try {
694                 storeWorkflow();
695                 storeDataTokens();
696             } catch (DatabaseException e) {
697                 logger.warn("WARNING Workflow not stored due to database problems: " + e);
698             }
699         }
700 
701         // parse and overwrite workflow
702         // ToDo: versioning? detecting version conflicts?
703         try {
704             workflow.fromXML(workflowDescription);
705         } catch (CapacityException e) {
706             logger.error("exception: " + e, e);
707             throw new WorkflowFormatException("Wrong number of tokens: " + e, e);
708         }
709         super.setWorkflowDescription(workflowDescription);
710         analyzeWorkflow();
711 
712         // checkpoint new workflow
713         if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
714             try {
715                 storeWorkflow();
716                 storeDataTokens();
717             } catch (DatabaseException e) {
718                 logger.warn("WARNING Workflow not stored due to database problems: " + e);
719             }
720         }
721 
722         // resume workflow
723 //        logger.info("New workflow has status "+getStatusAsString());
724 //        if (suspended && (getStatus() == STATUS_SUSPENDED)) {
725 //            logger.info("Resuming workflow with status '" + getStatusAsString() + "'...");
726 //            resumeWorkflow();
727 //            logger.info("Resuming workflow with status '" + getStatusAsString() + "'... done.");
728 //        }
729     }
730 
731     /**
732      * Set status of the workflow and notify threads that wait for a status change.
733      *
734      * @see WorkflowStatus#STATUS_UNDEFINED
735      * @see WorkflowStatus#STATUS_INITIATED
736      * @see WorkflowStatus#STATUS_RUNNING
737      * @see WorkflowStatus#STATUS_SUSPENDED
738      * @see WorkflowStatus#STATUS_ACTIVE
739      * @see WorkflowStatus#STATUS_TERMINATED
740      * @see WorkflowStatus#STATUS_COMPLETED
741      * @see #waitForStatusChangeTo(int)
742      * @see #waitForStatusChangeFrom(int)
743      */
744     protected synchronized void setStatus(int status) {
745         if (status != WorkflowStatus.STATUS_UNDEFINED) {
746             workflow.getProperties().put(Constants.PROP_STATUS, WorkflowStatus.getStatusAsString(status));
747         }
748         super.setStatus(status);
749     }
750 
    /**
     * This is the run method of the Thread, invoked by start(). In order to abort this thread set abort = true.
     * <p>
     * The method repeats the following steps as long as the workflow is not to abort and there are enabled
     * transitions, or as long as the workflow status is ACTIVE:
     * <ol>
     * <li>select one transition occurrence from the enabled transitions,</li>
     * <li>handle a breakpoint property of the selected transition (request suspension or release it),</li>
     * <li>process the selected occurrence depending on the abstraction level of its transition,</li>
     * <li>check the status of all activities in the activity table,</li>
     * <li>request suspension if nothing can be done because of false conditions or unresolved decisions,</li>
     * <li>block while the workflow is suspended,</li>
     * <li>sleep with a growing sleep time if the step did not modify anything,</li>
     * <li>re-read the enabled transitions.</li>
     * </ol>
     * After the loop the status is set to TERMINATED (if abort is set) or COMPLETED, and the workflow is
     * stored if workflow persistence is enabled. Any exception thrown inside the loop is recorded as a
     * workflow error and leads to abort.
     */
    public void run() {
        logger.debug("run() ...");

        // number of steps that modified the workflow so far
        int step = 0;
        // true if the current step changed something; starts with true so that the first step is logged
        boolean modification = true;
        // current back-off time for idle steps
        long sleepTime = SLEEP_TIME_MIN;

        try {

            // get enabled transitions
            List<Transition> enabledTransitions = new ArrayList<Transition>();
            enabledTransitions.addAll(Arrays.asList(workflow.getEnabledTransitions()));
            if (enabledTransitions.size() == 0) {
                logger.warn("workflow '" + getID() + "' does not contain any enabled transitions!");
            }

            // while workflow is not to abort and there exists enabled transitions or this workflow is still active
            while ((!abort && enabledTransitions.size() > 0) || getStatus() == WorkflowStatus.STATUS_ACTIVE) {

                // log the step header only once after something has changed
                if (modification) {
                    if (logger.isDebugEnabled()) {
                    logger.debug(new StringBuffer()
                            .append("--- step ")
                            .append(step)
                            .append(" (")
                            .append(WorkflowStatus.getStatusAsString(getStatus()))
                            .append(") --- ")
                            .append(enabledTransitions.size())
                            .append(" enabled transition(s)")
                            .toString());
                    }
                    modification = false;
                }

                // select enabled transition with true condition.
                // (returns null if abort or suspend is set, or if no suitable transition was found)
                TransitionOccurrence to = selectTransitionOccurrence(enabledTransitions);

                // if a breakpoint has been reached, then suspend the workflow
                if (!abort && !suspend && to != null) {
                    GenericProperties transprops = to.transition.getProperties();
                    String breakstring = transprops.get(Constants.PROP_TRANSITION_BREAKPOINT);
                    if (breakstring != null) {
                        // If workflow is resumed, remove "REACHED and put "RELEASED" as value to breakpoint property
                        if (breakstring.equals("REACHED")) {
                            logger.info("released breakpoint at transition " + to.transition.getID());
                            transprops.put(Constants.PROP_TRANSITION_BREAKPOINT, "RELEASED");
                        } else {
                            // any other value (including "RELEASED") marks the breakpoint as reached
                            // and requests suspension; the occurrence is not processed in this step.
                            logger.info("reached breakpoint at transition " + to.transition.getID());
                            transprops.put(Constants.PROP_TRANSITION_BREAKPOINT, "REACHED");
                            suspend = true;
                        }
                    }
                }

                // process selected transition
                if (!abort && !suspend && to != null) {
                    if (logger.isDebugEnabled())
                        logger.debug("--- step " + step + " --- " + "processing transition occurrence " + to.getID());

                    // control transition without operation
                    if (to.transition.getAbstractionLevel() == Operation.BLACK) {
                        processBlackTransition(to, step);
                        modification = true;
                    }

                    // concrete operation
                    else if (to.transition.getAbstractionLevel() == Operation.GREEN) {
                        if (processGreenTransition(to)) modification = true;
                    }

                    // invoke firstcsscheduler. If successful, directly process concrete operation.
                    else if (to.transition.getAbstractionLevel() == Operation.BLUE) {
                        if (processBlueTransition(to)) modification = true;
                    }

                    // invoke AAB or ResourceMatcher
                    else if (to.transition.getAbstractionLevel() == Operation.YELLOW) {
                        if (processYellowTransition(to)) modification = true;
                    }

                    // invoke WCT
                    else if (to.transition.getAbstractionLevel() == Operation.RED) {
                        if (processRedTransition(to)) modification = true;
                    }

                    // the rest
                    else {
                        workflowErrorAndAbort("Unsupported abstraction level.");
                    }
                }

                // check the status of all activities and process results if COMPLETED or TERMINATED.
                // check also for FAILED activities //MS
                if (checkActivityStatus(getActivityTable(), step)) modification = true;

                // suspend if there is a deadlock because of false conditions
                // (nothing was selected, nothing changed, and no enabled transition has a time dependent
                // false condition that might become true later)
                if (getStatus() == WorkflowStatus.STATUS_RUNNING && !modification && to == null && enabledTimeDependendFalseTransitions.size() == 0) {
                    modification = true;
                    workflowWarnAndSuspend("Workflow suspended because all static conditions of all enabled transitions are false, or because of unresolved decision (conflict)!");
                }

                // wait here if workflow has been suspended. Workflow must switch from ACTIVE to RUNNING first!
                if (suspend && getStatus() == WorkflowStatus.STATUS_RUNNING) {
                    setStatus(WorkflowStatus.STATUS_SUSPENDED);
                    modification = true;
                    // blocks until the status is no longer SUSPENDED
                    waitForStatusChangeFrom(WorkflowStatus.STATUS_SUSPENDED);
                }

                // if no modification occurred this step, wait some time
                if (!modification && !abort) {
                    try {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Sleeping " + sleepTime + "ms ...");
                        }
                        Thread.sleep(sleepTime);
                        // set dynamic sleep time
                        // (grows by factor 1.5 per idle step, limited to SLEEP_TIME_MAX)
                        sleepTime *= 1.5;
                        if (sleepTime > SLEEP_TIME_MAX) sleepTime = SLEEP_TIME_MAX;
                    } catch (InterruptedException ie) {
                        // NOTE(review): presumably restores the interrupt flag of this thread. The loop is not
                        // left here, so following sleeps would return immediately while the flag is set - confirm
                        // that this is intended.
                        interrupt();
                    }
                }
                // reset sleep time if there was a modification
                else {
                    sleepTime = SLEEP_TIME_MIN;
                }

                // update enabled transitions
                if (!abort) {
                    enabledTransitions.clear();
                    enabledTransitions.addAll(Arrays.asList(workflow.getEnabledTransitions()));
                }

                // only steps that changed something are counted
                if (modification) {
                    step++;
                }
            }
        } catch (Exception e) {
            // any unexpected problem is recorded as workflow error and leads to status TERMINATED below
            workflowErrorAndAbort("Unexpected error", e);
        }

        // set exit status
        setStatus((abort) ? WorkflowStatus.STATUS_TERMINATED : WorkflowStatus.STATUS_COMPLETED);
        if (logger.isDebugEnabled())
            logger.debug(new StringBuffer()
                .append("--- step ")
                .append(step)
                .append(" (")
                .append(WorkflowStatus.getStatusAsString(getStatus()))
                .append(") --- ")
                .toString());

        //display occurrence sequence
        if (logger.isDebugEnabled() && occurrenceSequence!=null) {
            logger.debug("Occurrence sequence: ["+occurrenceSequence+">");
        }

        //store workflow in database
        // (database problems are only logged; they do not change the exit status)
        if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
            try {
                storeWorkflow();
            } catch (DatabaseException e) {
                logger.warn("WARNING: " + e);
            }
        }

    }
923 
924     public void workflowErrorAndAbort(String error) {
925         logger.error(error);
926         if (workflow != null) workflow.getProperties().put(Constants.PROP_ERROR_ + createNewErrorID(), error);
927         abort = true;
928     }
929 
930     public void workflowErrorAndAbort(String error, Throwable e) {
931         logger.error(error + ": " + e, e);
932         if (workflow != null) workflow.getProperties().put(Constants.PROP_ERROR_ + createNewErrorID(), error + ": " + e);
933         abort = true;
934     }
935 
936     public void workflowWarn(String warning) {
937         logger.warn(warning);
938         if (workflow != null) workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(), warning);
939     }
940 
941     public void workflowWarnAndSuspend(String warning) {
942         logger.warn(warning);
943         if (workflow != null) workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(), warning);
944         suspend = true;
945     }
946 
947     /**
948      * Select one transition occurrence. Criterias are: condition == true and transition should not be part of an
949      * unresolved decision where all transitions have same priority.
950      * As a side effect this method updates and sorts the list <code>enabledTrueTransitions</code>.
951      * Transitions with a higher priority are selected first.
952      * The priority of a transition is defined by the <code>TransitionPriorityComparator</code>.
953      * The selection of the tokens for this occurrence is done within the contructor of the TransitionOccurrence.
954      *
955      * @param enabledTransitions An array of transitions to select from.
956      * @return The selected transition or <code>null</code> if there was no good transition.
957      * @throws WorkflowFormatException
958      */
959     private TransitionOccurrence selectTransitionOccurrence(List<Transition> enabledTransitions) throws WorkflowFormatException {
960         enabledTimeDependendFalseTransitions.clear();
961 
962         if (!abort && !suspend) {
963             //sort list according to the transition priority, beginning with high priority.
964             Collections.sort(enabledTransitions, Collections.reverseOrder(new TransitionPriorityComparator()));
965 
966             // gather conflicts
967             updateConflictTransitions();
968 
969             //loop enabled transitions beginning with high priority
970             enabledTransitionLoop:
971             for (Transition checktransition : enabledTransitions) {
972 //                logger.info("checking enabled transition " + checktransition.getID() + " ...");
973                 TransitionOccurrence to = new TransitionOccurrence(checktransition);
974 
975                 // check if part of conflict
976                 if (conflictTransitions.contains(checktransition)) {
977                     // conflict
978 
979                     // check pre selection property
980                     if (checktransition.getProperties().get(Constants.PROP_TRANSITION_PRE_SELECTED)!=null) {
981                         logger.debug("Found pre selected transition occurrence "+to.toString());
982                         checktransition.getProperties().remove(Constants.PROP_TRANSITION_PRE_SELECTED);
983                         return to;
984                     }
985 
986                     // check condition
987                     int condition = to.checkConditions(conditionChecker);
988                     if ((condition & ConditionChecker.CONDITION_TRUE) != 0) {
989                         // condition is true
990 
991                         // compare priorities
992 
993                         // get all decisions where this transition is involved
994                         List<Decision> decisions = new ArrayList<Decision>();
995                         GenericProperties props = checktransition.getProperties();
996                         for (Property prop : props.getProperties()) {
997                             if (prop.getKey().startsWith(Constants.PROP_DECISION_)) {
998                                 decisions.add(currentDecisions.get(prop.getKey()));
999                             }
1000                         }
1001 
1002                         // loop all decisions and compare priority of involved transitions
1003                         int thisPriority = TransitionPriorityComparator.getPriority(checktransition);
1004                         // -1 -> others have higher priority
1005                         //  0 -> others have same priority (but not higher)
1006                         // +1 -> this one has highest priority
1007                         int priorityComparison = 1;
1008                         decisionLoop:
1009                         for (Decision decision : decisions) {
1010                             for (Transition t : decision.transitions) {
1011                                 // skip comparison with itself
1012                                 if (t == checktransition) continue; 
1013                                 int otherPriority = TransitionPriorityComparator.getPriority(t);
1014                                 if (otherPriority > thisPriority) {
1015                                     TransitionOccurrence testTO = new TransitionOccurrence(t);
1016                                     if ( (testTO.checkConditions(conditionChecker) & ConditionChecker.CONDITION_TRUE) != 0) {
1017 //                                        logger.info("transition " + t.getID() + " has higher priority than checked transition " + checktransition.getID());
1018                                         priorityComparison = -1;
1019                                         break decisionLoop;
1020                                     }
1021                                 } else if (otherPriority == thisPriority) {
1022                                     TransitionOccurrence testTO = new TransitionOccurrence(t);
1023                                     if ( (testTO.checkConditions(conditionChecker) & ConditionChecker.CONDITION_TRUE) != 0) {
1024 //                                        logger.info("transition " + t.getID() + " has same priority as checked transition " + checktransition.getID());
1025                                         priorityComparison = 0;
1026                                     }
1027                                 }
1028                             }
1029                         }
1030 
1031                         switch (priorityComparison) {
1032                             // this one has highest priority
1033                             case 1: return to;
1034 
1035                             // others have same priority
1036                             case 0: continue enabledTransitionLoop;
1037 
1038                             // others have higher priority
1039                             case -1: continue enabledTransitionLoop;
1040                         }
1041                     } else if ((condition & ConditionChecker.CONDITION_FALSE_DYNAMIC) != 0) {
1042                         enabledTimeDependendFalseTransitions.add(checktransition);
1043                     }
1044                 } else {
1045                     // no conflict
1046                     int condition = to.checkConditions(conditionChecker);
1047                     if ((condition & ConditionChecker.CONDITION_TRUE) != 0) {
1048                         return to;
1049                     } else if ((condition & ConditionChecker.CONDITION_FALSE_DYNAMIC) != 0) {
1050                         enabledTimeDependendFalseTransitions.add(checktransition);
1051                     }
1052                 }
1053             }
1054         }
1055 
1056         return null;
1057     }
1058 
1059     /**
1060      * Update the list of transitions that are currently involved in a conflict.
1061      * Updates also the map "currentDecisions" as a side effect.
1062      */
1063     private void updateConflictTransitions() {
1064         conflictTransitions.clear();
1065 
1066         // key = decision ID, value = decision
1067         Map<String, Decision> newDecisions = new HashMap<String, Decision>();
1068 
1069         Decision[] decisions = Conflict.getDecisions(workflow);
1070         for (Decision decision : decisions) {
1071             //ignore PUT_CHOICE
1072             if (decision.type == Decision.PUT_CHOICE) continue;
1073 
1074 // TAKE_CHOICES are relevant decisions which may need user interaction.
1075 //            //ignore TAKE_CHOICE
1076 //            if (decision.type == Decision.TAKE_CHOICE) continue;
1077 
1078             // get decision ID (hash code of decision string)
1079             String decisionID = getDecisionID(decision);
1080 
1081             // put conflict transition to list
1082             for (Transition trans : decision.transitions) {
1083                 if (!conflictTransitions.contains(trans)) {
1084                     conflictTransitions.add(trans);
1085                 }
1086             }
1087 
1088             // put decision to newDecisions
1089             newDecisions.put(decisionID, decision);
1090 
1091             // if this is a new decision annotate corresponding workflow elements
1092             if (!currentDecisions.containsKey(decisionID)) {
1093                 String decisionType = decision.type2String();
1094                 // annotate transitions
1095                 for (Transition tran : decision.transitions) {
1096                     tran.getProperties().put(decisionID, decisionType);
1097                 }
1098                 // annotate place
1099                 decision.place.getProperties().put(decisionID, decisionType);
1100                 // annotate workflow
1101                 workflow.getProperties().put(decisionID, decisionType);
1102             }
1103         }
1104 
1105         // remove annotations of old decisions
1106         for (String decisionID : currentDecisions.keySet()) {
1107             if (!newDecisions.containsKey(decisionID)) {
1108                 Decision decision = currentDecisions.get(decisionID);
1109                 // first remove the workflow property!
1110                 workflow.getProperties().remove(decisionID);
1111                 decision.place.getProperties().remove(decisionID);
1112                 for (Transition transition : decision.transitions) {
1113                     transition.getProperties().remove(decisionID);
1114                 }
1115             }
1116         }
1117 
1118         currentDecisions = newDecisions;
1119     }
1120 
1121     /**
1122      * This method analyzes and annotates the workflow.
1123      */
1124     private void analyzeWorkflow() {
1125         String workflowAnalysis = workflow.getProperties().get(Constants.PROP_WORKFLOW_ANALYSIS);
1126         if (workflowAnalysis == null) workflowAnalysis = System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_ANALYSIS,"karp-miller");
1127         if (workflowAnalysis.equalsIgnoreCase("karp-miller") || workflowAnalysis.equalsIgnoreCase(Constants.TRUE)) {
1128             workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,"karp-miller");
1129             karpMillerAnalysis();
1130         } else if (workflowAnalysis.equalsIgnoreCase(Constants.FALSE)) {
1131             workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,Constants.FALSE);
1132         } else {
1133             logger.warn("The workflow analysis method \"" + workflowAnalysis+ "\" is not supported. Falling back to \"karp-miller\".");
1134             workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,"karp-miller");
1135             workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(),
1136                     "The workflow analysis method \"" + workflowAnalysis+ "\" is not supported. Falling back to \"karp-miller\".");
1137             karpMillerAnalysis();
1138         }
1139     }
1140 
1141     private void karpMillerAnalysis() {
1142         logger.debug("Analysing workflow using Karp-Miller tree method...");
1143 
1144         try {
1145             WorkflowAnalyzer analyzer = new WorkflowAnalyzer(workflow);
1146             if (logger.isDebugEnabled()) {
1147                 logger.debug("workflow analysis:\n" + getID() + "\n" + analyzer.analysis2string());
1148             }
1149 
1150             //check workflow properties and annotate workflow description
1151             int complexity = analyzer.getComplexity();
1152             workflow.getProperties().put(Constants.PROP_COMPLEXITY, "" + complexity);
1153             boolean isunbounded = analyzer.isUnbounded();
1154             workflow.getProperties().put(Constants.PROP_IS_UNBOUNDED, "" + isunbounded);
1155 
1156             //loop through transitions and annotate with results of analysis
1157             Transition[] transitions = workflow.getTransitions();
1158             for (Transition transition : transitions) {
1159                 boolean isquasylive = analyzer.isQuasiLive(transition);
1160                 transition.getProperties().put(Constants.PROP_TRANSITION_IS_QUASI_LIVE, "" + isquasylive);
1161             }
1162 
1163             //loop through places and annotate with results of analysis
1164             Place[] places = workflow.getPlaces();
1165             for (Place place : places) {
1166                 boolean isquasylive = analyzer.isQuasiLive(place);
1167                 boolean isfinallymarked = analyzer.isFinallyMarked(place);
1168                 place.getProperties().put(Constants.PROP_PLACE_IS_QUASI_LIVE, "" + isquasylive);
1169                 place.getProperties().put(Constants.PROP_PLACE_IS_FINALLY_MARKED, "" + isfinallymarked);
1170             }
1171 
1172             logger.debug("Analysing workflow using Karp-Miller tree method ... done.");
1173         } catch (Exception e) {
1174             logger.error("Could not analyze workflow using Karp-Miller tree method: " + e, e);
1175         }
1176     }
1177 
1178     /**
1179      * Store all data tokens of this workflow to the XML database.
1180      */
1181     private void storeDataTokens() throws DatabaseException {
1182         logger.debug("storing tokens ...");
1183         for (Place place : workflow.getPlaces()) {
1184             if (place.getTokens() != null) {
1185                 for (Token token : place.getTokens()) {
1186                     if (token.getData() != null) {
1187                         XMLDB.getInstance().storeTokenData(workflow.getID(), place.getID(), token, place.getOwls());
1188                     }
1189                 }
1190             }
1191         }
1192     }
1193 
1194     /**
1195      * Check all current activities of this workflow.
1196      *
1197      * @param activities The Activity Table.
1198      * @param step       The current step
1199      * @return <code>true</code> if the workflow was modified due to changes in the activity states.
1200      */
1201     private boolean checkActivityStatus(ActivityTable activities, int step) throws ActivityException {
1202         boolean modification = false;
1203         // check status of activities in the right order
1204         Vector<String> activityIDVector = new Vector<String>(activities.keySet());
1205         Collections.sort(activityIDVector);
1206 
1207         int tempworkflowstatus = WorkflowStatus.STATUS_RUNNING;
1208         // loop through activities
1209         for (String activityID : activityIDVector) {
1210             Activity activity = activities.get(activityID);
1211             TransitionOccurrence to = activity.getTransitionOccurrence();
1212             Activity.Status activityStatus = activity.getStatus();
1213             if (logger.isDebugEnabled())
1214                 logger.debug("--- step " + step + " --- " + "activity#" + activityID + "=" + Activity.getStatusAsString(activityStatus));
1215 
1216             // activity has completed or terminated
1217             if (activity.isFinalStatus(activityStatus)) {
1218 
1219                 // set workflow warning property if there is any activity fault message
1220                 String faultMessage = activity.getFaultMessage();
1221                 if (faultMessage != null) {
1222                     workflowWarn("Fault in activity " + activity + ": " + faultMessage);
1223                 }
1224 
1225                 // resource co-allocation: generate the resource.allocation.group token property from activity resource.
1226                 to.generateResourceAllocationGroupPropertyFromActivity();
1227 
1228                 // remove reservation tokens from each output place
1229                 to.removeOutputReservationTokens();
1230 
1231                 try {
1232                     // evaluate XPath expressions
1233                     to.evaluateXPathEdgeExpressions();
1234                     // remove the corresponding token from each input place
1235                     // remark: tokens on read places have not been locked and are not removed.
1236                     to.removeInputTokens();
1237                     // update write tokens (currently only control tokens are updated).
1238                     to.updateWriteTokens();
1239                     //  put new real token on each output place
1240                     to.putOutputTokens();
1241                     // update probability property
1242                     updateProbabilityProperty(to);
1243                     // store provenance if handler is available
1244                     if (provenanceHandler != null) provenanceHandler.storeProvenance(to,step);
1245                 } catch (CapacityException e) {
1246                     workflowErrorAndAbort("error during checkActivityStatus()", e);
1247                 } catch (WorkflowFormatException e) {
1248                     workflowErrorAndAbort("error during checkActivityStatus()", e);
1249                 }
1250 
1251                 // store output token to database
1252                 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
1253                     try {
1254                         for (TokenParameter tp : to.tokens) {
1255                             switch (tp.scope) {
1256                                 case OUTPUT:
1257                                     if (tp.token != null) {
1258                                         XMLDB.getInstance().storeTokenData(getID(), tp.edge.getPlaceID(), tp.token, tp.edge.getPlace().getOwls());
1259                                     }
1260                                     break;
1261                             }
1262                         }
1263                     } catch (DatabaseException e) {
1264                         logger.warn("exception: " + e);
1265                     }
1266                 }
1267 
1268                 modification = true;
1269 
1270                 // store occurrence sequence
1271                 updateOccurrenceSequenceProperty(to);
1272 
1273                 // make statistics
1274                 workflowStatistics.updateActivityStatistics(activity, to.transition);
1275 
1276                 // remove activities
1277                 activities.remove(activityID);
1278 
1279                 // fault management regarding the fault management policy property
1280                 if (activityStatus == Activity.Status.TERMINATED) {
1281                     switch (faultManagementPolicy) {
1282                         case FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED:
1283                             abort = true;
1284                             break;
1285                         case FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED:
1286                             suspend = true;
1287                             break;
1288                     }
1289                 }
1290             } else if (activityStatus == Activity.Status.FAILED) {
1291                 tempworkflowstatus = WorkflowStatus.STATUS_ACTIVE;
1292 
1293                 // REMARK: FaultToleranceHandler is invoked in ActivityStarter on ActivityException
1294                 //         or within the Activity if no ActivityException is thrown.
1295 
1296                 //release locked tokens
1297                 to.unlockTokens();
1298                 activities.remove(activityID);
1299                 modification = true;
1300             } else {
1301                 // activity still not completed or terminated
1302                 tempworkflowstatus = WorkflowStatus.STATUS_ACTIVE;
1303                 // abort active activities if workflow is to abort
1304                 if (abort) {
1305                     activity.abortActivity();
1306                 }
1307             }
1308 
1309             // set transition status
1310             TransitionStatus ts = transtionStatusTable.get(to.transition.getID());
1311             if (ts == null) {
1312                 ts = new TransitionStatus();
1313                 transtionStatusTable.put(to.transition.getID(), ts);
1314             }
1315             // a FAILED activity still leads to a RUNNING transition, as trying to recover from failure.
1316             ts.setStatus(activityStatus==Activity.Status.FAILED?Activity.Status.RUNNING:activityStatus);
1317         }
1318 
1319         // annotate transitions with transition status
1320         for (Object o : transtionStatusTable.keySet()) {
1321             String transitionID = (String) o;
1322             TransitionStatus status = transtionStatusTable.get(transitionID);
1323             if (status.isModified()) {
1324                 workflow.getTransition(transitionID).getProperties().put(Constants.PROP_TRANSITION_STATUS, status.getStatusAsString());
1325                 status.resetModified();
1326             }
1327         }
1328 
1329         // set workflow status
1330         setStatus(tempworkflowstatus);
1331         return modification;
1332     }
1333 
1334 
1335     private void updateOccurrenceSequenceProperty(TransitionOccurrence to) {
1336         if (occurrenceSequence!=null) {
1337             occurrenceSequence += " "+ to.transition.getID();
1338             workflow.getProperties().put(Constants.PROP_OCCURRENCE_SEQUENCE,occurrenceSequence);
1339         }
1340     }
1341 
1342     private void updateProbabilityProperty(TransitionOccurrence to) {
1343         DoubleDistribution pDist = to.getProbabilityDistribution();
1344         if (pDist == null) return;
1345 
1346         double oldP = Constants.PROP_WORKFLOW_PROBABILITY_DEFAULT;
1347         String propstr = workflow.getProperties().get(Constants.PROP_WORKFLOW_PROBABILITY);
1348         if (propstr != null && propstr.length() > 0) {
1349             oldP = Double.parseDouble(propstr);
1350         }
1351         double p = pDist.getMean() * oldP;
1352         workflow.getProperties().put(Constants.PROP_WORKFLOW_PROBABILITY,Double.toString(p));
1353     }
1354 
1355     /**
1356      * Process red transitions. Invoke operation mapper.
1357      *
1358      * @return <code>true</code> if operation mapper was able to refine the workflow, <code>false</code> otherwise.
1359      */
1360     private boolean processRedTransition(TransitionOccurrence to) throws WorkflowFormatException {
1361         try {
1362             return red2Yellow.processTransition(this, to.transition);
1363         } catch (OperationMapperException e) {
1364             throw new WorkflowFormatException("Exception during processing of red "+ to + ": "+ e, e);
1365         } catch (NullPointerException e) {
1366             throw new WorkflowFormatException("Exception during processing of red transition: Red 2 Yellow Operation Mapper not available! " +
1367                     "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_RED2YELLOW_CLASS+"'.");
1368         }
1369     }
1370 
1371     /**
1372      * Process yellow transitions. Invoke operation mapper.
1373      *
1374      * @return <code>true</code> if the operation mapper was able to refine the workflow, <code>false</code> otherwise.
1375      */
1376     private boolean processYellowTransition(TransitionOccurrence to) throws WorkflowFormatException {
1377         boolean modification = false;
1378         try {
1379         	// (ResourceMatcher is invoked for single transition but machtes complete workflow)
1380             modification = yellow2Blue.processTransition(this, to.transition);
1381             if (modification == true) {
1382             	// ResourceMatcher was successful, now apply HEFT prioritization
1383             	WeightAndRank wr = new WeightAndRank();
1384             	wr.prioritizeTransitions(this.getWorkflow());
1385             }
1386             return modification;
1387         } catch (OperationMapperException e) {
1388             throw new WorkflowFormatException("Exception during processing of yellow "+ to + ": "+ e, e);
1389         } catch (NullPointerException e) {
1390             throw new WorkflowFormatException("Exception during processing of yellow transition: Yellow 2 Blue Operation Mapper not available! " +
1391                     "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_YELLOW2BLUE_CLASS+"'.");
1392         }
1393     }
1394 
1395     /**
1396      * Process blue transition occurrences. Invoke operation mapper.
1397      * As side effect this method also invokes processGreenTransition(), if mapping was successful.
1398      *
1399      * @return <code>true</code> if the operation mapper was able to refine the workflow, <code>false</code> otherwise.
1400      */
1401     private boolean processBlueTransition(TransitionOccurrence to) throws WorkflowFormatException, ActivityException {
1402         boolean modification = false;
1403         try {
1404             synchronized (to.transition.getOperation()) {
1405                 if (to.transition.getOperation().get() instanceof OperationClass) {
1406 
1407                     // check for deterministic spatial co-allocation which overrides scheduling
1408                     String allocateResource = ResourceCoAllocator.blue2greenCoAllocationFromToken(to, this);
1409 
1410                     // regular scheduling if there is no pre-defined resource co-allocation
1411                     modification = allocateResource != null || blue2Green.processTransition(this, to.transition);
1412 
1413                     // if mapping from blue to green was successful, then directly process green transition
1414                     // in order to reuse the transition occurrence object.
1415                     if (to.transition.getAbstractionLevel() == Operation.GREEN) {
1416                         modification = processGreenTransition(to);
1417                     }
1418                 } else {
1419                     workflowWarnAndSuspend("The transition \"" + to.transition.getID() + "\" contains an unsupported operation description.");
1420                     suspend = true;
1421                 }
1422             }
1423 
1424             return modification;
1425         } catch (OperationMapperException e) {
1426             throw new WorkflowFormatException("Exception during processing of blue "+ to + ": "+ e, e);
1427         } catch (NullPointerException e) {
1428             throw new WorkflowFormatException("Exception during processing of blue transition: Blue 2 Green Operation Mapper not available! " +
1429                     "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_BLUE2GREEN_CLASS+"'.");
1430         }
1431     }
1432 
1433     /**
1434      * Process green transition occurrences. Execute the operation that is related to this transition and tokens.
1435      *
1436      * @param to The transition occurrence
1437      * @return <code>true</code> if the workflow changed, <code>false</code> otherwise.
1438      */
1439     private boolean processGreenTransition(TransitionOccurrence to) throws WorkflowFormatException {
1440         boolean modification = false;
1441         Activity activity;
1442 
1443         synchronized (to.transition.getOperation()) {
1444             Object oo = to.transition.getOperation().get();
1445             if (oo instanceof OperationClass) {
1446                 OperationCandidate oc = null;
1447 
1448                 // use first selected candidate
1449                 OperationCandidate[] ops = ((OperationClass) oo).getOperationCandidates();
1450                 for (OperationCandidate candidate : ops) {
1451                     if (candidate.isSelected()) {
1452                         oc = candidate;
1453                         break;
1454                     }
1455                 }
1456 
1457                 if (oc == null) throw new WorkflowFormatException("The Operation Candidate of green "+to+" is null!");
1458 
1459                 if (logger.isDebugEnabled()) {
1460                     logger.debug("processing green " + to + " with operation candidate " + oc + "...");
1461                 }
1462 
1463                 // lock input tokens
1464                 to.lockTokens();
1465 
1466                 // ToDo: reservation of output tokens. Should be displayed as "ghost" tokens?
1467 
1468                 // construct activity depending the operation type
1469                 try {
1470                     activity = constructActivity(oc.getType(), to, oc);
1471                 } catch (ActivityException e) {
1472                     throw new WorkflowFormatException("transition \"" + to.transition.getID() + "\" is related to an activity which could not be constructed: "+e,e);
1473                 }
1474 
1475                 // set activity
1476                 to.setActivity(activity);
1477 
1478                 // set workflow status to active
1479                 setStatus(WorkflowStatus.STATUS_ACTIVE);
1480 
1481                 try {
1482                     // initiate activity and put to activity table
1483                     activity.initiateActivity();
1484                     getActivityTable().put(activity.getID(), activity);
1485 
1486                     // enqueue activity (will invoke activity async.)
1487                     activity.enqueueActivity();
1488 
1489                     // reset scheduling decision if there are two or more alternatives.
1490                     if (ops.length > 1) {
1491                         for (OperationCandidate candidate : ops) {
1492                             if (candidate.isSelected()) {
1493                                 // store last selection as transition property
1494                                 GenericProperties transprops = to.transition.getProperties();
1495                                 transprops.put(Constants.PROP_TRANSITION_LAST_RESOURCE_NAME, candidate.getResourceName());
1496                                 transprops.put(Constants.PROP_TRANSITION_LAST_OPERATION_NAME, candidate.getOperationName());
1497                                 // unset selection
1498                                 candidate.setSelected(false);
1499                             }
1500                         }
1501                     }
1502                 } catch (ActivityException e) {
1503                     FaultToleranceHandler.initiateFaultToleranceOrTerminate(activity);
1504                     getActivityTable().put(activity.getID(), activity);
1505                 }
1506             } else {
1507                 throw new WorkflowFormatException("transition \"" + to.transition.getID() + "\" is related to an activity which is not supported.");
1508             }
1509         }
1510 
1511         modification = true;
1512 
1513         return modification;
1514     }
1515 
1516     /**
1517      * Process black transition occurrence. Execute transition (fire).
1518      * Token properties are gathered from all read and input tokens and added to all output tokens.
1519      *
1520      * @param to The transition occurrence to execute
1521      * @param step       The current step
1522      * @throws WorkflowFormatException
1523      */
1524     private void processBlackTransition(TransitionOccurrence to, int step) throws WorkflowFormatException {
1525         if (logger.isDebugEnabled()) {
1526             logger.debug("processing black " + to + " ...");
1527         }
1528 
1529         // evaluate XPath expressions
1530         to.evaluateXPathEdgeExpressions();
1531 
1532         // remove input tokens
1533         to.removeInputTokens();
1534 
1535         try {
1536             // update write tokens (currently only control tokens are supported as write tokens).
1537             to.updateWriteTokens();
1538             // put output tokens
1539             to.putOutputTokens();
1540         } catch (CapacityException e) {
1541             workflowErrorAndAbort("Error during processBlackTransition() "+toString()+": "+e, e);
1542         }
1543 
1544         // update probability property
1545         updateProbabilityProperty(to);
1546 
1547         // store provenance if handler is available
1548         if (provenanceHandler != null) provenanceHandler.storeProvenance(to,step);
1549 
1550         // store occurrence sequence
1551         updateOccurrenceSequenceProperty(to);
1552     }
1553 
1554     /**
1555      * Create a new identifier for decisions, unique within this workflow.
1556      *
1557      * @return The new identifier.
1558      */
1559     private String getDecisionID(Decision decision) {
1560         return Constants.PROP_DECISION_ + decision.toString().hashCode();
1561     }
1562 
1563     /**
1564      * get the workflow.
1565      *
1566      * @return The workflow
1567      */
1568     public Workflow getWorkflow() {
1569         updateWorkflowRuntimeVersion();
1570         return workflow;
1571     }
1572 
1573     public Map<String, Decision> getCurrentDecisions() {
1574         return currentDecisions;
1575     }
1576 
1577     public List<Transition> getConflictTransitions() {
1578         return conflictTransitions;
1579     }
1580 
1581     public CredentialManager getCredentialManager(Activity activity) throws LoggingException, GSSException {
1582         getGlog().logEvent(
1583                 GWESLogger.Event.GET_CREDENTIAL,
1584                 getUserID(),
1585                 this,
1586                 new String[]{"activityID","DN"},
1587                 null,
1588                 new String[]{activity.getID(),credentialManager.getDN()}
1589         );
1590         return credentialManager;
1591     }
1592 
1593     /**
1594      * Set the policy for the fault management.
1595      *
1596      * @param faultManagementPolicy The integer code of the fault management policy.
1597      * @see #FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED
1598      * @see #FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED
1599      * @see #FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED
1600      */
1601     public void setFaultManagementPolicy(int faultManagementPolicy) {
1602         this.faultManagementPolicy = faultManagementPolicy;
1603     }
1604 
1605     /**
1606      * Set the policy for the workflow persistence.
1607      *
1608      * @param workflowPersistence The integer code of the workflow persistence policy.
1609      * @see #WORKFLOW_PERSISTENCE_FALSE
1610      * @see #WORKFLOW_PERSISTENCE_TRUE
1611      */
1612     public void setWorkflowPersistence(int workflowPersistence) {
1613         this.workflowPersistence = workflowPersistence;
1614         switch (workflowPersistence) {
1615             case WORKFLOW_PERSISTENCE_FALSE:
1616                 workflow.getProperties().put(Constants.PROP_WORKFLOW_PERSISTENCE, Constants.FALSE);
1617                 break;
1618             case WORKFLOW_PERSISTENCE_TRUE:
1619                 workflow.getProperties().put(Constants.PROP_WORKFLOW_PERSISTENCE, Constants.TRUE);
1620                 break;
1621         }
1622     }
1623 
1624     /**
1625      * Set the policy for the workflow persistence.
1626      *
1627      * @param workflowPersistence The string of the workflow persistence policy.
1628      */
1629     public void setWorkflowPersistence(String workflowPersistence) throws WorkflowFormatException {
1630         if (workflowPersistence == null)  setWorkflowPersistence(WORKFLOW_PERSISTENCE_TRUE);
1631         else if (workflowPersistence.equalsIgnoreCase(Constants.TRUE)) setWorkflowPersistence(WORKFLOW_PERSISTENCE_TRUE);
1632         else if (workflowPersistence.equalsIgnoreCase(Constants.FALSE)) setWorkflowPersistence(WORKFLOW_PERSISTENCE_FALSE);
1633         else throw new WorkflowFormatException("Wrong format of workflow persistence policy: " + workflowPersistence);
1634     }
1635 
1636     /**
1637      * Overrides method of GenericWorkflowHandler and writes property to workflow document
1638      *
1639      * @param terminatedActivities
1640      */
1641     public synchronized void incrementTerminatedActivities(int terminatedActivities) {
1642         super.incrementTerminatedActivities(terminatedActivities);
1643         if (workflow != null) workflow.getProperties().put(Constants.PROP_ACTIVITIES_TERMINATED, "" + getTerminatedActivities());
1644     }
1645 
1646     /**
1647      * Overrides method of GenericWorkflowHandler and writes property to workflow document
1648      */
1649     public synchronized void incrementCompletedActivities(int completedActivities) {
1650         super.incrementCompletedActivities(completedActivities);
1651         if (workflow != null) workflow.getProperties().put(Constants.PROP_ACTIVITIES_COMPLETED, "" + getCompletedActivities());
1652     }
1653 
1654     /**
1655      * Overrides method of GenericWorkflowHandler in order to include statistics in the workflow description files as well.
1656      */
1657     protected void logStatistics() {
1658         super.logStatistics();
1659         GenericProperties workflowProps = workflow.getProperties();
1660         workflowProps.put(Constants.PROP_DURATION_UNDEFINED_MS, "" + getDurationUndefined());
1661         workflowProps.put(Constants.PROP_DURATION_INITIATED_MS, "" + getDurationInitiated());
1662         workflowProps.put(Constants.PROP_DURATION_RUNNING_MS, "" + getDurationRunning());
1663         workflowProps.put(Constants.PROP_DURATION_ACTIVE_MS, "" + getDurationActive());
1664         workflowProps.put(Constants.PROP_DURATION_SUSPENDED_MS, "" + getDurationSuspended());
1665         workflowProps.put(Constants.PROP_DURATION_TOTAL_MS, "" + getDurationTotal());
1666         if (getEndTime() > 0L) {
1667             workflowProps.put(Constants.PROP_END_TIME_MS, "" + getEndTime());
1668         }
1669         workflowStatistics.updateBranchingFactor();
1670         workflowStatistics.updateSequentialExecutionPathSize();
1671         workflowStatistics.updateSpeedupTotal();
1672         workflowStatistics.updateSpeedupActive();
1673 
1674         // only update operation statistics in database if workflow finished successfully
1675         if (getStatus() == WorkflowStatus.STATUS_COMPLETED) {
1676         	try {
1677                 workflowStatistics.updateDatabaseActivityStatistics();
1678             } catch (DatabaseException e) {
1679                 logger.warn("Could not update activity duration statistics in database\n" + e);
1680             }
1681         }
1682     }
1683 
1684     /**
1685      * Updates the runtime version number property of the workflow handled by this handler.
1686      * Should be called each time before giving workflow outside the handler.
1687      */
1688     private void updateWorkflowRuntimeVersion() {
1689         if (workflow == null) return;
1690         GenericProperties props = workflow.getProperties();
1691         String propVersion = props.get(Constants.PROP_WORKFLOW_RUNTIME_VERSION);
1692         // do nothing if propVersion is already up to date. 
1693         if (propVersion == null || !propVersion.equals(""+workflow.getVersionNumber()) ) {
1694             // the put method will increment the internal version number when writing the version to property.
1695             String newVersion = ""+workflow.getVersionNumber()+1;
1696             props.put(Constants.PROP_WORKFLOW_RUNTIME_VERSION,newVersion);
1697         }
1698     }
1699 
1700 }