View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import net.kwfgrid.gwes.exception.LoggingException;
11  import net.kwfgrid.gwes.exception.StateTransitionException;
12  import net.kwfgrid.gwes.exception.DatabaseException;
13  import net.kwfgrid.gwes.exception.WorkflowSecurityException;
14  import net.kwfgrid.gwes.util.StringUtils;
15  import net.kwfgrid.gworkflowdl.structure.WorkflowFormatException;
16  import org.apache.log4j.Logger;
17  
18  import java.text.DecimalFormat;
19  import java.text.NumberFormat;
20  
21  /**
22   * A GenericWorkflowHandler controls one workflow. This generic workflow handler is abstract and should be
23   * extended in order to implement a concrete one, which is specific to a certain Workflow Description
24   * Language.  In order to implement your own GenericWorkflowHandler, you should
25   * override the methods
26   * <PRE>
27   * initiateWorkflow()
28   * startWorkflow()
29   * suspendWorkflow()
30   * resumeWorkflow()
31   * abortWorkflow()
32   * </PRE>
33   * One concrete implementation of the GenericWorkflowHandler is the KWfGenericWorkflowHandler.
34   *
35   * @author Andreas Hoheisel
36   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
37   * @author Andreas Hoheisel
38   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
39   * @version $Id: GenericWorkflowHandler.java 1531 2011-06-22 18:43:10Z hoheisel $
40   * @see GWorkflowDLHandler
41   * @see #initiateWorkflow()
42   * @see #startWorkflow()
43   * @see #suspendWorkflow()
44   * @see #resumeWorkflow()
45   * @see #abortWorkflow()
46   */
47  public abstract class GenericWorkflowHandler extends Thread {
48  
49      /**
50       * log4j logger
51       */
52      private static Logger logger = Logger.getLogger(GenericWorkflowHandler.class);
53  
54      /**
55       * Identifier of this workflow. The String representation of a Universal Unique ID
56       */
57      private final String ID;
58  
59      /**
60       * Activity counter. Is used to generate activity IDs.
61       */
62      private int activityCounter;
63  
64      /**
65       * User ID of the owner of this grid job
66       */
67      private final String userID;
68  
69      /**
70       * The parent GWESEngine.
71       */
72      private GWESEngine engine;
73  
74      /**
75       * String that represents the type of workflow description, e.g., "GWorkflowDL 2.1"
76       */
77      private String originalWorkflowDescriptionType;
78  
79      /**
80       * A string representation of the workflow description.
81       */
82      private String workflowDescription;
83  
84      /**
85       * A object representation of the workflow description.
86       */
87      private Object workflow;
88  
89      /**
90       * Activity table of activities that are owned by this workflow. The keys of the table are the IDs of the
91       * activities.
92       */
93      private ActivityTable activityTable;
94  
95      private DecimalFormat intformat;
96  
97      /**
98       * error counter in order to create error IDs (unique in this workflow).
99       */
100     private int errorCounter = 0;
101 
102     /**
103      * Number of completed activities.
104      */
105     private int completedActivities = 0;
106 
107     /**
108      * Number of terminated activities.
109      */
110     private int terminatedActivities = 0;
111 
112     /**
113      * time in milliseconds status being undefined and not yet initialized.
114      */
115     private long durationUndefined = 0L;
116     private long startTimeUndefined = 0L;
117 
118     /**
119      * time in milliseconds status being initiated and not running.
120      */
121     private long durationInitiated = 0L;
122     private long startTimeInitiated = 0L;
123 
124     /**
125      * time in milliseconds status being running (not including active).
126      */
127     private long durationRunning = 0L;
128     private long startTimeRunning = 0L;
129 
130     /**
131      * time in milliseconds status being active.
132      */
133     private long durationActive = 0L;
134     private long startTimeActive = 0L;
135 
136     /**
137      * time in milliseconds status being suspended.
138      */
139     private long durationSuspended = 0L;
140     private long startTimeSuspended = 0L;
141 
142     /**
143      * total workflow duration in milliseconds.
144      */
145     private long durationTotal = 0L;
146     private long startTimeTotal = 0L;
147 
148     /**
149      * The status of this workflow. Use <CODE>setStatus()</CODE> in order to modify the status.
150      *
151      * @see #setStatus(int)
152      */
153     private int status = -1;
154 
155     private GWESLogger glog;
156 
157     /**
158      * Constructor for GenericWorkflowHandler.
159      *
160      * @param engine The parent workflow handler
161      * @param originalWorkflowDescriptionType The original type of workflow description, e.g. "GWorkflowDL 2.0" before conversion to default type.
162      */
163     protected GenericWorkflowHandler(GWESEngine engine, String originalWorkflowDescriptionType, String workflowDescription, String userID) throws WorkflowSecurityException, LoggingException {
164         super();
165         this.glog = GWESLogger.getInstance();
166         this.engine = engine;
167         this.userID = userID;
168         this.originalWorkflowDescriptionType = originalWorkflowDescriptionType;
169         this.workflowDescription = workflowDescription;
170         this.ID = newID();
171         setName("workflow#" + this.ID);
172         setStatus(WorkflowStatus.STATUS_UNDEFINED);
173         this.activityCounter = 0;
174         this.activityTable = new ActivityTable();
175 
176         // format for activity ID
177         intformat = (DecimalFormat) NumberFormat.getIntegerInstance();
178         intformat.applyPattern("0000000000");
179 
180         if (logger.isDebugEnabled()) {
181             logger.debug("New GenericWorkflowHandler with ID=" + ID + " constructed.");
182         }
183     }
184 
185     /**
186      * Get the identifier of this workflow instance. String representation of a Universal Unique ID.
187      *
188      * @return The identifier of this workflow
189      */
190     public String getID() {
191         return ID;
192     }
193 
194     /**
195      * Get the user identifier of the owner of this workflow instance.
196      *
197      * @return The userID
198      */
199     public String getUserID() {
200         return userID;
201     }
202 
203     public GWESLogger getGlog() {
204         return glog;
205     }
206 
207     /**
208      * Get the original workflow description type before conversion to default description type.
209      * @return String with the original workflow description type, e.g., "GWorkflowDL 2.0".
210      */
211     public String getOriginalWorkflowDescriptionType() {
212         return originalWorkflowDescriptionType;
213     }
214 
215     /**
216      * Get the current workflow description of this workflow. If the workflow description changes during runtime,
217      * you should override this method by the concrete implmenetation.
218      *
219      * @return The workflow descpription
220      */
221     public String getWorkflowDescription() {
222         return workflowDescription;
223     }
224 
225     public Object getWorkflow() {
226         return workflow;
227     }
228 
229     /**
230      * Set the current workflow description of this workflow.
231      *
232      * @param workflowDescription The workflow description
233      */
234     public void setWorkflowDescription(String workflowDescription) throws WorkflowFormatException, StateTransitionException {
235         this.workflowDescription = workflowDescription;
236     }
237 
238     /**
239      * Set status of the workflow and notify threads that wait for a status change.
240      *
241      * @see WorkflowStatus#STATUS_UNDEFINED
242      * @see WorkflowStatus#STATUS_INITIATED
243      * @see WorkflowStatus#STATUS_RUNNING
244      * @see WorkflowStatus#STATUS_SUSPENDED
245      * @see WorkflowStatus#STATUS_ACTIVE
246      * @see WorkflowStatus#STATUS_TERMINATED
247      * @see WorkflowStatus#STATUS_COMPLETED
248      * @see #waitForStatusChangeTo(int)
249      * @see #waitForStatusChangeFrom(int)
250      */
251     protected synchronized void setStatus(int status) {
252 
253         if (status != this.status) {
254             int oldStatus = this.status;
255             this.status = status;
256             long now = System.currentTimeMillis();
257             try {
258                 if (glog.i()) glog.logEventSP(
259                         GWESLogger.Event.WORKFLOW_STATUS_CHANGE,
260                         getUserID(),
261                         this,
262                         "workflowStatus",
263                         (oldStatus >= 0 ? WorkflowStatus.getStatusAsString(oldStatus) : null),
264                         WorkflowStatus.getStatusAsString(status));
265             } catch (LoggingException e) {
266                 logger.warn("Exception during logging during setStatus(): "+e,e);
267             }
268 
269             /* notify waitForStatusChange() */
270             synchronized (this) {
271                 this.notifyAll();
272             }
273             /* statistics */
274             switch (this.status) {
275                 case WorkflowStatus.STATUS_UNDEFINED:
276                     startTimeUndefined = now;
277                     if (startTimeTotal == 0L) {
278                         startTimeTotal = now;
279                     }
280                     break;
281                 case WorkflowStatus.STATUS_INITIATED:
282                     startTimeInitiated = now;
283                     break;
284                 case WorkflowStatus.STATUS_RUNNING:
285                     startTimeRunning = now;
286                     break;
287                 case WorkflowStatus.STATUS_SUSPENDED:
288                     startTimeSuspended = now;
289                     break;
290                 case WorkflowStatus.STATUS_ACTIVE:
291                     startTimeActive = now;
292                     break;
293                 case WorkflowStatus.STATUS_COMPLETED:
294                     durationTotal = now - startTimeTotal;
295                     break;
296                 case WorkflowStatus.STATUS_TERMINATED:
297                     durationTotal = now - startTimeTotal;
298             }
299             switch (oldStatus) {
300                 case WorkflowStatus.STATUS_UNDEFINED:
301                     durationUndefined = now - startTimeUndefined;
302                     break;
303                 case WorkflowStatus.STATUS_INITIATED:
304                     durationInitiated += now - startTimeInitiated;
305                     break;
306                 case WorkflowStatus.STATUS_RUNNING:
307                     durationRunning += now - startTimeRunning;
308                     break;
309                 case WorkflowStatus.STATUS_SUSPENDED:
310                     durationSuspended += now - startTimeSuspended;
311                     break;
312                 case WorkflowStatus.STATUS_ACTIVE:
313                     durationActive += now - startTimeActive;
314             }
315             if (this.status == WorkflowStatus.STATUS_COMPLETED || this.status == WorkflowStatus.STATUS_TERMINATED) {
316                 logStatistics();
317             }
318         }
319     }
320 
321     /**
322      * Get the current status code of this workflow as int.
323      *
324      * @return The current status code of this workflow.
325      * @see #getStatusAsString()
326      * @see WorkflowStatus#STATUS_UNDEFINED
327      * @see WorkflowStatus#STATUS_INITIATED
328      * @see WorkflowStatus#STATUS_RUNNING
329      * @see WorkflowStatus#STATUS_SUSPENDED
330      * @see WorkflowStatus#STATUS_ACTIVE
331      * @see WorkflowStatus#STATUS_TERMINATED
332      * @see WorkflowStatus#STATUS_COMPLETED
333      */
334     public int getStatus() {
335         return status;
336     }
337 
338     /**
339      * Wait for workflow to change its status.
340      *
341      * @param oldStatus The old status
342      * @return The new status
343      * @throws InterruptedException If the thread has been interrupted
344      */
345     public int waitForStatusChangeFrom(int oldStatus) throws InterruptedException {
346 //        logger.debug("Waiting for workflow#" + ID + " to change status ...");
347 
348         // do not wait if already in final state
349         if (status == WorkflowStatus.STATUS_COMPLETED || status == WorkflowStatus.STATUS_TERMINATED) {
350             return status;
351         }
352         while (status == oldStatus) {
353             synchronized (this) {
354                 this.wait();
355             }
356         }
357 //        logger.debug("Waiting for workflow#" + ID + " to change status ... done");
358         return status;
359     }
360 
361     /**
362      * Wait for workflow to change its status to a specified status.
363      *
364      * @param newStatus The new status code to wait for
365      * @throws InterruptedException If the thread has been interrupted
366      */
367     public void waitForStatusChangeTo(int newStatus) throws InterruptedException {
368 //        logger.debug("Waiting for workflow#" + ID + " to change status to "+getStatusAsString(newStatus)+" ...");
369         while (status != newStatus) {
370             synchronized (this) {
371                 this.wait();
372             }
373         }
374 //        logger.debug("Waiting for workflow#" + ID + " to change status to "+getStatusAsString(newStatus)+" ... done.");
375     }
376 
377     /**
378      * Wait for workflow to change its status to final states COMPLETED or TERMINATED.
379      *
380      * @throws InterruptedException If the thread has been interrupted
381      * @return The final status
382      */
383     public int waitForStatusChangeToCompletedOrTerminated() throws InterruptedException {
384         while (status != WorkflowStatus.STATUS_COMPLETED && status != WorkflowStatus.STATUS_TERMINATED) {
385             synchronized (this) {
386                 this.wait();
387             }
388         }
389         return status;
390     }
391 
392     /**
393      * Get the current status of this workflow instance as string.
394      *
395      * @return string representing the status of the workflow.
396      *         This String is useful for user-readable output.
397      * @see #getStatus()
398      * @see WorkflowStatus#getStatusAsString(int)
399      */
400     public String getStatusAsString() {
401         return WorkflowStatus.getStatusAsString(status);
402     }
403 
404     /**
405      * Get the workflow status as an array of strings.
406      * Example:
407      * <pre>
408      * "ID=hoheisel_f2968050-1d6a-11db-bacc-ad353bc1f9b1"
409      * "status=COMPLETED"
410      * "birthdayMs=1154003111126"
411      * "durationUndefinedMs=527"
412      * "durationInitiatedMs=792"
413      * "durationRunningMs=0"
414      * "durationActiveMs=0"
415      * "durationSuspendedMs=0"
416      * "durationTotalMs=1351"
417      * "endTimeMs=1154003112477"
418      * "level=MEMORY"
419      * "description=test workflow"
420      * "userID=test user"
421      * </pre>
422      * @return The current workflow status as string array
423      */
424     public String[] getWorkflowStatus() {
425         String[] ret = new String[13];
426         ret[0] = "ID="+getID();
427         ret[1] = "status="+getStatusAsString();
428         ret[2] = "birthdayMs="+getBirthday();
429         ret[3] = "durationUndefinedMs=" + getDurationUndefined();
430         ret[4] = "durationInitiatedMs=" + getDurationInitiated();
431         ret[5] = "durationRunningMs=" + getDurationRunning();
432         ret[6] = "durationActiveMs=" + getDurationActive();
433         ret[7] = "durationSuspendedMs=" + getDurationSuspended();
434         ret[8] = "durationTotalMs=" + getDurationTotal();
435         ret[9] = "endTimeMs=" + getEndTime();
436         ret[10] = "level=MEMORY";
437         ret[11] = "description=" + getDescription();
438         ret[12] = "userID=" + getUserID();
439 
440         return ret;
441     }
442 
443     /**
444      * Get the parent GWESEngine that owns this workflow handler.
445      *
446      * @return The parent GWES engine
447      */
448     public GWESEngine getEngine() {
449         return engine;
450     }
451 
452     /**
453      * Get the activity table with all activities triggered by this workflow instance.
454      *
455      * @return The activity table
456      */
457     public ActivityTable getActivityTable() {
458         return activityTable;
459     }
460 
461     /**
462      * Creates a new Universal Unique ID for a workflow instance.
463      * The UUID now consists of two parts:
464      * <ul>
465      * <li>the UserID of the user who instantiated the job</li>
466      * <li>an attached UUID</li>
467      * </ul>
468      * The two parts are connected by a underline (_).
469      */
470     protected synchronized String newID() {
471         return StringUtils.extractFilteredCNFromDN(userID) + "_" + java.util.UUID.randomUUID();
472     }
473 
474     /**
475      * Generate a new activity identifier. This identifier consists of the UUID of the workflow a underscore (_) and
476      * a serial number with leading zeros (activity counter).
477      *
478      * @return The new activity identifier.
479      */
480     protected synchronized String getNewActivityID() {
481         return ID + "_" + intformat.format(++activityCounter);
482     }
483 
484     /**
485      * Get the duration of how long this workflow was "UNDEFINED" (before initiated).
486      *
487      * @return The duration in milliseconds
488      */
489     public long getDurationUndefined() {
490         return durationUndefined;
491     }
492 
493     /**
494      * Get the duration of how long this workflow was "INITIATED" (before running).
495      *
496      * @return The duration in milliseconds
497      */
498     public long getDurationInitiated() {
499         return durationInitiated;
500     }
501 
502     /**
503      * Get the duration of how long this workflow was "RUNNING" not including the time it was "ACTIVE".
504      *
505      * @return The duration in milliseconds
506      */
507     public long getDurationRunning() {
508         return durationRunning;
509     }
510 
511     /**
512      * Get the duration of how long this workflow was "ACTIVE".
513      *
514      * @return The duration in milliseconds
515      */
516     public long getDurationActive() {
517         return durationActive;
518     }
519 
520     /**
521      * Get the duration of how long this workflow was "SUSPENDED".
522      *
523      * @return The duration in milliseconds
524      */
525     public long getDurationSuspended() {
526         return durationSuspended;
527     }
528 
529     /**
530      * Get the total workflow duration.
531      *
532      * @return The duration in milliseconds
533      */
534     public long getDurationTotal() {
535         return durationTotal;
536     }
537 
538     /**
539      * Get the birthday of this workflow.
540      * This is the time of the first occurence of STATUS_UNDEFINED of this workflow.
541      * @return The begin time in milliseconds since 1970
542      */
543     public long getBirthday() {
544         return startTimeTotal;
545     }
546 
547     /**
548      * Get the completion or termination time of this workflow.
549      * @return The end time in milliseconds since 1970 or "0" if this workflow has not been finished yet.
550      */
551     public long getEndTime() {
552         if (status== WorkflowStatus.STATUS_COMPLETED || status== WorkflowStatus.STATUS_TERMINATED) {
553             return startTimeTotal+durationTotal;
554         } else {
555             return 0L;
556         }
557     }
558 
559     /**
560      * Get a new unique error ID for this workflow.
561      *
562      * @return a String with a unique error ID.
563      */
564     public String createNewErrorID() {
565         return ""+ (++errorCounter);
566     }
567 
568     protected void logStatistics() {
569         if (logger.isDebugEnabled()) {
570             logger.debug("UNDEFINED: " + getDurationUndefined() / 1000.0 + "s");
571             logger.debug("INITIATED: " + getDurationInitiated() / 1000.0 + "s");
572             logger.debug("RUNNING  : " + getDurationRunning() / 1000.0 + "s");
573             logger.debug("ACTIVE   : " + getDurationActive() / 1000.0 + "s");
574             logger.debug("SUSPENDED: " + getDurationSuspended() / 1000.0 + "s");
575             logger.debug("TOTAL    : " + getDurationTotal() / 1000.0 + "s");
576         }
577     }
578 
579     public int getTerminatedActivities() {
580         return terminatedActivities;
581     }
582 
583     public synchronized void incrementTerminatedActivities(int terminatedActivities) {
584         this.terminatedActivities += terminatedActivities;
585     }
586 
587     public int getCompletedActivities() {
588         return completedActivities;
589     }
590 
591     public synchronized void incrementCompletedActivities(int completedActivities) {
592         this.completedActivities += completedActivities;
593     }
594 
595     // a concrete implementation must implement the following abstract methods
596 
597     /**
598      * Initiates this workflow; the status should switch to INITIATED. The call should only succeed if the
599      * status was UNDEFINED before. Every concrete subclass has to implement this method.
600      */
601     public abstract void initiateWorkflow() throws WorkflowFormatException, StateTransitionException;
602 
603     /**
604      * Starts this workflow; the status should switch to RUNNING. Every concrete subclass has to implement this method.
605      */
606     public abstract void startWorkflow() throws StateTransitionException;
607 
608     /**
609      * Suspends this workflow and waits until the status has switched to SUSPENDED. Every concrete subclass has to implement this method.
610      */
611     public abstract void suspendWorkflow() throws StateTransitionException;
612 
613     /**
614      * Suspends this workflow asynchronously, i.e. without waiting until the status has switched to SUSPENDED.
615      * Every concrete subclass has to implement this method.
616      */
617     public abstract void suspendWorkflowAsync() throws StateTransitionException;
618 
619     /**
620      * Resumes this workflow; the status should switch to RUNNING. Every concrete subclass has to implement this method.
621      */
622     public abstract void resumeWorkflow() throws StateTransitionException;
623 
624     /**
625      * Aborts this workflow and waits until the status has switched to TERMINATED. Every concrete subclass has to implement this method.
626      */
627     public abstract void abortWorkflow() throws StateTransitionException;
628 
629     /**
630      * Aborts this workflow asynchronously, i.e. without waiting until the status has switched to TERMINATED.
631      * Every concrete subclass has to implement this method.
632      */
633     public abstract void abortWorkflowAsync() throws StateTransitionException;
634 
635     /**
636      * Stores this workflow. Every concrete subclass has to implement this method.
637      * @return The location identifier of the storage location.
638      */
639     public abstract String storeWorkflow() throws DatabaseException;
640 
641     /**
642      * Gets specific data that is held inside the workflow and that is referenced by a data place identifier.
643      *
644      * @param placeID The place identifier that refers to the data
645      * @return The data as an array of Strings.
646      */
647     public abstract String[] getData(String placeID);
648 
649     /**
650      * Gets the human-readable description of the workflow; it is reported as "description" by getWorkflowStatus().
651      * @return A String containing the description.
652      */
653     public abstract String getDescription();
654 
655     /**
656      * Sets the human-readable description of the workflow.
657      * @param description The new description.
658      */
659     public abstract void setDescription(String description);
660 
661     /**
662      * Thread entry point of this handler (this class extends Thread). Implement it in every concrete subclass.
663      */
664     public abstract void run();
665 
666 }