1 /*
2 * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3 * for its Fraunhofer Institute for Computer Architecture and Software
4 * Technology (FIRST), Berlin, Germany. All rights reserved.
5 * http://www.first.fraunhofer.de/
6 */
7
8 package net.kwfgrid.gwes;
9
10 import org.apache.log4j.Logger;
11
12 import java.util.ArrayList;
13
14 /**
15 * @author Andreas Hoheisel
16 * (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
17 * @version $Id: ActivitySuperviser.java 1419 2010-11-01 14:12:17Z hoheisel $
18 */
19 final class ActivitySuperviser extends Thread {
20
21 /**
22 * log4j logger
23 */
24 static final Logger logger = Logger.getLogger(ActivitySuperviser.class);
25
26 /**
27 * List that contains all worker instances
28 */
29 private ArrayList<ActivityStarter> workers;
30
31 /**
32 * activity queue.
33 */
34 private ActivityQueue queue;
35
36 /**
37 * Maximum number of workers.
38 */
39 private final static int MAX_WORKERS = 128;
40
41 /**
42 * Timeout in milliseconds for join(), wait(), sleep(), ...
43 */
44 private final static int TIMEOUT = 1000;
45
46 /**
47 * Maximum number of atomicJobs in the submission queue before
48 * creating a new submission thread
49 */
50 private final static int MAX_QUEUE_SIZE = 1;
51
52 /**
53 * Constructor for the activity starter superviser.
54 *
55 * @param queue The queue that contains the <code>Activity</code> objects.
56 */
57 public ActivitySuperviser(ActivityQueue queue) {
58 this.queue = queue;
59 this.workers = new ArrayList<ActivityStarter>();
60 start();
61 }
62
63 /**
64 * Start the thread. This creates and supervises new activity starter workers.
65 */
66 public final void run() {
67 logger.info(this + " started ...");
68 ArrayList<ActivityStarter> deadWorkers = new ArrayList<ActivityStarter>();
69 workers.add(new ActivityStarter(queue));
70 while (!isInterrupted()) {
71 // create and start new ActivityStarter if too many elements are in queue
72 if (workers.size() < MAX_WORKERS && queue.getCurrentSize() >= MAX_QUEUE_SIZE) {
73 workers.add(new ActivityStarter(queue));
74 }
75
76 // check for timeouts and destroy hanging workers.
77 long now = System.currentTimeMillis();
78 for (ActivityStarter worker : workers) {
79 try {
80 long timestamp = worker.getStartTimestamp();
81 if (!worker.isAlive()) {
82 deadWorkers.add(worker);
83 logger.warn(worker.getName() + " is dead!");
84 } else if (timestamp > 0) {
85 Activity activity = worker.getActivity();
86 if (activity != null) {
87 long timeout = activity.getTimeoutRunning();
88 if (timeout == 0) timeout = Activity.DEFAULT_TIMEOUT_RUNNING;
89 if (now - timestamp > timeout) {
90 logger.warn(worker.getName() + " timeoutRunning reached.");
91 worker.interrupt();
92 }
93 }
94 }
95 } catch (Exception e) {
96 logger.error(e, e);
97 }
98 }
99
100 // remove workers that are not alive from list.
101 for (ActivityStarter worker : deadWorkers) {
102 workers.remove(worker);
103 logger.warn("Removed dead " + worker.getName() + ".");
104 worker = null;
105 }
106 deadWorkers.clear();
107
108 // sleep some time
109 try {
110 Thread.sleep(TIMEOUT);
111 } catch (InterruptedException e) {
112 logger.error(e, e);
113 }
114 }
115 exitAll();
116 logger.info(this + " finished.");
117 }
118
119 /**
120 * Exit all activity starters.
121 */
122 private void exitAll() {
123 logger.debug("exitAll() ...");
124 for (ActivityStarter worker : workers) {
125 while (worker.isAlive()) {
126 worker.interrupt();
127 try {
128 worker.join(TIMEOUT);
129 } catch (InterruptedException e1) {
130 // do nothing
131 }
132 }
133 }
134 }
135
136 }