1 /*
2 * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3 * for its Fraunhofer Institute for Computer Architecture and Software
4 * Technology (FIRST), Berlin, Germany. All rights reserved.
5 * http://www.first.fraunhofer.de/
6 */
7
8 package net.kwfgrid.gwes;
9
10 import net.kwfgrid.gwes.exception.LoggingException;
11 import net.kwfgrid.gwes.exception.StateTransitionException;
12 import net.kwfgrid.gwes.exception.DatabaseException;
13 import net.kwfgrid.gwes.exception.WorkflowSecurityException;
14 import net.kwfgrid.gwes.util.StringUtils;
15 import net.kwfgrid.gworkflowdl.structure.WorkflowFormatException;
16 import org.apache.log4j.Logger;
17
18 import java.text.DecimalFormat;
19 import java.text.NumberFormat;
20
21 /**
22 * A GenericWorkflowHandler controls one workflow. This generic workflow handler is abstract and should be
23 * extended in order to implement a concrete one, which is specific to a certain Workflow Description
24 * Language. In order to implement your own GenericWorkflowHandler, you should
25 * override the methods
26 * <PRE>
27 * initiateWorkflow()
28 * startWorkflow()
29 * suspendWorkflow()
30 * resumeWorkflow()
31 * abortWorkflow()
32 * </PRE>
33 * One concrete implemenation of the GenericWorkflowHandler is the KWfGenericWorkflowHandler.
34 *
35 * @author Andreas Hoheisel
36 * (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
37 * @author Andreas Hoheisel
38 * (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
39 * @version $Id: GenericWorkflowHandler.java 1531 2011-06-22 18:43:10Z hoheisel $
40 * @see GWorkflowDLHandler
41 * @see #initiateWorkflow()
42 * @see #startWorkflow()
43 * @see #suspendWorkflow()
44 * @see #resumeWorkflow()
45 * @see #abortWorkflow()
46 */
47 public abstract class GenericWorkflowHandler extends Thread {
48
49 /**
50 * log4j logger
51 */
52 private static Logger logger = Logger.getLogger(GenericWorkflowHandler.class);
53
54 /**
55 * Identifier of this workflow. The String representation of a Universal Unique ID
56 */
57 private final String ID;
58
59 /**
60 * Activity counter. Is used to generate activity IDs.
61 */
62 private int activityCounter;
63
64 /**
65 * User ID of the owner of this grid job
66 */
67 private final String userID;
68
69 /**
70 * The parent GWESEngine.
71 */
72 private GWESEngine engine;
73
74 /**
75 * String that represents the type of workflow description, e.g., "GWorkflowDL 2.1"
76 */
77 private String originalWorkflowDescriptionType;
78
79 /**
80 * A string representation of the workflow description.
81 */
82 private String workflowDescription;
83
84 /**
85 * A object representation of the workflow description.
86 */
87 private Object workflow;
88
89 /**
90 * Activity table of activities that are owned by this workflow. The keys of the table are the IDs of the
91 * activities.
92 */
93 private ActivityTable activityTable;
94
95 private DecimalFormat intformat;
96
97 /**
98 * error counter in order to create error IDs (unique in this workflow).
99 */
100 private int errorCounter = 0;
101
102 /**
103 * Number of completed activities.
104 */
105 private int completedActivities = 0;
106
107 /**
108 * Number of terminated activities.
109 */
110 private int terminatedActivities = 0;
111
112 /**
113 * time in milliseconds status being undefined and not yet initialized.
114 */
115 private long durationUndefined = 0L;
116 private long startTimeUndefined = 0L;
117
118 /**
119 * time in milliseconds status being initiated and not running.
120 */
121 private long durationInitiated = 0L;
122 private long startTimeInitiated = 0L;
123
124 /**
125 * time in milliseconds status being running (not including active).
126 */
127 private long durationRunning = 0L;
128 private long startTimeRunning = 0L;
129
130 /**
131 * time in milliseconds status being active.
132 */
133 private long durationActive = 0L;
134 private long startTimeActive = 0L;
135
136 /**
137 * time in milliseconds status being suspended.
138 */
139 private long durationSuspended = 0L;
140 private long startTimeSuspended = 0L;
141
142 /**
143 * total workflow duration in milliseconds.
144 */
145 private long durationTotal = 0L;
146 private long startTimeTotal = 0L;
147
148 /**
149 * The status of this workflow. Use <CODE>setStatus()</CODE> in order to modify the status.
150 *
151 * @see #setStatus(int)
152 */
153 private int status = -1;
154
155 private GWESLogger glog;
156
157 /**
158 * Constructor for GenericWorkflowHandler.
159 *
160 * @param engine The parent workflow handler
161 * @param originalWorkflowDescriptionType The original type of workflow description, e.g. "GWorkflowDL 2.0" before conversion to default type.
162 */
163 protected GenericWorkflowHandler(GWESEngine engine, String originalWorkflowDescriptionType, String workflowDescription, String userID) throws WorkflowSecurityException, LoggingException {
164 super();
165 this.glog = GWESLogger.getInstance();
166 this.engine = engine;
167 this.userID = userID;
168 this.originalWorkflowDescriptionType = originalWorkflowDescriptionType;
169 this.workflowDescription = workflowDescription;
170 this.ID = newID();
171 setName("workflow#" + this.ID);
172 setStatus(WorkflowStatus.STATUS_UNDEFINED);
173 this.activityCounter = 0;
174 this.activityTable = new ActivityTable();
175
176 // format for activity ID
177 intformat = (DecimalFormat) NumberFormat.getIntegerInstance();
178 intformat.applyPattern("0000000000");
179
180 if (logger.isDebugEnabled()) {
181 logger.debug("New GenericWorkflowHandler with ID=" + ID + " constructed.");
182 }
183 }
184
185 /**
186 * Get the identifier of this workflow instance. String representation of a Universal Unique ID.
187 *
188 * @return The identifier of this workflow
189 */
190 public String getID() {
191 return ID;
192 }
193
194 /**
195 * Get the user identifier of the owner of this workflow instance.
196 *
197 * @return The userID
198 */
199 public String getUserID() {
200 return userID;
201 }
202
203 public GWESLogger getGlog() {
204 return glog;
205 }
206
207 /**
208 * Get the original workflow description type before conversion to default description type.
209 * @return String with the original workflow description type, e.g., "GWorkflowDL 2.0".
210 */
211 public String getOriginalWorkflowDescriptionType() {
212 return originalWorkflowDescriptionType;
213 }
214
215 /**
216 * Get the current workflow description of this workflow. If the workflow description changes during runtime,
217 * you should override this method by the concrete implmenetation.
218 *
219 * @return The workflow descpription
220 */
221 public String getWorkflowDescription() {
222 return workflowDescription;
223 }
224
225 public Object getWorkflow() {
226 return workflow;
227 }
228
229 /**
230 * Set the current workflow description of this workflow.
231 *
232 * @param workflowDescription The workflow description
233 */
234 public void setWorkflowDescription(String workflowDescription) throws WorkflowFormatException, StateTransitionException {
235 this.workflowDescription = workflowDescription;
236 }
237
238 /**
239 * Set status of the workflow and notify threads that wait for a status change.
240 *
241 * @see WorkflowStatus#STATUS_UNDEFINED
242 * @see WorkflowStatus#STATUS_INITIATED
243 * @see WorkflowStatus#STATUS_RUNNING
244 * @see WorkflowStatus#STATUS_SUSPENDED
245 * @see WorkflowStatus#STATUS_ACTIVE
246 * @see WorkflowStatus#STATUS_TERMINATED
247 * @see WorkflowStatus#STATUS_COMPLETED
248 * @see #waitForStatusChangeTo(int)
249 * @see #waitForStatusChangeFrom(int)
250 */
251 protected synchronized void setStatus(int status) {
252
253 if (status != this.status) {
254 int oldStatus = this.status;
255 this.status = status;
256 long now = System.currentTimeMillis();
257 try {
258 if (glog.i()) glog.logEventSP(
259 GWESLogger.Event.WORKFLOW_STATUS_CHANGE,
260 getUserID(),
261 this,
262 "workflowStatus",
263 (oldStatus >= 0 ? WorkflowStatus.getStatusAsString(oldStatus) : null),
264 WorkflowStatus.getStatusAsString(status));
265 } catch (LoggingException e) {
266 logger.warn("Exception during logging during setStatus(): "+e,e);
267 }
268
269 /* notify waitForStatusChange() */
270 synchronized (this) {
271 this.notifyAll();
272 }
273 /* statistics */
274 switch (this.status) {
275 case WorkflowStatus.STATUS_UNDEFINED:
276 startTimeUndefined = now;
277 if (startTimeTotal == 0L) {
278 startTimeTotal = now;
279 }
280 break;
281 case WorkflowStatus.STATUS_INITIATED:
282 startTimeInitiated = now;
283 break;
284 case WorkflowStatus.STATUS_RUNNING:
285 startTimeRunning = now;
286 break;
287 case WorkflowStatus.STATUS_SUSPENDED:
288 startTimeSuspended = now;
289 break;
290 case WorkflowStatus.STATUS_ACTIVE:
291 startTimeActive = now;
292 break;
293 case WorkflowStatus.STATUS_COMPLETED:
294 durationTotal = now - startTimeTotal;
295 break;
296 case WorkflowStatus.STATUS_TERMINATED:
297 durationTotal = now - startTimeTotal;
298 }
299 switch (oldStatus) {
300 case WorkflowStatus.STATUS_UNDEFINED:
301 durationUndefined = now - startTimeUndefined;
302 break;
303 case WorkflowStatus.STATUS_INITIATED:
304 durationInitiated += now - startTimeInitiated;
305 break;
306 case WorkflowStatus.STATUS_RUNNING:
307 durationRunning += now - startTimeRunning;
308 break;
309 case WorkflowStatus.STATUS_SUSPENDED:
310 durationSuspended += now - startTimeSuspended;
311 break;
312 case WorkflowStatus.STATUS_ACTIVE:
313 durationActive += now - startTimeActive;
314 }
315 if (this.status == WorkflowStatus.STATUS_COMPLETED || this.status == WorkflowStatus.STATUS_TERMINATED) {
316 logStatistics();
317 }
318 }
319 }
320
321 /**
322 * Get the current status code of this workflow as int.
323 *
324 * @return The current status code of this workflow.
325 * @see #getStatusAsString()
326 * @see WorkflowStatus#STATUS_UNDEFINED
327 * @see WorkflowStatus#STATUS_INITIATED
328 * @see WorkflowStatus#STATUS_RUNNING
329 * @see WorkflowStatus#STATUS_SUSPENDED
330 * @see WorkflowStatus#STATUS_ACTIVE
331 * @see WorkflowStatus#STATUS_TERMINATED
332 * @see WorkflowStatus#STATUS_COMPLETED
333 */
334 public int getStatus() {
335 return status;
336 }
337
338 /**
339 * Wait for workflow to change its status.
340 *
341 * @param oldStatus The old status
342 * @return The new status
343 * @throws InterruptedException If the thread has been interrupted
344 */
345 public int waitForStatusChangeFrom(int oldStatus) throws InterruptedException {
346 // logger.debug("Waiting for workflow#" + ID + " to change status ...");
347
348 // do not wait if already in final state
349 if (status == WorkflowStatus.STATUS_COMPLETED || status == WorkflowStatus.STATUS_TERMINATED) {
350 return status;
351 }
352 while (status == oldStatus) {
353 synchronized (this) {
354 this.wait();
355 }
356 }
357 // logger.debug("Waiting for workflow#" + ID + " to change status ... done");
358 return status;
359 }
360
361 /**
362 * Wait for workflow to change its status to a specified status.
363 *
364 * @param newStatus The new status code to wait for
365 * @throws InterruptedException If the thread has been interrupted
366 */
367 public void waitForStatusChangeTo(int newStatus) throws InterruptedException {
368 // logger.debug("Waiting for workflow#" + ID + " to change status to "+getStatusAsString(newStatus)+" ...");
369 while (status != newStatus) {
370 synchronized (this) {
371 this.wait();
372 }
373 }
374 // logger.debug("Waiting for workflow#" + ID + " to change status to "+getStatusAsString(newStatus)+" ... done.");
375 }
376
377 /**
378 * Wait for workflow to change its status to final states COMPLETED or TERMINATED.
379 *
380 * @throws InterruptedException If the thread has been interrupted
381 * @return The final status
382 */
383 public int waitForStatusChangeToCompletedOrTerminated() throws InterruptedException {
384 while (status != WorkflowStatus.STATUS_COMPLETED && status != WorkflowStatus.STATUS_TERMINATED) {
385 synchronized (this) {
386 this.wait();
387 }
388 }
389 return status;
390 }
391
392 /**
393 * Get the current status of this workflow instance as string.
394 *
395 * @return string representing the status of the workflow.
396 * This String is useful for user-readable output.
397 * @see #getStatus()
398 * @see WorkflowStatus#getStatusAsString(int)
399 */
400 public String getStatusAsString() {
401 return WorkflowStatus.getStatusAsString(status);
402 }
403
404 /**
405 * Get the workflow status as an array of strings.
406 * Example:
407 * <pre>
408 * "ID=hoheisel_f2968050-1d6a-11db-bacc-ad353bc1f9b1"
409 * "status=COMPLETED"
410 * "birthdayMs=1154003111126"
411 * "durationUndefinedMs=527"
412 * "durationInitiatedMs=792"
413 * "durationRunningMs=0"
414 * "durationActiveMs=0"
415 * "durationSuspendedMs=0"
416 * "durationTotalMs=1351"
417 * "endTimeMs=1154003112477"
418 * "level=MEMORY"
419 * "description=test workflow"
420 * "userID=test user"
421 * </pre>
422 * @return The current workflow status as string array
423 */
424 public String[] getWorkflowStatus() {
425 String[] ret = new String[13];
426 ret[0] = "ID="+getID();
427 ret[1] = "status="+getStatusAsString();
428 ret[2] = "birthdayMs="+getBirthday();
429 ret[3] = "durationUndefinedMs=" + getDurationUndefined();
430 ret[4] = "durationInitiatedMs=" + getDurationInitiated();
431 ret[5] = "durationRunningMs=" + getDurationRunning();
432 ret[6] = "durationActiveMs=" + getDurationActive();
433 ret[7] = "durationSuspendedMs=" + getDurationSuspended();
434 ret[8] = "durationTotalMs=" + getDurationTotal();
435 ret[9] = "endTimeMs=" + getEndTime();
436 ret[10] = "level=MEMORY";
437 ret[11] = "description=" + getDescription();
438 ret[12] = "userID=" + getUserID();
439
440 return ret;
441 }
442
443 /**
444 * Get the parent GWESEngine that owns this workflow handler.
445 *
446 * @return The parent GWES engine
447 */
448 public GWESEngine getEngine() {
449 return engine;
450 }
451
452 /**
453 * Get the activity table with all activities triggered by this workflow instance.
454 *
455 * @return The activity table
456 */
457 public ActivityTable getActivityTable() {
458 return activityTable;
459 }
460
461 /**
462 * Creates a new Universal Unique ID for a workflow instance.
463 * The UUID now consists of two parts:
464 * <ul>
465 * <li>the UserID of the user who instantiated the job</li>
466 * <li>an attached UUID</li>
467 * </ul>
468 * The two parts are connected by a underline (_).
469 */
470 protected synchronized String newID() {
471 return StringUtils.extractFilteredCNFromDN(userID) + "_" + java.util.UUID.randomUUID();
472 }
473
474 /**
475 * Generate a new activity identifier. This identifier consists of the UUID of the workflow a underscore (_) and
476 * a serial number with leading zeros (activity counter).
477 *
478 * @return The new activity identifier.
479 */
480 protected synchronized String getNewActivityID() {
481 return ID + "_" + intformat.format(++activityCounter);
482 }
483
484 /**
485 * Get the duration of how long this workflow was "UNDEFINED" (before initiated).
486 *
487 * @return The duration in milliseconds
488 */
489 public long getDurationUndefined() {
490 return durationUndefined;
491 }
492
493 /**
494 * Get the duration of how long this workflow was "INITIATED" (before running).
495 *
496 * @return The duration in milliseconds
497 */
498 public long getDurationInitiated() {
499 return durationInitiated;
500 }
501
502 /**
503 * Get the duration of how long this workflow was "RUNNING" not including the time it was "ACTIVE".
504 *
505 * @return The duration in milliseconds
506 */
507 public long getDurationRunning() {
508 return durationRunning;
509 }
510
511 /**
512 * Get the duration of how long this workflow was "ACTIVE".
513 *
514 * @return The duration in milliseconds
515 */
516 public long getDurationActive() {
517 return durationActive;
518 }
519
520 /**
521 * Get the duration of how long this workflow was "SUSPENDED".
522 *
523 * @return The duration in milliseconds
524 */
525 public long getDurationSuspended() {
526 return durationSuspended;
527 }
528
529 /**
530 * Get the total workflow duration.
531 *
532 * @return The duration in milliseconds
533 */
534 public long getDurationTotal() {
535 return durationTotal;
536 }
537
538 /**
539 * Get the birthday of this workflow.
540 * This is the time of the first occurence of STATUS_UNDEFINED of this workflow.
541 * @return The begin time in milliseconds since 1970
542 */
543 public long getBirthday() {
544 return startTimeTotal;
545 }
546
547 /**
548 * Get the completion or termination time of this workflow.
549 * @return The end time in milliseconds since 1970 or "0" if this workflow has not been finished yet.
550 */
551 public long getEndTime() {
552 if (status== WorkflowStatus.STATUS_COMPLETED || status== WorkflowStatus.STATUS_TERMINATED) {
553 return startTimeTotal+durationTotal;
554 } else {
555 return 0L;
556 }
557 }
558
559 /**
560 * Get a new unique error ID for this workflow.
561 *
562 * @return a String with a unique error ID.
563 */
564 public String createNewErrorID() {
565 return ""+ (++errorCounter);
566 }
567
568 protected void logStatistics() {
569 if (logger.isDebugEnabled()) {
570 logger.debug("UNDEFINED: " + getDurationUndefined() / 1000.0 + "s");
571 logger.debug("INITIATED: " + getDurationInitiated() / 1000.0 + "s");
572 logger.debug("RUNNING : " + getDurationRunning() / 1000.0 + "s");
573 logger.debug("ACTIVE : " + getDurationActive() / 1000.0 + "s");
574 logger.debug("SUSPENDED: " + getDurationSuspended() / 1000.0 + "s");
575 logger.debug("TOTAL : " + getDurationTotal() / 1000.0 + "s");
576 }
577 }
578
579 public int getTerminatedActivities() {
580 return terminatedActivities;
581 }
582
583 public synchronized void incrementTerminatedActivities(int terminatedActivities) {
584 this.terminatedActivities += terminatedActivities;
585 }
586
587 public int getCompletedActivities() {
588 return completedActivities;
589 }
590
591 public synchronized void incrementCompletedActivities(int completedActivities) {
592 this.completedActivities += completedActivities;
593 }
594
595 // a concrete implementation must implement the following abstract methods
596
597 /**
598 * Initiate this workflow. Status should switch to INITIATED. Method should only work if the status was UNDEFINED
599 * before. Implement this method in all derived classes!
600 */
601 public abstract void initiateWorkflow() throws WorkflowFormatException, StateTransitionException;
602
603 /**
604 * Start this workflow. Status should switch to RUNNING. Implement this method in all derived classes!
605 */
606 public abstract void startWorkflow() throws StateTransitionException;
607
608 /**
609 * Suspend this workflow. Waits until status switches to SUSPENDED. Implement this method in all derived classes!
610 */
611 public abstract void suspendWorkflow() throws StateTransitionException;
612
613 /**
614 * Suspend this workflow asynchonously. This method does not wait until workflow switches to SUSPENDED.
615 * Implement this method in all derived classes!
616 */
617 public abstract void suspendWorkflowAsync() throws StateTransitionException;
618
619 /**
620 * Resume this workflow. Status should switch to RUNNING. Implement this method in all derived classes!
621 */
622 public abstract void resumeWorkflow() throws StateTransitionException;
623
624 /**
625 * Abort this workflow. Waits until status switches to TERMINATED. Implement this method in all derived classes!
626 */
627 public abstract void abortWorkflow() throws StateTransitionException;
628
629 /**
630 * Abort this workflow asynchonously. This method does not wait until status switches to TERMINATED.
631 * Implement this method in all derived classes!
632 */
633 public abstract void abortWorkflowAsync() throws StateTransitionException;
634
635 /**
636 * Stores this workflow. Implement this method in all derived classes!
637 * @return The location identifier of the storage location.
638 */
639 public abstract String storeWorkflow() throws DatabaseException;
640
641 /**
642 * Get specific data that is hold inside the workflow and that is referenced by a data place identifier.
643 *
644 * @param placeID The place identifier that refers to the data
645 * @return The data as an array of Strings.
646 */
647 public abstract String[] getData(String placeID);
648
649 /**
650 * Get the human-readable description of the workflow.
651 * @return A String containing the description.
652 */
653 public abstract String getDescription();
654
655 /**
656 * Set the human-readable description of the workflow.
657 * @param description The description.
658 */
659 public abstract void setDescription(String description);
660
661 /**
662 * Implement this method for each Class that exends GridJob.
663 */
664 public abstract void run();
665
666 }