View Javadoc

1   /*
2    * $Id: FaultToleranceHandler.java 1532 2011-06-23 15:57:53Z hoheisel $
3    *
4    * Copyright 2008 Fraunhofer Gesellschaft, Munich, Germany,
5    * for its Fraunhofer Institute for Computer Architecture and Software Technology (FIRST), Berlin, Germany
6    * All rights reserved.
7    *
8    * See http://www.first.fraunhofer.de and http://www.gridworkflow.org/gwes for more details.
9    */
10  
11  package net.kwfgrid.gwes;
12  
13  import net.kwfgrid.gworkflowdl.structure.OperationClass;
14  import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
15  import net.kwfgrid.gworkflowdl.structure.Transition;
16  import net.kwfgrid.gworkflowdl.structure.WorkflowFormatException;
17  import net.kwfgrid.gwes.exception.LoggingException;
18  import net.kwfgrid.gwes.exception.ActivityException;
19  import org.apache.log4j.Logger;
20  
21  /**
22   * @author Andreas Hoheisel
23   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
24   * @version $Id: FaultToleranceHandler.java 1532 2011-06-23 15:57:53Z hoheisel $
25   */
26  public class FaultToleranceHandler {
27  
28      /**
29       * log4j logger.
30       */
31      private final static Logger logger = Logger.getLogger(FaultToleranceHandler.class);
32  
33  
34      /**
35       * If this is not the last attempt, set the status to Status.FAILED and initiate fault tolerance,
36       * otherwise set Status.TERMINATED.
37       *
38       * As side effect, this method decreases the attempts variable and stores it back to as property
39       * to the input tokens and removes the failed operation candidate from the transition's operation.
40       */
41      public static Activity.Status initiateFaultToleranceOrTerminate(Activity activity) {
42          // evaluate number of attempts
43          if (activity.getAttempts() > 1) {
44      		activity.setStatus(Activity.Status.FAILED);
45              activity.decreaseAttempts();
46              TransitionOccurrence to = activity.getTransitionOccurrence();
47              to.putProperty(Constants.PROP_TOKEN_ACTIVITY_MAXATTEMPTS, Integer.toString(activity.getAttempts()));
48              to.movePropertyToInputTokens(Constants.PROP_TOKEN_ACTIVITY_MAXATTEMPTS);
49              logFailure(activity);
50              removeFailedOperationCandidate(activity);
51      		return Activity.Status.FAILED;
52      	} else {
53              try {
54                  activity.putFaultTokenOnOutputPlaces();
55              } catch (WorkflowFormatException e) {
56                  logger.error("Exception during fault tolerance for activity "+activity+": "+e, e);
57              }
58      		activity.setStatus(Activity.Status.TERMINATED);
59      		return Activity.Status.TERMINATED;
60      	}
61      }
62  
63      /**
64       * Remove failed operation candidate.
65       * @param activity
66       */
67      private static void removeFailedOperationCandidate(Activity activity) {
68          // remove operation candidates invoked on failed resource
69          ///ToDo: check if this makes all thread-safe!
70          Transition transition = activity.getTransitionOccurrence().transition;
71          synchronized (transition.getOperation()) {
72              OperationClass operationClass = (OperationClass) transition.getOperation().get();
73              OperationCandidate[] opcs = operationClass.getOperationCandidates();
74              if (opcs != null) {
75                  int selectedCandidate = -1;
76                  for (OperationCandidate cand : opcs) {
77                      selectedCandidate++;
78                      ///ToDO check if this works for extended resource names!
79                      if (activity.getResourceName().equals(cand.getResourceName())) {
80                          if (logger.isDebugEnabled()) {
81                              logger.debug("Removing failed operation candidate entry no. " + selectedCandidate + " in transition.");
82                          }
83                          operationClass.removeOperationCandidate(selectedCandidate);
84                          break;
85                      }
86                  }
87              }
88          }
89      }
90  
91      /**
92       * Write log message to workflow and mark transition.
93       * @param activity
94       */
95      private static void logFailure(Activity activity) {
96  
97          // set workflow warning property if there is any activity fault message
98          String faultMessage = activity.getFaultMessage();
99          ActivityException exception = new ActivityException("Trying to recover from fault in " + activity + ". Remaining "+activity.getAttempts()+" attempt(s):\n" + faultMessage);
100         try {
101             GWESLogger.getInstance().logEvent(
102                 GWESLogger.Event.ACTIVITY_START,
103                 GWESLogger.EventOutcome.WARNING,
104                 activity.getHandler().getUserID(),
105                 activity,
106                 exception);
107         } catch (LoggingException e) {
108             logger.error("exception:\n" + e, e);
109         }
110 
111         GenericWorkflowHandler handler = activity.getHandler();
112         if (faultMessage != null && handler != null && handler instanceof GWorkflowDLHandler) {
113             GWorkflowDLHandler gwdlHandler = (GWorkflowDLHandler) handler;
114             gwdlHandler.getWorkflow().getProperties().put(Constants.PROP_WARN_ + handler.createNewErrorID(), exception.toString());
115         }
116 
117         // mark transition as handled with fault tolerant mechanisms
118         activity.getTransitionOccurrence().transition.getProperties().put(Constants.PROP_TRANSITION_ACTIVITY_HAS_FAILED, Constants.TRUE);
119     }
120 
121 }