View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import org.apache.log4j.Logger;
11  
12  import java.util.ArrayList;
13  
14  /**
15   * @author Andreas Hoheisel
16   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
17   * @version $Id: ActivitySuperviser.java 1419 2010-11-01 14:12:17Z hoheisel $
18   */
19  final class ActivitySuperviser extends Thread {
20  
21      /**
22       * log4j logger
23       */
24      static final Logger logger = Logger.getLogger(ActivitySuperviser.class);
25  
26      /**
27       * List that contains all worker instances
28       */
29      private ArrayList<ActivityStarter> workers;
30  
31      /**
32       * activity queue.
33       */
34      private ActivityQueue queue;
35  
36      /**
37       * Maximum number of workers.
38       */
39      private final static int MAX_WORKERS = 128;
40  
41      /**
42       * Timeout in milliseconds for join(), wait(), sleep(), ...
43       */
44      private final static int TIMEOUT = 1000;
45  
46      /**
47       * Maximum number of atomicJobs in the submission queue before
48       * creating a new submission thread
49       */
50      private final static int MAX_QUEUE_SIZE = 1;
51  
52      /**
53       * Constructor for the activity starter superviser.
54       *
55       * @param queue The queue that contains the <code>Activity</code> objects.
56       */
57      public ActivitySuperviser(ActivityQueue queue) {
58          this.queue = queue;
59          this.workers = new ArrayList<ActivityStarter>();
60          start();
61      }
62  
63      /**
64       * Start the thread.  This creates and supervises new activity starter workers.
65       */
66      public final void run() {
67          logger.info(this + " started ...");
68          ArrayList<ActivityStarter> deadWorkers = new ArrayList<ActivityStarter>();
69          workers.add(new ActivityStarter(queue));
70          while (!isInterrupted()) {
71              // create and start new ActivityStarter if too many elements are in queue
72              if (workers.size() < MAX_WORKERS && queue.getCurrentSize() >= MAX_QUEUE_SIZE) {
73                  workers.add(new ActivityStarter(queue));
74              }
75  
76              // check for timeouts and destroy hanging workers.
77              long now = System.currentTimeMillis();
78              for (ActivityStarter worker : workers) {
79                  try {
80                      long timestamp = worker.getStartTimestamp();
81                      if (!worker.isAlive()) {
82                          deadWorkers.add(worker);
83                          logger.warn(worker.getName() + " is dead!");
84                      } else if (timestamp > 0) {
85                          Activity activity = worker.getActivity();
86                          if (activity != null) {
87                              long timeout = activity.getTimeoutRunning();
88                              if (timeout == 0) timeout = Activity.DEFAULT_TIMEOUT_RUNNING;
89                              if (now - timestamp > timeout) {
90                                  logger.warn(worker.getName() + " timeoutRunning reached.");
91                                  worker.interrupt();
92                              }
93                          }
94                      }
95                  } catch (Exception e) {
96                      logger.error(e, e);
97                  }
98              }
99  
100             // remove workers that are not alive from list.
101             for (ActivityStarter worker : deadWorkers) {
102                 workers.remove(worker);
103                 logger.warn("Removed dead " + worker.getName() + ".");
104                 worker = null;
105             }
106             deadWorkers.clear();
107 
108             // sleep some time
109             try {
110                 Thread.sleep(TIMEOUT);
111             } catch (InterruptedException e) {
112                 logger.error(e, e);
113             }
114         }
115         exitAll();
116         logger.info(this + " finished.");
117     }
118 
119     /**
120      * Exit all activity starters.
121      */
122     private void exitAll() {
123         logger.debug("exitAll() ...");
124         for (ActivityStarter worker : workers) {
125             while (worker.isAlive()) {
126                 worker.interrupt();
127                 try {
128                     worker.join(TIMEOUT);
129                 } catch (InterruptedException e1) {
130                     // do nothing
131                 }
132             }
133         }
134     }
135 
136 }