1
2
3
4
5
6
7
8 package net.kwfgrid.gwes;
9
10 import net.kwfgrid.gwes.exception.ActivityException;
11 import net.kwfgrid.gwes.exception.LoggingException;
12 import net.kwfgrid.gwes.util.HTMLFilter;
13 import net.kwfgrid.gworkflowdl.structure.*;
14 import org.apache.log4j.Logger;
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public abstract class Activity {
44
45
46
47
48 public static final Logger logger = Logger.getLogger(Activity.class);
49
50
51
52
53 private static final long WAIT = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_WORKFLOW_SLEEPTIME_MAX,"60000"));
54
55
56
57
58 public final String ID;
59
60
61
62
63 public GenericWorkflowHandler handler;
64
65
66
67
68
69
70 private Status status;
71
72
73
74
75 protected TransitionOccurrence to;
76
77
78
79
80 protected OperationCandidate op;
81
82
83
84
85 public long timeoutRunning;
86
87
88
89
90 public long timeoutActive;
91
92
93
94
95
96 private String faultMessage;
97
98
99
100
101 private SOAPFault soapFault;
102
103
104
105
106 private GWESLogger glog;
107
108
109
110
111
112 private int attempts = -1;
113
114
115
116
117 private long durationUndefined = 0L;
118 private long startTimeUndefined = 0L;
119
120
121
122
123 private long durationInitiated = 0L;
124 private long startTimeInitiated = 0L;
125
126
127
128
129 private long durationRunning = 0L;
130 private long startTimeRunning = 0L;
131
132
133
134
135 private long durationActive = 0L;
136 private long startTimeActive = 0L;
137
138
139
140
141 private long durationSuspended = 0L;
142 private long startTimeSuspended = 0L;
143
144
145
146
147 private long durationTotal = 0L;
148 private long startTimeTotal = 0L;
149
150
151
152
153
154 public static final long DEFAULT_TIMEOUT_RUNNING = 86400000;
155
156
157
158
159
160 public static final long DEFAULT_TIMEOUT_ACTIVE = 7200000;
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 public static enum Status {
180 UNDEFINED,
181 INITIATED,
182 RUNNING,
183 SUSPENDED,
184 ACTIVE,
185 TERMINATED,
186 COMPLETED,
187 FAILED
188 }
189
190
191
192
193 public Integer lastExitCode;
194
195
196
197
198
199
200
201
202
203 public Activity(GenericWorkflowHandler handler, TransitionOccurrence to, OperationCandidate op) throws WorkflowFormatException, LoggingException {
204 this.handler = handler;
205 this.ID = (handler == null) ? "Activity_"+ java.util.UUID.randomUUID() : this.handler.getNewActivityID();
206 this.to = to;
207 this.op = op;
208 this.glog = (handler == null) ? GWESLogger.getInstance() : handler.getGlog();
209 setAttempts();
210 setActivityTimeouts();
211 setStatus(Status.UNDEFINED);
212 if (logger.isDebugEnabled()) {
213 logger.debug(this.to);
214 }
215 }
216
217
218
219
220 public synchronized void setStatus(Status status) {
221 if (status != this.status) {
222 Status oldStatus = this.status;
223 this.status = status;
224 long now = System.currentTimeMillis();
225
226 try {
227 glog.logEventSP(
228 GWESLogger.Event.ACTIVITY_STATUS_CHANGE,
229 (handler == null) ? null : getHandler().getUserID(),
230 this,
231 "activityStatus",
232 (oldStatus == null) ? null : getStatusAsString(oldStatus),
233 getStatusAsString(status));
234 } catch (LoggingException e) {
235 logger.warn("Exception during log: "+e,e);
236 }
237
238
239 switch (this.status) {
240 case UNDEFINED:
241 startTimeUndefined = now;
242 if (startTimeTotal == 0L) {
243 startTimeTotal = now;
244 }
245 break;
246 case INITIATED:
247 startTimeInitiated = now;
248 break;
249 case RUNNING:
250 startTimeRunning = now;
251 break;
252 case SUSPENDED:
253 startTimeSuspended = now;
254 break;
255 case ACTIVE:
256 startTimeActive = now;
257 break;
258 case COMPLETED:
259 if (handler != null) handler.incrementCompletedActivities(1);
260 break;
261 case TERMINATED:
262 if (handler != null) handler.incrementTerminatedActivities(1);
263 break;
264 case FAILED:
265 break;
266 }
267
268 if (oldStatus != null) {
269 switch (oldStatus) {
270 case UNDEFINED:
271 durationUndefined = now - startTimeUndefined;
272 break;
273 case INITIATED:
274 durationInitiated += now - startTimeInitiated;
275 break;
276 case RUNNING:
277 durationRunning += now - startTimeRunning;
278 break;
279 case SUSPENDED:
280 durationSuspended += now - startTimeSuspended;
281 break;
282 case ACTIVE:
283 durationActive += now - startTimeActive;
284 break;
285 }
286 }
287
288 if (isFinalStatus() || status==Status.FAILED) {
289 durationTotal = now - startTimeTotal;
290 logStatistics();
291 }
292
293
294 synchronized (this) {
295 this.notifyAll();
296 }
297 }
298 }
299
300
301
302
303
304
305
306 public Status getStatus() {
307 return status;
308 }
309
310
311
312
313
314
315
316
317 public Status waitForStatusChangeFrom(Status oldStatus) throws InterruptedException {
318 while (status == oldStatus) {
319 synchronized (this) {
320 this.wait(WAIT);
321 }
322 getStatus();
323 }
324 return status;
325 }
326
327
328
329
330
331
332
333 public void waitForStatusChangeTo(Status newStatus) throws InterruptedException {
334 while (status != newStatus) {
335 synchronized (this) {
336 this.wait(WAIT);
337 }
338 getStatus();
339 }
340 }
341
342
343
344
345
346
347 public void waitForCompleteOrTerminate() throws InterruptedException {
348 while (!isFinalStatus()) {
349 synchronized (this) {
350 this.wait(WAIT);
351 }
352 getStatus();
353 }
354 }
355
356 public boolean isFinalStatus() {
357 return isFinalStatus(status);
358 }
359
360 public static boolean isFinalStatus(Status status) {
361 switch(status) {
362 case COMPLETED: return true;
363 case TERMINATED: return true;
364 }
365 return false;
366 }
367
368
369
370
371
372
373
374
375
376 public String getStatusAsString() {
377 return getStatusAsString(status);
378 }
379
380
381
382
383
384
385
386 public static String getStatusAsString(Status status) {
387 if (status==null) return "null";
388 switch(status) {
389 case UNDEFINED: return "UNDEFINED";
390 case INITIATED: return "INITIATED";
391 case RUNNING: return "RUNNING";
392 case SUSPENDED: return "SUSPENDED";
393 case ACTIVE: return "ACTIVE";
394 case TERMINATED: return "TERMINATED";
395 case COMPLETED: return "COMPLETED";
396 case FAILED: return "FAILED";
397 }
398 return "UNDEFINED";
399 }
400
401
402
403
404
405
406 public String getID() {
407 return ID;
408 }
409
410
411
412
413
414
415 public GenericWorkflowHandler getHandler() {
416 return handler;
417 }
418
419
420
421
422
423 public GWESLogger getGlog() {
424 return glog;
425 }
426
427
428
429
430
431 public TransitionOccurrence getTransitionOccurrence() {
432 return to;
433 }
434
435
436
437
438
439 public OperationCandidate getOperationCandidate() {
440 return op;
441 }
442
443
444
445
446
447 public String getOperationName() {
448 return op.getOperationName();
449 }
450
451
452
453
454
455 public String getResourceName() {
456 return op.getResourceName();
457 }
458
459
460
461
462
463 public long getTimeoutRunning() {
464 return timeoutRunning;
465 }
466
467
468
469
470
471 protected void setTimeoutRunning(long timeoutRunning) {
472 this.timeoutRunning = timeoutRunning;
473 }
474
475
476
477
478
479 public long getTimeoutActive() {
480 return timeoutActive;
481 }
482
483
484
485
486
487 protected void setTimeoutActive(long timeoutActive) {
488 this.timeoutActive = timeoutActive;
489 }
490
491
492
493
494
495 public String getFaultMessage() {
496 if (faultMessage == null && soapFault != null) {
497 faultMessage = HTMLFilter.filter(soapFault.toTokenString());
498 }
499 return faultMessage;
500 }
501
502
503
504
505
506 public void setFaultMessage(String faultMessage) {
507 this.faultMessage = faultMessage;
508 }
509
510
511
512
513
514 public void appendFaultMessage(String faultMessage) {
515 if (this.faultMessage == null || this.faultMessage.length() == 0) {
516 this.faultMessage = faultMessage;
517 } else {
518 this.faultMessage += "\n" + faultMessage;
519 }
520 }
521
522
523
524
525
526 public synchronized SOAPFault getSoapFault() {
527 if (soapFault == null && faultMessage != null) {
528 SOAPFault fault = new SOAPFault("env:Server", "Activity '" + getID() + "': Exception during activity invocation");
529 if (op != null) {
530 fault.setNode(op.getResourceName().startsWith("hardware:") ? op.getResourceName().split(":")[1] : op.getResourceName());
531 }
532 if (this.lastExitCode != null) fault.setExitCode(this.lastExitCode);
533 fault.setDetail(toString() + "\n" + getFaultMessage());
534 setSoapFault(fault);
535 }
536 return soapFault;
537 }
538
539
540
541
542
543 public void setSoapFault(SOAPFault soapFault) {
544 this.soapFault = soapFault;
545 }
546
547
548
549
550
551 public long getStartTimeTotal() {
552 return startTimeTotal;
553 }
554
555
556
557
558
559
560 public long getDurationUndefined() {
561 return durationUndefined;
562 }
563
564
565
566
567
568
569 public long getDurationInitiated() {
570 return durationInitiated;
571 }
572
573
574
575
576
577
578 public long getDurationRunning() {
579 return durationRunning;
580 }
581
582
583
584
585
586
587 public long getDurationActive() {
588 return durationActive;
589 }
590
591
592
593
594
595
596 public long getDurationSuspended() {
597 return durationSuspended;
598 }
599
600
601
602
603
604
605 public long getDurationTotal() {
606 return durationTotal;
607 }
608
609
610
611
612
613
614 public synchronized int getAttempts() {
615 return attempts;
616 }
617
618 public synchronized void decreaseAttempts() {
619 attempts--;
620 }
621
622
623
624
625
626 public void enqueueActivity() throws WorkflowFormatException {
627 if (status == Status.INITIATED) {
628 if (handler != null) {
629 handler.getEngine().getActivityQueue().enqueue(this);
630 } else {
631 logger.error("Trying to enqueue activity without handler.");
632 }
633 } else {
634 logger.error("Wrong status for enqueue activity " + getStatusAsString());
635 }
636 }
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658 public String[] getStatusArray() {
659 String[] ret = new String[15];
660 int i=0;
661 ret[i++] = "ID="+getID();
662 ret[i++] = "status="+getStatusAsString();
663 ret[i++] = "activityClass=" + getClass().getName();
664 ret[i++] = "operationName=" + getOperationName();
665 ret[i++] = "resourceName=" + getResourceName();
666 ret[i++] = "timeoutActive=" + getTimeoutActive();
667 ret[i++] = "timeoutRunning=" + getTimeoutRunning();
668 ret[i++] = "birthdayMs="+getStartTimeTotal();
669 ret[i++] = "durationUndefinedMs=" + getDurationUndefined();
670 ret[i++] = "durationInitiatedMs=" + getDurationInitiated();
671 ret[i++] = "durationRunningMs=" + getDurationRunning();
672 ret[i++] = "durationActiveMs=" + getDurationActive();
673 ret[i++] = "durationSuspendedMs=" + getDurationSuspended();
674 ret[i++] = "durationTotalMs=" + getDurationTotal();
675 ret[i++] = "endTimeMs=" + (getDurationTotal() == 0L ? 0L : getStartTimeTotal()+getDurationTotal());
676 return ret;
677 }
678
679 private void logStatistics() {
680 if (logger.isDebugEnabled()) {
681 logger.debug("UNDEFINED: " + getDurationUndefined() / 1000.0 + "s");
682 logger.debug("INITIATED: " + getDurationInitiated() / 1000.0 + "s");
683 logger.debug("RUNNING : " + getDurationRunning() / 1000.0 + "s");
684 logger.debug("ACTIVE : " + getDurationActive() / 1000.0 + "s");
685 logger.debug("SUSPENDED: " + getDurationSuspended() / 1000.0 + "s");
686 logger.debug("TOTAL : " + getDurationTotal() / 1000.0 + "s");
687 }
688 }
689
690 private void setAttempts() throws WorkflowFormatException {
691
692 if (to != null && to.transition != null) {
693
694
695 String hasFailedProperty = to.transition.getProperties().get(Constants.PROP_TRANSITION_ACTIVITY_HAS_FAILED);
696 if (hasFailedProperty != null && hasFailedProperty.equals(Constants.TRUE)) {
697 String attemptsStr = to.getProps().get(Constants.PROP_TOKEN_ACTIVITY_MAXATTEMPTS);
698 if (attemptsStr != null) {
699 try {
700 attempts = Integer.parseInt(attemptsStr);
701 return;
702 } catch (NumberFormatException e) {
703 throw new WorkflowFormatException("Wrong format of token property \""+Constants.PROP_TOKEN_ACTIVITY_MAXATTEMPTS+"\": " + e, e);
704 }
705 }
706 }
707
708
709 String attemptsStr = to.transition.getProperties().get(Constants.PROP_TRANSITION_ACTIVITY_MAXATTEMPTS);
710 if (attemptsStr != null) {
711 try {
712 attempts = Integer.parseInt(attemptsStr);
713 return;
714 } catch (NumberFormatException e) {
715 throw new WorkflowFormatException("Wrong format of transition property \""+Constants.PROP_TRANSITION_ACTIVITY_MAXATTEMPTS+"\": " + e, e);
716 }
717 }
718 }
719
720
721 if (handler != null && handler.getWorkflow() != null) {
722 Object wfObj = handler.getWorkflow();
723 if (wfObj instanceof Workflow) {
724 String attemptsStr = ((Workflow) wfObj).getProperties().get(Constants.PROP_ACTIVITY_MAXATTEMPTS);
725 if (attemptsStr != null) {
726 try {
727 attempts = Integer.parseInt(attemptsStr);
728 return;
729 } catch (NumberFormatException e) {
730 throw new WorkflowFormatException("Wrong format of workflow property \""+Constants.PROP_TRANSITION_ACTIVITY_MAXATTEMPTS+"\": " + e, e);
731 }
732 }
733 }
734 }
735
736
737 String attemptsStr = System.getProperty(Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS);
738 if (attemptsStr != null) {
739 try {
740 attempts = Integer.parseInt(attemptsStr);
741 return;
742 } catch (NumberFormatException e) {
743 throw new WorkflowFormatException("Wrong format of property \""+Constants.PROP_SYSTEM_GWES_ACTIVITY_MAXATTEMPTS+"\" in gwes.properties: " + e, e);
744 }
745 }
746
747
748 attempts = 1;
749 }
750
751 private void setActivityTimeouts() throws WorkflowFormatException {
752
753 if (to == null) {
754 logger.warn("setActivityTimeouts(): Transition occurrence is null for "+ID);
755 return;
756 }
757
758
759 String timeoutProperty = to.transition.getProperties().get(Constants.PROP_TRANSITION_TIMEOUT);
760 String timeoutRunningProperty = to.transition.getProperties().get(Constants.PROP_TRANSITION_TIMEOUT_RUNNING);
761 String timeoutActiveProperty = to.transition.getProperties().get(Constants.PROP_TRANSITION_TIMEOUT_ACTIVE);
762
763 if (timeoutRunningProperty != null) {
764 try {
765 setTimeoutRunning(Long.parseLong(timeoutRunningProperty));
766 } catch (NumberFormatException e) {
767 throw new WorkflowFormatException("Wrong format of property \"timeout.running\": " + e, e);
768 }
769 } else if (timeoutProperty != null) {
770 try {
771 setTimeoutRunning(Long.parseLong(timeoutProperty));
772 } catch (NumberFormatException e) {
773 throw new WorkflowFormatException("Wrong format of property \"timeout\": " + e, e);
774 }
775 } else {
776 setTimeoutRunning(Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_ACTIVITY_TIMEOUT_RUNNING, "" + Activity.DEFAULT_TIMEOUT_RUNNING)));
777 }
778
779
780 if (timeoutActiveProperty != null) {
781 try {
782 setTimeoutActive(Long.parseLong(timeoutActiveProperty));
783 } catch (NumberFormatException e) {
784 throw new WorkflowFormatException("Wrong format of property \"timeout.active\": " + e, e);
785 }
786 } else if (timeoutProperty != null) {
787 try {
788 setTimeoutActive(Long.parseLong(timeoutProperty));
789 } catch (NumberFormatException e) {
790 throw new WorkflowFormatException("Wrong format of property \"timeout\": " + e, e);
791 }
792 } else {
793 setTimeoutActive(Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_ACTIVITY_TIMEOUT_ACTIVE, "" + Activity.DEFAULT_TIMEOUT_ACTIVE)));
794 }
795 }
796
797
798
799
800
801
802
803 public abstract void initiateActivity() throws ActivityException;
804
805
806
807
808 public abstract void startActivity() throws ActivityException, WorkflowFormatException;
809
810
811
812
813
814
815
816
817 public abstract void simulateActivity() throws ActivityException, WorkflowFormatException;
818
819
820
821
822 public abstract void suspendActivity() throws ActivityException;
823
824
825
826
827 public abstract void resumeActivity() throws ActivityException;
828
829
830
831
832 public abstract void abortActivity() throws ActivityException;
833
834
835
836
837 public abstract void restartActivity() throws ActivityException;
838
839
840
841
842 public abstract void cleanupActivity() throws ActivityException;
843
844
845
846
847
848 public String toString() {
849 StringBuffer buffer = new StringBuffer();
850 buffer.append(getClass().getName()).append('[');
851 buffer.append("ID=").append(ID);
852 if (to != null) {
853 buffer.append("; ").append(to);
854 }
855 if (op != null) {
856 buffer.append("; OperationName=").append(op.getOperationName());
857 buffer.append("; ResourceName=").append(op.getResourceName());
858 }
859 buffer.append(']');
860 return buffer.toString();
861 }
862
863
864
865
866
867
868 protected void putFaultTokenOnOutputPlaces() throws WorkflowFormatException {
869 if (to == null) return;
870 logger.debug("putting one fault token on each of the output places...");
871
872 for (TokenParameter tp : to.tokens) {
873 switch (tp.scope) {
874 case OUTPUT:
875 if (tp.token != null) continue;
876 String edgeExpression = tp.edge.getExpression();
877 if (edgeExpression != null && edgeExpression.length() > 0) {
878 try {
879 tp.token = getSoapFault().toToken();
880 } catch (NullPointerException e) {
881 logger.error("NullPointerException during putting fault tokens on output places: "+e,e);
882 }
883 }
884 break;
885 }
886 }
887 }
888
889 }