View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import net.kwfgrid.gwes.exception.ActivityException;
11  import net.kwfgrid.gworkflowdl.structure.WorkflowFormatException;
12  import org.apache.log4j.Logger;
13  
14  /**
15   * @author Andreas Hoheisel
16   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
17   * @version $Id: ActivityStarter.java 1490 2011-02-18 13:20:36Z hoheisel $
18   */
19  final class ActivityStarter extends Thread {
20  
21      /**
22       * log4j logger
23       */
24      static final Logger logger = Logger.getLogger(ActivityStarter.class);
25  
26      /**
27       * activity queue
28       */
29      private ActivityQueue queue;
30  
31      /**
32       * timestamp at begin of activity.startActivity
33       */
34      private long startTimestamp;
35  
36      /**
37       * Current activity handled by this starter.
38       */
39      private Activity activity;
40  
41      /**
42       * Thread counter
43       */
44      private static int threadCounter = 0;
45  
46      /**
47       * Constructor for the multi-threaded activity starter
48       *
49       * @param queue The queue that contains the <code>Activity</code> objects.
50       */
51      public ActivityStarter(ActivityQueue queue) {
52          super("ActivityStarter#" + (++threadCounter));
53          this.queue = queue;
54          start();
55      }
56  
57      /**
58       * Start the thread.  This method dequeues activities from the queue and invokes
59       * the startActivity() method of each activity until thread is interrupted.
60       */
61      public final void run() {
62          logger.debug(this + " started ...");
63          while (!isInterrupted()) {
64  
65              // wait for queue.enqueue if queue is empty
66              try {
67                  while (queue.isEmpty() && !isInterrupted()) {
68                      queue.waitForEnqueue();
69                  }
70              } catch (InterruptedException e) {
71                  logger.warn("waitForEnqueue() ... interrupted");
72                  interrupt();
73              }
74  
75              // start activity - submit job
76              if (!isInterrupted()) {
77                  activity = queue.dequeueActivity();
78                  if (activity != null) {
79                      if (logger.isDebugEnabled()) {
80                          logger.debug("activity#" + activity.getID() + " dequeued");
81                      }
82                      try {
83                          startTimestamp = System.currentTimeMillis();
84                          activity.startActivity();
85                          if (logger.isDebugEnabled()) {
86                              logger.debug("Duration for starting activity#" + activity.getID()
87                                      + ": " + (System.currentTimeMillis() - startTimestamp) / 1000.0 + "s");
88                          }
89                          startTimestamp = 0;
90                      } catch (ActivityException e) {
91                          FaultToleranceHandler.initiateFaultToleranceOrTerminate(activity);
92                      } catch (WorkflowFormatException e) {
93                          activity.setStatus(Activity.Status.TERMINATED);
94                          activity.appendFaultMessage("Workflow format exception when trying to start activity '" + activity.getID() + "': " + e);
95                          logger.error("Workflow format exception when trying to start activity '" + activity.getID() + "': " + e, e);
96                      } catch (Exception e) {
97                          activity.setStatus(Activity.Status.TERMINATED);
98                          activity.appendFaultMessage("Exception when trying to start activity '" + activity.getID() + "': " + e);
99                          logger.error("exception when trying to start activity '" + activity.getID() + "': " + e, e);
100                     } catch (java.lang.NoClassDefFoundError e) {
101                         activity.setStatus(Activity.Status.TERMINATED);
102                         String message = "Class missing in classpath when trying to start activity '" + activity.getID() + "'! Check implementation of class '"+activity.getClass().getName()+"'! Caused by: ";
103                         activity.appendFaultMessage(message+e);
104                         logger.error(message + e.getMessage(), e);
105                     }
106                     activity = null;
107                 }
108             }
109         }
110         logger.debug(this.getName()+" finished.");
111     }
112 
113     public long getStartTimestamp() {
114         return startTimestamp;
115     }
116 
117     public Activity getActivity() {
118         return activity;
119     }
120 
121 }