1
2
3
4
5
6
7
8 package net.kwfgrid.gwes;
9
10 import net.kwfgrid.gwes.exception.*;
11 import net.kwfgrid.gwes.operationmapper.OperationMapper;
12 import net.kwfgrid.gwes.prorater.ResourceCoAllocator;
13 import net.kwfgrid.gwes.prorater.WeightAndRank;
14 import net.kwfgrid.gwes.workflowanalyzer.WorkflowStatistics;
15 import net.kwfgrid.gwes.workflowanalyzer.DoubleDistribution;
16 import net.kwfgrid.gworkflowdl.analysis.Conflict;
17 import net.kwfgrid.gworkflowdl.analysis.Decision;
18 import net.kwfgrid.gworkflowdl.analysis.WorkflowAnalyzer;
19 import net.kwfgrid.gworkflowdl.protocol.server.ServerWorkflow;
20 import net.kwfgrid.gworkflowdl.structure.*;
21 import org.apache.log4j.Logger;
22 import org.ietf.jgss.GSSCredential;
23 import org.ietf.jgss.GSSException;
24
25 import java.io.IOException;
26 import java.util.*;
27
28
29
30
31
32
33
34
35
36 public class GWorkflowDLHandler extends GenericWorkflowHandler {
37
38
39
40
41 private final static Logger logger = Logger.getLogger(GWorkflowDLHandler.class);
42
43
44
45
46
47 private ServerWorkflow workflow;
48
49
50
51
52 private CredentialManager credentialManager;
53
54
55
56
57 private static OperationMapper red2Yellow;
58
59
60
61
62 private static OperationMapper yellow2Blue;
63
64
65
66
67 private static OperationMapper blue2Green;
68
69
70
71
72 private static ProvenanceHandler provenanceHandler;
73
74
75
76
77
78 private static final Map<String,Class> activityClassMap = new HashMap<String,Class>();
79
80
81
82
83 private final ConditionChecker conditionChecker;
84
85
86
87
88 private WorkflowStatistics workflowStatistics;
89
90
91
92
93
94 private final long SLEEP_TIME_MIN;
95
96
97
98
99
100 private final long SLEEP_TIME_MAX;
101
102
103
104
105 private boolean abort = false;
106
107
108
109
110 private boolean suspend = false;
111
112
113
114
115 private final Map<String, TransitionStatus> transtionStatusTable;
116
117
118
119
120 private Map<String, Decision> currentDecisions;
121
122
123
124
125 private final List<Transition> conflictTransitions;
126
127
128
129
130 private final ArrayList<Transition> enabledTimeDependendFalseTransitions;
131
132
133
134
135
136
137
138
139
140
141 private int faultManagementPolicy;
142
143
144
145
146 public static final int FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED = 0;
147
148
149
150
151 public static final int FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED = 1;
152
153
154
155
156 public static final int FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED = 2;
157
158
159
160
161
162
163
164
165
166 private int workflowPersistence;
167
168
169
170
171 public static final int WORKFLOW_PERSISTENCE_FALSE = 0;
172
173
174
175
176 public static final int WORKFLOW_PERSISTENCE_TRUE = 1;
177
178
179
180
181
182
183 private String occurrenceSequence = null;
184
185
186
187
188
189
190
191
192
193 public GWorkflowDLHandler(GWESEngine engine, String originalWorkflowDescriptionType, String workflowDescription, String userIdCredential) throws WorkflowSecurityException, LoggingException {
194 super(engine, originalWorkflowDescriptionType, workflowDescription, CredentialManager.extractUserId(userIdCredential));
195 transtionStatusTable = new HashMap<String, TransitionStatus>();
196 conflictTransitions = new ArrayList<Transition>();
197 currentDecisions = new HashMap<String, Decision>();
198 enabledTimeDependendFalseTransitions = new ArrayList<Transition>();
199 conditionChecker = new ConditionChecker();
200
201
202 try {
203 if (red2Yellow == null)
204 red2Yellow = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_RED2YELLOW_CLASS);
205 } catch (OperationMapperException e) {
206 logger.warn("exception during construction of mapper red -> yellow: " + e);
207 }
208
209
210 try {
211 if (yellow2Blue == null)
212 yellow2Blue = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_YELLOW2BLUE_CLASS);
213 } catch (OperationMapperException e) {
214 logger.warn("exception during construction of mapper yellow -> blue: " + e);
215 }
216
217
218 try {
219 if (blue2Green == null)
220 blue2Green = constructOperationMapper(Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_BLUE2GREEN_CLASS);
221 } catch (OperationMapperException e) {
222 logger.warn("exception during construction of mapper blue -> green: " + e);
223 }
224
225
226 try {
227 if (provenanceHandler == null) {
228 provenanceHandler = constructProvenanceHandler(Constants.PROP_SYSTEM_GWES_PROVENANCE_HANDLER_CLASS);
229 }
230 } catch (ProvenanceHandlerException e) {
231 logger.debug("exception during construction of provenance handler: "+e);
232 }
233
234
235 if (Boolean.getBoolean(Constants.PROP_SYSTEM_GWES_CREDENTIAL_WORKFLOW_CERTIFICATE)
236 && CredentialManager.extractCredential(userIdCredential) != null) {
237 try {
238 credentialManager = new CredentialManager(userIdCredential);
239 } catch (GSSException e) {
240 throw new WorkflowSecurityException("Could not create credential manager from userIdCredential: "+e,e);
241 }
242 } else if (Boolean.getBoolean(Constants.PROP_SYSTEM_GWES_CREDENTIAL_DEFAULT_CERTIFICATE)) {
243 try {
244 credentialManager = new CredentialManager();
245 } catch (GSSException e) {
246 throw new WorkflowSecurityException("Could not load default Grid certificate: "+e,e);
247 } catch (IOException e) {
248 throw new WorkflowSecurityException("Could not load default Grid certificate: "+e,e);
249 }
250 } else {
251 logger.info("Not using any credential manager.");
252 }
253
254 SLEEP_TIME_MIN = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_SLEEPTIME_MIN, "20"));
255 SLEEP_TIME_MAX = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_SLEEPTIME_MAX, "60000"));
256 }
257
258 private static OperationMapper constructOperationMapper(String propertyclassname) throws OperationMapperException {
259 String classname = System.getProperty(propertyclassname);
260 if (classname == null)
261 throw new OperationMapperException("Property \"" + propertyclassname + "\" not available. Please set this property in gwes.properties.");
262 if (classname.length() == 0)
263 throw new OperationMapperException("Property \"" + propertyclassname + "\" has no value");
264 OperationMapper mapper;
265 try {
266 Class c = Class.forName(classname);
267 mapper = (OperationMapper) c.getConstructor().newInstance();
268 } catch (Exception e) {
269 throw new OperationMapperException("Exception during constructing operation mapper with classname '"+classname+"':" + e, e);
270 }
271 return mapper;
272 }
273
274 private static ProvenanceHandler constructProvenanceHandler(String propertyclassname) throws ProvenanceHandlerException {
275 String classname = System.getProperty(propertyclassname);
276 if (classname == null)
277 throw new ProvenanceHandlerException("Property \"" + propertyclassname + "\" not available. Please set this property in gwes.properties.");
278 if (classname.length() == 0)
279 throw new ProvenanceHandlerException("Property \"" + propertyclassname + "\" has no value");
280 ProvenanceHandler handler;
281 try {
282 Class c = Class.forName(classname);
283 handler = (ProvenanceHandler) c.getConstructor().newInstance();
284 } catch (Exception e) {
285 throw new ProvenanceHandlerException("Exception during constructing provenance handler with classname '"+classname+"':" + e, e);
286 }
287 return handler;
288 }
289
290 private Activity constructActivity(String type, TransitionOccurrence to, OperationCandidate oc) throws ActivityException {
291 Activity activity = null;
292 String classname = null;
293 Class c;
294 try {
295 Throwable error = null;
296 try {
297
298 c = activityClassMap.get(type);
299 if (c==null) {
300 String propertyclassname = Constants.PROP_SYSTEM_GWES_ACTIVITY_+type+Constants.PROP_SYSTEM_GWES_ACTIVITY__CLASS;
301 classname = System.getProperty(propertyclassname);
302 if (classname == null) throw new ActivityException("Could not map operation type \""+type+"\" onto matching Activity implementation. Please define property \"" + propertyclassname + "\" in gwes.properties.");
303 c = Class.forName(classname);
304
305 activityClassMap.put(type,c);
306 } else {
307 classname = c.getName();
308 }
309
310 activity = (Activity) c
311 .getConstructor(GenericWorkflowHandler.class, TransitionOccurrence.class, OperationCandidate.class)
312 .newInstance(this,to,oc);
313 if (activity == null) throw new ActivityException("Activity is null: Exception during constructing activity with classname '"+classname+"'");
314 } catch (ActivityException ae) {
315 error = ae; throw ae;
316 } catch (ClassNotFoundException e) {
317 ActivityException ae = new ActivityException("Exception during constructing activity with classname '"+classname+"': " + e, e);
318 error = ae; throw ae;
319 } catch (Throwable e) {
320 ActivityException ae = new ActivityException("Unexpected Exception during constructing activity with classname '"+classname+"': " + e, e);
321 error = ae; throw ae;
322 } finally {
323 if (error == null) {
324 if (getGlog().i()) getGlog().logEvent(
325 GWESLogger.Event.ACTIVITY_CONSTRUCT,
326 getUserID(),
327 this,
328 new String[]{"activityID","class","operationType"},
329 null,
330 new String[]{activity.getID(),classname,type});
331 } else {
332 getGlog().logEvent(
333 GWESLogger.Event.ACTIVITY_CONSTRUCT,
334 GWESLogger.EventOutcome.ERROR,
335 getUserID(),
336 this,
337 new String[]{"activityID","class","operationType"},
338 null,
339 new String[]{(activity != null ? activity.getID() : null),classname,type},
340 error);
341 }
342 }
343 } catch (LoggingException e) {
344 logger.error("exception:\n" + e, e);
345 throw new ActivityException("Logging exception during constructActivity(): "+e,e);
346 }
347
348 return activity;
349 }
350
351
352
353
354
355
356
357
358 public synchronized void initiateWorkflow() throws WorkflowFormatException, StateTransitionException {
359 logger.debug("initiateWorkflow ...");
360
361
362 if (getStatus() != WorkflowStatus.STATUS_UNDEFINED) {
363 logger.error("invalid status " + getStatusAsString() + " for initiating the workflow");
364 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for initiating the workflow");
365 }
366
367
368 try {
369 workflow = (ServerWorkflow) Factory.newWorkflow(getID());
370
371 workflow.addStructureListener(WorkflowModificationListener.getInstance());
372
373 workflow.fromXML(super.getWorkflowDescription());
374 if (logger.isDebugEnabled()) {
375 logger.debug("workflow description: " + workflow.getDescription());
376 }
377 } catch (CapacityException e) {
378 logger.error("exception: " + e, e);
379 throw new WorkflowFormatException("Wrong number of tokens: " + e, e);
380 } catch (IllegalArgumentException e) {
381 logger.error("exception: " + e, e);
382 throw new WorkflowFormatException("Wrong workflow ID. Must begin with a letter or underscore: " + e, e);
383 }
384
385
386 if (!getOriginalWorkflowDescriptionType().equals(Constants.WORKFLOW_DESCRIPTION_TYPE_DEFAULT)) {
387 getWorkflow().getProperties().put(Constants.PROP_WORKFLOW_CONVERTED_FROM,getOriginalWorkflowDescriptionType());
388 }
389
390
391 workflowStatistics = new WorkflowStatistics(workflow);
392
393
394 analyzeWorkflow();
395
396
397 String faultManagementString = workflow.getProperties().get(Constants.PROP_FAULT_MANAGEMENT_POLICY);
398 if (faultManagementString != null) {
399 if (faultManagementString.equalsIgnoreCase("abortonactivityterminated"))
400 setFaultManagementPolicy(FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED);
401 else if (faultManagementString.equalsIgnoreCase("continueonactivityterminated"))
402 setFaultManagementPolicy(FAULT_MANAGEMENT_CONTINUE_ON_ACTIVITY_TERMINATED);
403 else if (faultManagementString.equalsIgnoreCase("suspendonactivityterminated"))
404 setFaultManagementPolicy(FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED);
405
406 else {
407 logger.warn("The fault management policy \"" + faultManagementString + "\" is not supported. Falling back to \"AbortOnActivityTerminated\".");
408 workflow.getProperties().put(Constants.PROP_FAULT_MANAGEMENT_POLICY, "AbortOnActivityTerminated");
409 workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(),
410 "The fault management policy \"" + faultManagementString + "\" is not supported. Falling back to \"AbortOnActivityTerminated\".");
411 }
412 } else {
413
414 workflow.getProperties().put(Constants.PROP_FAULT_MANAGEMENT_POLICY, "AbortOnActivityTerminated");
415 }
416
417
418 String attemptsStr = workflow.getProperties().get(Constants.PROP_ACTIVITY_MAXATTEMPTS);
419 if (attemptsStr == null) {
420
421 String dfaString = workflow.getProperties().get(Constants.PROP_REDISTRIBUTION_OF_FAILED_ACTIVITIES);
422 int attempts = 1;
423
424 if (dfaString == null || dfaString.equalsIgnoreCase(Constants.TRUE)) {
425 attemptsStr = System.getProperty(Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS);
426 if (attemptsStr != null) {
427 try {
428 attempts = Integer.parseInt(attemptsStr);
429 } catch (NumberFormatException e) {
430 logger.warn("The property value of "+Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS+" has wrong format: "+e+"\nSetting maxattempts to 1");
431 }
432 }
433 }
434
435 workflow.getProperties().put(Constants.PROP_ACTIVITY_MAXATTEMPTS,""+attempts);
436 }
437
438
439
440 try {
441 setWorkflowPersistence(workflow.getProperties().get(Constants.PROP_WORKFLOW_PERSISTENCE));
442 } catch (Exception e) {
443 logger.warn(e);
444 setWorkflowPersistence(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_PERSISTENCE, Constants.TRUE));
445 }
446
447
448 if (getUserID() != null) {
449 workflow.getProperties().put(Constants.PROP_USER_ID, getUserID());
450 }
451
452
453 if (credentialManager != null) {
454 try {
455 GSSCredential credential = credentialManager.getCredential();
456 if (credential != null) {
457 workflow.getProperties().put(Constants.PROP_DN, credential.getName().toString());
458 }
459 } catch (GSSException e) {
460 logger.warn("WARNING: Could not load user credential: " + e);
461 } catch (IOException e) {
462 logger.warn("WARNING: Could not load user credential: " + e);
463 }
464 }
465
466
467 occurrenceSequence = workflow.getProperties().get(Constants.PROP_OCCURRENCE_SEQUENCE);
468
469
470 setStatus(WorkflowStatus.STATUS_INITIATED);
471 logger.debug("initiateWorkflow ... done");
472
473
474 workflow.getProperties().put(Constants.PROP_BIRTHDAY_MS, "" + getBirthday());
475
476
477 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
478 try {
479 storeWorkflow();
480 storeDataTokens();
481 } catch (DatabaseException e) {
482 logger.warn("WARNING Workflow not stored due to database problems: " + e);
483 }
484 }
485 }
486
487
488
489
490
491 public synchronized void startWorkflow() throws StateTransitionException {
492 logger.debug("startWorkflow() ...");
493
494
495 if (getStatus() != WorkflowStatus.STATUS_INITIATED) {
496 logger.error("invalid status " + getStatusAsString() + " for starting the workflow");
497 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for starting the workflow");
498 }
499
500
501 if (provenanceHandler != null) provenanceHandler.initialize(workflow);
502
503
504 if (!isAlive()) {
505 start();
506 setStatus(WorkflowStatus.STATUS_RUNNING);
507 }
508
509 logger.debug("startWorkflow() ... done");
510 }
511
512 public synchronized void suspendWorkflow() throws StateTransitionException {
513
514 if (getStatus() == WorkflowStatus.STATUS_RUNNING || getStatus() == WorkflowStatus.STATUS_ACTIVE) {
515 suspend = true;
516 try {
517 waitForStatusChangeTo(WorkflowStatus.STATUS_SUSPENDED);
518 } catch (InterruptedException e) {
519 logger.error("exception: " + e, e);
520 }
521 } else {
522 logger.error("invalid status " + getStatusAsString() + " for suspending the workflow");
523 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for suspending the workflow");
524 }
525 }
526
527 public synchronized void suspendWorkflowAsync() throws StateTransitionException {
528
529 if (getStatus() == WorkflowStatus.STATUS_RUNNING || getStatus() == WorkflowStatus.STATUS_ACTIVE) {
530 suspend = true;
531 } else {
532 logger.error("invalid status " + getStatusAsString() + " for suspending the workflow");
533 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for suspending the workflow");
534 }
535 }
536
537 public synchronized void resumeWorkflow() throws StateTransitionException {
538 if (getStatus() == WorkflowStatus.STATUS_SUSPENDED && isAlive()) {
539 suspend = false;
540 setStatus(WorkflowStatus.STATUS_RUNNING);
541 } else {
542 logger.error("invalid status " + getStatusAsString() + " for resuming the workflow");
543 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for resuming the workflow");
544 }
545 }
546
547 public synchronized void abortWorkflow() throws StateTransitionException {
548
549
550 if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
551 logger.error("invalid status " + getStatusAsString() + " for aborting the workflow");
552 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for aborting the workflow");
553 }
554
555 abort = true;
556
557
558 if (getStatus() == WorkflowStatus.STATUS_INITIATED || getStatus() == WorkflowStatus.STATUS_SUSPENDED || getStatus() == WorkflowStatus.STATUS_UNDEFINED) {
559 setStatus(WorkflowStatus.STATUS_TERMINATED);
560 }
561
562
563 else {
564 try {
565 waitForStatusChangeToCompletedOrTerminated();
566 } catch (InterruptedException e) {
567 logger.error("exception: " + e, e);
568 }
569 }
570 }
571
572 public synchronized void abortWorkflowAsync() throws StateTransitionException {
573
574 if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
575 logger.error("invalid status " + getStatusAsString() + " for aborting the workflow");
576 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for aborting the workflow");
577 }
578
579 abort = true;
580
581
582 if (getStatus() == WorkflowStatus.STATUS_INITIATED || getStatus() == WorkflowStatus.STATUS_SUSPENDED || getStatus() == WorkflowStatus.STATUS_UNDEFINED) {
583 setStatus(WorkflowStatus.STATUS_TERMINATED);
584 }
585 }
586
587
588
589
590
591
592 public String storeWorkflow() throws DatabaseException {
593 updateWorkflowRuntimeVersion();
594 return XMLDB.getInstance().storeWorkflow(workflow);
595 }
596
597
598
599
600
601
602
603 public String[] getData(String placeID) {
604 Place place = workflow.getPlace(placeID);
605 if (place == null) {
606 logger.warn("getData(" + placeID + "): place is not available!");
607 return null;
608 }
609 Token[] tokens = place.getTokens();
610 String[] ret = new String[tokens.length];
611 for (int i = 0; i < tokens.length; i++) {
612
613 Data data = tokens[i].getData();
614 if (data != null) {
615 ret[i] = tokens[i].getData().toXML();
616 } else {
617
618 Boolean control = tokens[i].getControl();
619 if (control != null) {
620 ret[i] = (control ? "<control>true</control>" : "<control>false</control>");
621 } else ret[i] = null;
622 }
623 }
624 return ret;
625 }
626
627
628
629
630
631
632 public String getDescription() {
633 return workflow.getDescription();
634 }
635
636
637
638
639
640
641 public void setDescription(String description) {
642 workflow.setDescription(description);
643 }
644
645
646
647
648
649
650
651 public String getWorkflowDescription() {
652 updateWorkflowRuntimeVersion();
653 try {
654 return JdomString.workflow2string(workflow);
655 } catch (IOException e) {
656 logger.error("exception: " + e, e);
657 return null;
658 }
659 }
660
661
662
663
664
665
666
667
668 public synchronized void setWorkflowDescription(String workflowDescription) throws WorkflowFormatException, StateTransitionException {
669
670 if (getStatus() == WorkflowStatus.STATUS_COMPLETED || getStatus() == WorkflowStatus.STATUS_TERMINATED) {
671 throw new StateTransitionException("Invalid status " + getStatusAsString() + " for setting the workflow description."
672 + " Should NOT have final status COMPLETED or TERMINATED.");
673 }
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
693 try {
694 storeWorkflow();
695 storeDataTokens();
696 } catch (DatabaseException e) {
697 logger.warn("WARNING Workflow not stored due to database problems: " + e);
698 }
699 }
700
701
702
703 try {
704 workflow.fromXML(workflowDescription);
705 } catch (CapacityException e) {
706 logger.error("exception: " + e, e);
707 throw new WorkflowFormatException("Wrong number of tokens: " + e, e);
708 }
709 super.setWorkflowDescription(workflowDescription);
710 analyzeWorkflow();
711
712
713 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
714 try {
715 storeWorkflow();
716 storeDataTokens();
717 } catch (DatabaseException e) {
718 logger.warn("WARNING Workflow not stored due to database problems: " + e);
719 }
720 }
721
722
723
724
725
726
727
728
729 }
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744 protected synchronized void setStatus(int status) {
745 if (status != WorkflowStatus.STATUS_UNDEFINED) {
746 workflow.getProperties().put(Constants.PROP_STATUS, WorkflowStatus.getStatusAsString(status));
747 }
748 super.setStatus(status);
749 }
750
751
752
753
754
755
756 public void run() {
757 logger.debug("run() ...");
758
759 int step = 0;
760 boolean modification = true;
761 long sleepTime = SLEEP_TIME_MIN;
762
763 try {
764
765
766 List<Transition> enabledTransitions = new ArrayList<Transition>();
767 enabledTransitions.addAll(Arrays.asList(workflow.getEnabledTransitions()));
768 if (enabledTransitions.size() == 0) {
769 logger.warn("workflow '" + getID() + "' does not contain any enabled transitions!");
770 }
771
772
773 while ((!abort && enabledTransitions.size() > 0) || getStatus() == WorkflowStatus.STATUS_ACTIVE) {
774
775 if (modification) {
776 if (logger.isDebugEnabled()) {
777 logger.debug(new StringBuffer()
778 .append("--- step ")
779 .append(step)
780 .append(" (")
781 .append(WorkflowStatus.getStatusAsString(getStatus()))
782 .append(") --- ")
783 .append(enabledTransitions.size())
784 .append(" enabled transition(s)")
785 .toString());
786 }
787 modification = false;
788 }
789
790
791 TransitionOccurrence to = selectTransitionOccurrence(enabledTransitions);
792
793
794 if (!abort && !suspend && to != null) {
795 GenericProperties transprops = to.transition.getProperties();
796 String breakstring = transprops.get(Constants.PROP_TRANSITION_BREAKPOINT);
797 if (breakstring != null) {
798
799 if (breakstring.equals("REACHED")) {
800 logger.info("released breakpoint at transition " + to.transition.getID());
801 transprops.put(Constants.PROP_TRANSITION_BREAKPOINT, "RELEASED");
802 } else {
803 logger.info("reached breakpoint at transition " + to.transition.getID());
804 transprops.put(Constants.PROP_TRANSITION_BREAKPOINT, "REACHED");
805 suspend = true;
806 }
807 }
808 }
809
810
811 if (!abort && !suspend && to != null) {
812 if (logger.isDebugEnabled())
813 logger.debug("--- step " + step + " --- " + "processing transition occurrence " + to.getID());
814
815
816 if (to.transition.getAbstractionLevel() == Operation.BLACK) {
817 processBlackTransition(to, step);
818 modification = true;
819 }
820
821
822 else if (to.transition.getAbstractionLevel() == Operation.GREEN) {
823 if (processGreenTransition(to)) modification = true;
824 }
825
826
827 else if (to.transition.getAbstractionLevel() == Operation.BLUE) {
828 if (processBlueTransition(to)) modification = true;
829 }
830
831
832 else if (to.transition.getAbstractionLevel() == Operation.YELLOW) {
833 if (processYellowTransition(to)) modification = true;
834 }
835
836
837 else if (to.transition.getAbstractionLevel() == Operation.RED) {
838 if (processRedTransition(to)) modification = true;
839 }
840
841
842 else {
843 workflowErrorAndAbort("Unsupported abstraction level.");
844 }
845 }
846
847
848
849 if (checkActivityStatus(getActivityTable(), step)) modification = true;
850
851
852 if (getStatus() == WorkflowStatus.STATUS_RUNNING && !modification && to == null && enabledTimeDependendFalseTransitions.size() == 0) {
853 modification = true;
854 workflowWarnAndSuspend("Workflow suspended because all static conditions of all enabled transitions are false, or because of unresolved decision (conflict)!");
855 }
856
857
858 if (suspend && getStatus() == WorkflowStatus.STATUS_RUNNING) {
859 setStatus(WorkflowStatus.STATUS_SUSPENDED);
860 modification = true;
861 waitForStatusChangeFrom(WorkflowStatus.STATUS_SUSPENDED);
862 }
863
864
865 if (!modification && !abort) {
866 try {
867 if (logger.isDebugEnabled()) {
868 logger.debug("Sleeping " + sleepTime + "ms ...");
869 }
870 Thread.sleep(sleepTime);
871
872 sleepTime *= 1.5;
873 if (sleepTime > SLEEP_TIME_MAX) sleepTime = SLEEP_TIME_MAX;
874 } catch (InterruptedException ie) {
875 interrupt();
876 }
877 }
878
879 else {
880 sleepTime = SLEEP_TIME_MIN;
881 }
882
883
884 if (!abort) {
885 enabledTransitions.clear();
886 enabledTransitions.addAll(Arrays.asList(workflow.getEnabledTransitions()));
887 }
888
889 if (modification) {
890 step++;
891 }
892 }
893 } catch (Exception e) {
894 workflowErrorAndAbort("Unexpected error", e);
895 }
896
897
898 setStatus((abort) ? WorkflowStatus.STATUS_TERMINATED : WorkflowStatus.STATUS_COMPLETED);
899 if (logger.isDebugEnabled())
900 logger.debug(new StringBuffer()
901 .append("--- step ")
902 .append(step)
903 .append(" (")
904 .append(WorkflowStatus.getStatusAsString(getStatus()))
905 .append(") --- ")
906 .toString());
907
908
909 if (logger.isDebugEnabled() && occurrenceSequence!=null) {
910 logger.debug("Occurrence sequence: ["+occurrenceSequence+">");
911 }
912
913
914 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
915 try {
916 storeWorkflow();
917 } catch (DatabaseException e) {
918 logger.warn("WARNING: " + e);
919 }
920 }
921
922 }
923
924 public void workflowErrorAndAbort(String error) {
925 logger.error(error);
926 if (workflow != null) workflow.getProperties().put(Constants.PROP_ERROR_ + createNewErrorID(), error);
927 abort = true;
928 }
929
930 public void workflowErrorAndAbort(String error, Throwable e) {
931 logger.error(error + ": " + e, e);
932 if (workflow != null) workflow.getProperties().put(Constants.PROP_ERROR_ + createNewErrorID(), error + ": " + e);
933 abort = true;
934 }
935
936 public void workflowWarn(String warning) {
937 logger.warn(warning);
938 if (workflow != null) workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(), warning);
939 }
940
941 public void workflowWarnAndSuspend(String warning) {
942 logger.warn(warning);
943 if (workflow != null) workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(), warning);
944 suspend = true;
945 }
946
947
948
949
950
951
952
953
954
955
956
957
958
959 private TransitionOccurrence selectTransitionOccurrence(List<Transition> enabledTransitions) throws WorkflowFormatException {
960 enabledTimeDependendFalseTransitions.clear();
961
962 if (!abort && !suspend) {
963
964 Collections.sort(enabledTransitions, Collections.reverseOrder(new TransitionPriorityComparator()));
965
966
967 updateConflictTransitions();
968
969
970 enabledTransitionLoop:
971 for (Transition checktransition : enabledTransitions) {
972
973 TransitionOccurrence to = new TransitionOccurrence(checktransition);
974
975
976 if (conflictTransitions.contains(checktransition)) {
977
978
979
980 if (checktransition.getProperties().get(Constants.PROP_TRANSITION_PRE_SELECTED)!=null) {
981 logger.debug("Found pre selected transition occurrence "+to.toString());
982 checktransition.getProperties().remove(Constants.PROP_TRANSITION_PRE_SELECTED);
983 return to;
984 }
985
986
987 int condition = to.checkConditions(conditionChecker);
988 if ((condition & ConditionChecker.CONDITION_TRUE) != 0) {
989
990
991
992
993
994 List<Decision> decisions = new ArrayList<Decision>();
995 GenericProperties props = checktransition.getProperties();
996 for (Property prop : props.getProperties()) {
997 if (prop.getKey().startsWith(Constants.PROP_DECISION_)) {
998 decisions.add(currentDecisions.get(prop.getKey()));
999 }
1000 }
1001
1002
1003 int thisPriority = TransitionPriorityComparator.getPriority(checktransition);
1004
1005
1006
1007 int priorityComparison = 1;
1008 decisionLoop:
1009 for (Decision decision : decisions) {
1010 for (Transition t : decision.transitions) {
1011
1012 if (t == checktransition) continue;
1013 int otherPriority = TransitionPriorityComparator.getPriority(t);
1014 if (otherPriority > thisPriority) {
1015 TransitionOccurrence testTO = new TransitionOccurrence(t);
1016 if ( (testTO.checkConditions(conditionChecker) & ConditionChecker.CONDITION_TRUE) != 0) {
1017
1018 priorityComparison = -1;
1019 break decisionLoop;
1020 }
1021 } else if (otherPriority == thisPriority) {
1022 TransitionOccurrence testTO = new TransitionOccurrence(t);
1023 if ( (testTO.checkConditions(conditionChecker) & ConditionChecker.CONDITION_TRUE) != 0) {
1024
1025 priorityComparison = 0;
1026 }
1027 }
1028 }
1029 }
1030
1031 switch (priorityComparison) {
1032
1033 case 1: return to;
1034
1035
1036 case 0: continue enabledTransitionLoop;
1037
1038
1039 case -1: continue enabledTransitionLoop;
1040 }
1041 } else if ((condition & ConditionChecker.CONDITION_FALSE_DYNAMIC) != 0) {
1042 enabledTimeDependendFalseTransitions.add(checktransition);
1043 }
1044 } else {
1045
1046 int condition = to.checkConditions(conditionChecker);
1047 if ((condition & ConditionChecker.CONDITION_TRUE) != 0) {
1048 return to;
1049 } else if ((condition & ConditionChecker.CONDITION_FALSE_DYNAMIC) != 0) {
1050 enabledTimeDependendFalseTransitions.add(checktransition);
1051 }
1052 }
1053 }
1054 }
1055
1056 return null;
1057 }
1058
1059
1060
1061
1062
1063 private void updateConflictTransitions() {
1064 conflictTransitions.clear();
1065
1066
1067 Map<String, Decision> newDecisions = new HashMap<String, Decision>();
1068
1069 Decision[] decisions = Conflict.getDecisions(workflow);
1070 for (Decision decision : decisions) {
1071
1072 if (decision.type == Decision.PUT_CHOICE) continue;
1073
1074
1075
1076
1077
1078
1079 String decisionID = getDecisionID(decision);
1080
1081
1082 for (Transition trans : decision.transitions) {
1083 if (!conflictTransitions.contains(trans)) {
1084 conflictTransitions.add(trans);
1085 }
1086 }
1087
1088
1089 newDecisions.put(decisionID, decision);
1090
1091
1092 if (!currentDecisions.containsKey(decisionID)) {
1093 String decisionType = decision.type2String();
1094
1095 for (Transition tran : decision.transitions) {
1096 tran.getProperties().put(decisionID, decisionType);
1097 }
1098
1099 decision.place.getProperties().put(decisionID, decisionType);
1100
1101 workflow.getProperties().put(decisionID, decisionType);
1102 }
1103 }
1104
1105
1106 for (String decisionID : currentDecisions.keySet()) {
1107 if (!newDecisions.containsKey(decisionID)) {
1108 Decision decision = currentDecisions.get(decisionID);
1109
1110 workflow.getProperties().remove(decisionID);
1111 decision.place.getProperties().remove(decisionID);
1112 for (Transition transition : decision.transitions) {
1113 transition.getProperties().remove(decisionID);
1114 }
1115 }
1116 }
1117
1118 currentDecisions = newDecisions;
1119 }
1120
1121
1122
1123
1124 private void analyzeWorkflow() {
1125 String workflowAnalysis = workflow.getProperties().get(Constants.PROP_WORKFLOW_ANALYSIS);
1126 if (workflowAnalysis == null) workflowAnalysis = System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_ANALYSIS,"karp-miller");
1127 if (workflowAnalysis.equalsIgnoreCase("karp-miller") || workflowAnalysis.equalsIgnoreCase(Constants.TRUE)) {
1128 workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,"karp-miller");
1129 karpMillerAnalysis();
1130 } else if (workflowAnalysis.equalsIgnoreCase(Constants.FALSE)) {
1131 workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,Constants.FALSE);
1132 } else {
1133 logger.warn("The workflow analysis method \"" + workflowAnalysis+ "\" is not supported. Falling back to \"karp-miller\".");
1134 workflow.getProperties().put(Constants.PROP_WORKFLOW_ANALYSIS,"karp-miller");
1135 workflow.getProperties().put(Constants.PROP_WARN_ + createNewErrorID(),
1136 "The workflow analysis method \"" + workflowAnalysis+ "\" is not supported. Falling back to \"karp-miller\".");
1137 karpMillerAnalysis();
1138 }
1139 }
1140
1141 private void karpMillerAnalysis() {
1142 logger.debug("Analysing workflow using Karp-Miller tree method...");
1143
1144 try {
1145 WorkflowAnalyzer analyzer = new WorkflowAnalyzer(workflow);
1146 if (logger.isDebugEnabled()) {
1147 logger.debug("workflow analysis:\n" + getID() + "\n" + analyzer.analysis2string());
1148 }
1149
1150
1151 int complexity = analyzer.getComplexity();
1152 workflow.getProperties().put(Constants.PROP_COMPLEXITY, "" + complexity);
1153 boolean isunbounded = analyzer.isUnbounded();
1154 workflow.getProperties().put(Constants.PROP_IS_UNBOUNDED, "" + isunbounded);
1155
1156
1157 Transition[] transitions = workflow.getTransitions();
1158 for (Transition transition : transitions) {
1159 boolean isquasylive = analyzer.isQuasiLive(transition);
1160 transition.getProperties().put(Constants.PROP_TRANSITION_IS_QUASI_LIVE, "" + isquasylive);
1161 }
1162
1163
1164 Place[] places = workflow.getPlaces();
1165 for (Place place : places) {
1166 boolean isquasylive = analyzer.isQuasiLive(place);
1167 boolean isfinallymarked = analyzer.isFinallyMarked(place);
1168 place.getProperties().put(Constants.PROP_PLACE_IS_QUASI_LIVE, "" + isquasylive);
1169 place.getProperties().put(Constants.PROP_PLACE_IS_FINALLY_MARKED, "" + isfinallymarked);
1170 }
1171
1172 logger.debug("Analysing workflow using Karp-Miller tree method ... done.");
1173 } catch (Exception e) {
1174 logger.error("Could not analyze workflow using Karp-Miller tree method: " + e, e);
1175 }
1176 }
1177
1178
1179
1180
1181 private void storeDataTokens() throws DatabaseException {
1182 logger.debug("storing tokens ...");
1183 for (Place place : workflow.getPlaces()) {
1184 if (place.getTokens() != null) {
1185 for (Token token : place.getTokens()) {
1186 if (token.getData() != null) {
1187 XMLDB.getInstance().storeTokenData(workflow.getID(), place.getID(), token, place.getOwls());
1188 }
1189 }
1190 }
1191 }
1192 }
1193
1194
1195
1196
1197
1198
1199
1200
1201 private boolean checkActivityStatus(ActivityTable activities, int step) throws ActivityException {
1202 boolean modification = false;
1203
1204 Vector<String> activityIDVector = new Vector<String>(activities.keySet());
1205 Collections.sort(activityIDVector);
1206
1207 int tempworkflowstatus = WorkflowStatus.STATUS_RUNNING;
1208
1209 for (String activityID : activityIDVector) {
1210 Activity activity = activities.get(activityID);
1211 TransitionOccurrence to = activity.getTransitionOccurrence();
1212 Activity.Status activityStatus = activity.getStatus();
1213 if (logger.isDebugEnabled())
1214 logger.debug("--- step " + step + " --- " + "activity#" + activityID + "=" + Activity.getStatusAsString(activityStatus));
1215
1216
1217 if (activity.isFinalStatus(activityStatus)) {
1218
1219
1220 String faultMessage = activity.getFaultMessage();
1221 if (faultMessage != null) {
1222 workflowWarn("Fault in activity " + activity + ": " + faultMessage);
1223 }
1224
1225
1226 to.generateResourceAllocationGroupPropertyFromActivity();
1227
1228
1229 to.removeOutputReservationTokens();
1230
1231 try {
1232
1233 to.evaluateXPathEdgeExpressions();
1234
1235
1236 to.removeInputTokens();
1237
1238 to.updateWriteTokens();
1239
1240 to.putOutputTokens();
1241
1242 updateProbabilityProperty(to);
1243
1244 if (provenanceHandler != null) provenanceHandler.storeProvenance(to,step);
1245 } catch (CapacityException e) {
1246 workflowErrorAndAbort("error during checkActivityStatus()", e);
1247 } catch (WorkflowFormatException e) {
1248 workflowErrorAndAbort("error during checkActivityStatus()", e);
1249 }
1250
1251
1252 if (workflowPersistence == WORKFLOW_PERSISTENCE_TRUE) {
1253 try {
1254 for (TokenParameter tp : to.tokens) {
1255 switch (tp.scope) {
1256 case OUTPUT:
1257 if (tp.token != null) {
1258 XMLDB.getInstance().storeTokenData(getID(), tp.edge.getPlaceID(), tp.token, tp.edge.getPlace().getOwls());
1259 }
1260 break;
1261 }
1262 }
1263 } catch (DatabaseException e) {
1264 logger.warn("exception: " + e);
1265 }
1266 }
1267
1268 modification = true;
1269
1270
1271 updateOccurrenceSequenceProperty(to);
1272
1273
1274 workflowStatistics.updateActivityStatistics(activity, to.transition);
1275
1276
1277 activities.remove(activityID);
1278
1279
1280 if (activityStatus == Activity.Status.TERMINATED) {
1281 switch (faultManagementPolicy) {
1282 case FAULT_MANAGEMENT_ABORT_ON_ACTIVITY_TERMINATED:
1283 abort = true;
1284 break;
1285 case FAULT_MANAGEMENT_SUSPEND_ON_ACTIVITY_TERMINATED:
1286 suspend = true;
1287 break;
1288 }
1289 }
1290 } else if (activityStatus == Activity.Status.FAILED) {
1291 tempworkflowstatus = WorkflowStatus.STATUS_ACTIVE;
1292
1293
1294
1295
1296
1297 to.unlockTokens();
1298 activities.remove(activityID);
1299 modification = true;
1300 } else {
1301
1302 tempworkflowstatus = WorkflowStatus.STATUS_ACTIVE;
1303
1304 if (abort) {
1305 activity.abortActivity();
1306 }
1307 }
1308
1309
1310 TransitionStatus ts = transtionStatusTable.get(to.transition.getID());
1311 if (ts == null) {
1312 ts = new TransitionStatus();
1313 transtionStatusTable.put(to.transition.getID(), ts);
1314 }
1315
1316 ts.setStatus(activityStatus==Activity.Status.FAILED?Activity.Status.RUNNING:activityStatus);
1317 }
1318
1319
1320 for (Object o : transtionStatusTable.keySet()) {
1321 String transitionID = (String) o;
1322 TransitionStatus status = transtionStatusTable.get(transitionID);
1323 if (status.isModified()) {
1324 workflow.getTransition(transitionID).getProperties().put(Constants.PROP_TRANSITION_STATUS, status.getStatusAsString());
1325 status.resetModified();
1326 }
1327 }
1328
1329
1330 setStatus(tempworkflowstatus);
1331 return modification;
1332 }
1333
1334
1335 private void updateOccurrenceSequenceProperty(TransitionOccurrence to) {
1336 if (occurrenceSequence!=null) {
1337 occurrenceSequence += " "+ to.transition.getID();
1338 workflow.getProperties().put(Constants.PROP_OCCURRENCE_SEQUENCE,occurrenceSequence);
1339 }
1340 }
1341
1342 private void updateProbabilityProperty(TransitionOccurrence to) {
1343 DoubleDistribution pDist = to.getProbabilityDistribution();
1344 if (pDist == null) return;
1345
1346 double oldP = Constants.PROP_WORKFLOW_PROBABILITY_DEFAULT;
1347 String propstr = workflow.getProperties().get(Constants.PROP_WORKFLOW_PROBABILITY);
1348 if (propstr != null && propstr.length() > 0) {
1349 oldP = Double.parseDouble(propstr);
1350 }
1351 double p = pDist.getMean() * oldP;
1352 workflow.getProperties().put(Constants.PROP_WORKFLOW_PROBABILITY,Double.toString(p));
1353 }
1354
1355
1356
1357
1358
1359
1360 private boolean processRedTransition(TransitionOccurrence to) throws WorkflowFormatException {
1361 try {
1362 return red2Yellow.processTransition(this, to.transition);
1363 } catch (OperationMapperException e) {
1364 throw new WorkflowFormatException("Exception during processing of red "+ to + ": "+ e, e);
1365 } catch (NullPointerException e) {
1366 throw new WorkflowFormatException("Exception during processing of red transition: Red 2 Yellow Operation Mapper not available! " +
1367 "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_RED2YELLOW_CLASS+"'.");
1368 }
1369 }
1370
1371
1372
1373
1374
1375
1376 private boolean processYellowTransition(TransitionOccurrence to) throws WorkflowFormatException {
1377 boolean modification = false;
1378 try {
1379
1380 modification = yellow2Blue.processTransition(this, to.transition);
1381 if (modification == true) {
1382
1383 WeightAndRank wr = new WeightAndRank();
1384 wr.prioritizeTransitions(this.getWorkflow());
1385 }
1386 return modification;
1387 } catch (OperationMapperException e) {
1388 throw new WorkflowFormatException("Exception during processing of yellow "+ to + ": "+ e, e);
1389 } catch (NullPointerException e) {
1390 throw new WorkflowFormatException("Exception during processing of yellow transition: Yellow 2 Blue Operation Mapper not available! " +
1391 "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_YELLOW2BLUE_CLASS+"'.");
1392 }
1393 }
1394
1395
1396
1397
1398
1399
1400
1401 private boolean processBlueTransition(TransitionOccurrence to) throws WorkflowFormatException, ActivityException {
1402 boolean modification = false;
1403 try {
1404 synchronized (to.transition.getOperation()) {
1405 if (to.transition.getOperation().get() instanceof OperationClass) {
1406
1407
1408 String allocateResource = ResourceCoAllocator.blue2greenCoAllocationFromToken(to, this);
1409
1410
1411 modification = allocateResource != null || blue2Green.processTransition(this, to.transition);
1412
1413
1414
1415 if (to.transition.getAbstractionLevel() == Operation.GREEN) {
1416 modification = processGreenTransition(to);
1417 }
1418 } else {
1419 workflowWarnAndSuspend("The transition \"" + to.transition.getID() + "\" contains an unsupported operation description.");
1420 suspend = true;
1421 }
1422 }
1423
1424 return modification;
1425 } catch (OperationMapperException e) {
1426 throw new WorkflowFormatException("Exception during processing of blue "+ to + ": "+ e, e);
1427 } catch (NullPointerException e) {
1428 throw new WorkflowFormatException("Exception during processing of blue transition: Blue 2 Green Operation Mapper not available! " +
1429 "Please check property '"+Constants.PROP_SYSTEM_GWES_OPERATIONMAPPER_BLUE2GREEN_CLASS+"'.");
1430 }
1431 }
1432
1433
1434
1435
1436
1437
1438
1439 private boolean processGreenTransition(TransitionOccurrence to) throws WorkflowFormatException {
1440 boolean modification = false;
1441 Activity activity;
1442
1443 synchronized (to.transition.getOperation()) {
1444 Object oo = to.transition.getOperation().get();
1445 if (oo instanceof OperationClass) {
1446 OperationCandidate oc = null;
1447
1448
1449 OperationCandidate[] ops = ((OperationClass) oo).getOperationCandidates();
1450 for (OperationCandidate candidate : ops) {
1451 if (candidate.isSelected()) {
1452 oc = candidate;
1453 break;
1454 }
1455 }
1456
1457 if (oc == null) throw new WorkflowFormatException("The Operation Candidate of green "+to+" is null!");
1458
1459 if (logger.isDebugEnabled()) {
1460 logger.debug("processing green " + to + " with operation candidate " + oc + "...");
1461 }
1462
1463
1464 to.lockTokens();
1465
1466
1467
1468
1469 try {
1470 activity = constructActivity(oc.getType(), to, oc);
1471 } catch (ActivityException e) {
1472 throw new WorkflowFormatException("transition \"" + to.transition.getID() + "\" is related to an activity which could not be constructed: "+e,e);
1473 }
1474
1475
1476 to.setActivity(activity);
1477
1478
1479 setStatus(WorkflowStatus.STATUS_ACTIVE);
1480
1481 try {
1482
1483 activity.initiateActivity();
1484 getActivityTable().put(activity.getID(), activity);
1485
1486
1487 activity.enqueueActivity();
1488
1489
1490 if (ops.length > 1) {
1491 for (OperationCandidate candidate : ops) {
1492 if (candidate.isSelected()) {
1493
1494 GenericProperties transprops = to.transition.getProperties();
1495 transprops.put(Constants.PROP_TRANSITION_LAST_RESOURCE_NAME, candidate.getResourceName());
1496 transprops.put(Constants.PROP_TRANSITION_LAST_OPERATION_NAME, candidate.getOperationName());
1497
1498 candidate.setSelected(false);
1499 }
1500 }
1501 }
1502 } catch (ActivityException e) {
1503 FaultToleranceHandler.initiateFaultToleranceOrTerminate(activity);
1504 getActivityTable().put(activity.getID(), activity);
1505 }
1506 } else {
1507 throw new WorkflowFormatException("transition \"" + to.transition.getID() + "\" is related to an activity which is not supported.");
1508 }
1509 }
1510
1511 modification = true;
1512
1513 return modification;
1514 }
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524 private void processBlackTransition(TransitionOccurrence to, int step) throws WorkflowFormatException {
1525 if (logger.isDebugEnabled()) {
1526 logger.debug("processing black " + to + " ...");
1527 }
1528
1529
1530 to.evaluateXPathEdgeExpressions();
1531
1532
1533 to.removeInputTokens();
1534
1535 try {
1536
1537 to.updateWriteTokens();
1538
1539 to.putOutputTokens();
1540 } catch (CapacityException e) {
1541 workflowErrorAndAbort("Error during processBlackTransition() "+toString()+": "+e, e);
1542 }
1543
1544
1545 updateProbabilityProperty(to);
1546
1547
1548 if (provenanceHandler != null) provenanceHandler.storeProvenance(to,step);
1549
1550
1551 updateOccurrenceSequenceProperty(to);
1552 }
1553
1554
1555
1556
1557
1558
1559 private String getDecisionID(Decision decision) {
1560 return Constants.PROP_DECISION_ + decision.toString().hashCode();
1561 }
1562
1563
1564
1565
1566
1567
1568 public Workflow getWorkflow() {
1569 updateWorkflowRuntimeVersion();
1570 return workflow;
1571 }
1572
1573 public Map<String, Decision> getCurrentDecisions() {
1574 return currentDecisions;
1575 }
1576
1577 public List<Transition> getConflictTransitions() {
1578 return conflictTransitions;
1579 }
1580
1581 public CredentialManager getCredentialManager(Activity activity) throws LoggingException, GSSException {
1582 getGlog().logEvent(
1583 GWESLogger.Event.GET_CREDENTIAL,
1584 getUserID(),
1585 this,
1586 new String[]{"activityID","DN"},
1587 null,
1588 new String[]{activity.getID(),credentialManager.getDN()}
1589 );
1590 return credentialManager;
1591 }
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601 public void setFaultManagementPolicy(int faultManagementPolicy) {
1602 this.faultManagementPolicy = faultManagementPolicy;
1603 }
1604
1605
1606
1607
1608
1609
1610
1611
1612 public void setWorkflowPersistence(int workflowPersistence) {
1613 this.workflowPersistence = workflowPersistence;
1614 switch (workflowPersistence) {
1615 case WORKFLOW_PERSISTENCE_FALSE:
1616 workflow.getProperties().put(Constants.PROP_WORKFLOW_PERSISTENCE, Constants.FALSE);
1617 break;
1618 case WORKFLOW_PERSISTENCE_TRUE:
1619 workflow.getProperties().put(Constants.PROP_WORKFLOW_PERSISTENCE, Constants.TRUE);
1620 break;
1621 }
1622 }
1623
1624
1625
1626
1627
1628
1629 public void setWorkflowPersistence(String workflowPersistence) throws WorkflowFormatException {
1630 if (workflowPersistence == null) setWorkflowPersistence(WORKFLOW_PERSISTENCE_TRUE);
1631 else if (workflowPersistence.equalsIgnoreCase(Constants.TRUE)) setWorkflowPersistence(WORKFLOW_PERSISTENCE_TRUE);
1632 else if (workflowPersistence.equalsIgnoreCase(Constants.FALSE)) setWorkflowPersistence(WORKFLOW_PERSISTENCE_FALSE);
1633 else throw new WorkflowFormatException("Wrong format of workflow persistence policy: " + workflowPersistence);
1634 }
1635
1636
1637
1638
1639
1640
1641 public synchronized void incrementTerminatedActivities(int terminatedActivities) {
1642 super.incrementTerminatedActivities(terminatedActivities);
1643 if (workflow != null) workflow.getProperties().put(Constants.PROP_ACTIVITIES_TERMINATED, "" + getTerminatedActivities());
1644 }
1645
1646
1647
1648
1649 public synchronized void incrementCompletedActivities(int completedActivities) {
1650 super.incrementCompletedActivities(completedActivities);
1651 if (workflow != null) workflow.getProperties().put(Constants.PROP_ACTIVITIES_COMPLETED, "" + getCompletedActivities());
1652 }
1653
1654
1655
1656
1657 protected void logStatistics() {
1658 super.logStatistics();
1659 GenericProperties workflowProps = workflow.getProperties();
1660 workflowProps.put(Constants.PROP_DURATION_UNDEFINED_MS, "" + getDurationUndefined());
1661 workflowProps.put(Constants.PROP_DURATION_INITIATED_MS, "" + getDurationInitiated());
1662 workflowProps.put(Constants.PROP_DURATION_RUNNING_MS, "" + getDurationRunning());
1663 workflowProps.put(Constants.PROP_DURATION_ACTIVE_MS, "" + getDurationActive());
1664 workflowProps.put(Constants.PROP_DURATION_SUSPENDED_MS, "" + getDurationSuspended());
1665 workflowProps.put(Constants.PROP_DURATION_TOTAL_MS, "" + getDurationTotal());
1666 if (getEndTime() > 0L) {
1667 workflowProps.put(Constants.PROP_END_TIME_MS, "" + getEndTime());
1668 }
1669 workflowStatistics.updateBranchingFactor();
1670 workflowStatistics.updateSequentialExecutionPathSize();
1671 workflowStatistics.updateSpeedupTotal();
1672 workflowStatistics.updateSpeedupActive();
1673
1674
1675 if (getStatus() == WorkflowStatus.STATUS_COMPLETED) {
1676 try {
1677 workflowStatistics.updateDatabaseActivityStatistics();
1678 } catch (DatabaseException e) {
1679 logger.warn("Could not update activity duration statistics in database\n" + e);
1680 }
1681 }
1682 }
1683
1684
1685
1686
1687
1688 private void updateWorkflowRuntimeVersion() {
1689 if (workflow == null) return;
1690 GenericProperties props = workflow.getProperties();
1691 String propVersion = props.get(Constants.PROP_WORKFLOW_RUNTIME_VERSION);
1692
1693 if (propVersion == null || !propVersion.equals(""+workflow.getVersionNumber()) ) {
1694
1695 String newVersion = ""+workflow.getVersionNumber()+1;
1696 props.put(Constants.PROP_WORKFLOW_RUNTIME_VERSION,newVersion);
1697 }
1698 }
1699
1700 }