1
2
3
4
5
6
7
8 package net.kwfgrid.gwes.monitor;
9
10 import net.kwfgrid.gwes.*;
11 import org.apache.log4j.Logger;
12
13 import java.io.BufferedReader;
14 import java.io.IOException;
15 import java.io.InputStreamReader;
16 import java.util.ArrayList;
17 import java.util.Enumeration;
18
19
20
21
22
23
24 public class GangliaCustomMetricUpdater extends Thread {
25
26
27 private final static int UPDATE_INTERVAL = 1000 * 60 * 5;
28
29 private final static int UPDATE_INTERVAL_MAX = 1000 * 60 * 60 * 24;
30
31 private GWESEngine engine;
32 static Logger logger = Logger.getLogger(GangliaCustomMetricUpdater.class);
33 static int threadcount = 0;
34 private String gmetric;
35 private String multicastChannel;
36 private int multicastPort;
37 private String multicastInterface;
38 private int activeWorkflows;
39 private int runningWorkflows;
40 private int activeActivities;
41 private int runningActivities;
42 private int completedActivities;
43 private int terminatedActivities;
44 private float completedActivitiesPerMinute;
45 private float terminatedActivitiesPerMinute;
46 long lastTime;
47
48
49
50
51
52
53 public GangliaCustomMetricUpdater(GWESEngine engine) {
54 super("GangliaCustomMetricUpdater#" + (++threadcount));
55 gmetric = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_GMETRIC, "/usr/bin/gmetric");
56 multicastChannel = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_CHANNEL);
57 String portStr = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_PORT);
58 if (portStr != null) {
59 try {
60 multicastPort = Integer.parseInt(portStr);
61 } catch (NumberFormatException e) {
62 logger.error("The property "+ Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_PORT + " has wrong format: " + e);
63 }
64 }
65 multicastInterface = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_INTERFACE);
66 this.engine = engine;
67 start();
68 }
69
70
71
72
73 public final void run() {
74 logger.info(this + " started ...");
75 int interval = UPDATE_INTERVAL;
76
77 while (!isInterrupted()) {
78 try {
79 GenericWorkflowHandlerTable table = engine.getHandlerTable();
80 updateActionCount(table);
81 long freeMemory = Runtime.getRuntime().freeMemory();
82 long maxMemory = Runtime.getRuntime().maxMemory();
83 long totalMemory = Runtime.getRuntime().totalMemory();
84 float usedMemoryPercent = 100.0f * (float) (totalMemory - freeMemory ) / (float) maxMemory;
85
86 if ( usedMemoryPercent > 80.0 ) {
87 logger.info("Memory max/total/free: " + maxMemory + "/" + totalMemory + "/" + freeMemory);
88 logger.info("Used memory is more than 80% of max memory. Invoking garbage collection...");
89 System.gc();
90 }
91
92 send("tomcat_used_memory", "" + usedMemoryPercent, "float", "%");
93 send("tomcat_vm_threads", "" + countTotalThreads(), "int8", "threads");
94 send("gwes_workflows_in_memory", "" + table.size(), "int16", "workflows");
95 send("gwes_running_or_active_workflows", "" + (runningWorkflows + activeWorkflows), "int16", "workflows");
96 send("gwes_running_or_active_activities", "" + (runningActivities + activeActivities), "int16", "activities");
97 send("gwes_completed_activities_per_minute", "" + completedActivitiesPerMinute, "float", "activities/min");
98 send("gwes_terminated_activities_per_minute", "" + terminatedActivitiesPerMinute, "float", "activities/min");
99
100 interval = UPDATE_INTERVAL;
101 } catch (Exception e) {
102 if (interval < UPDATE_INTERVAL_MAX) interval *= 2;
103 logger.error("There was an exception, so I set the update interval to " + interval + "ms: " + e, e);
104 }
105
106 try {
107
108 if (!isInterrupted()) {
109 Thread.sleep(interval);
110 }
111 } catch (InterruptedException e) {
112 logger.error("I have been interrupted: " + e, e);
113 interrupt();
114 }
115 }
116 }
117
118 private void updateActionCount(GenericWorkflowHandlerTable handlerTable) {
119 activeWorkflows = 0;
120 runningWorkflows = 0;
121 activeActivities = 0;
122 runningActivities = 0;
123
124 int lastCompletedActivities = completedActivities;
125 int lastTerminatedActivities = terminatedActivities;
126 completedActivities = 0;
127 terminatedActivities = 0;
128
129 long now = System.currentTimeMillis();
130 Enumeration handlers = handlerTable.elements();
131 while (handlers.hasMoreElements()) {
132 GenericWorkflowHandler handler = (GenericWorkflowHandler) handlers.nextElement();
133 if (handler.getStatus() == WorkflowStatus.STATUS_ACTIVE) {
134 activeWorkflows++;
135
136 Enumeration activities = handler.getActivityTable().elements();
137 while (activities.hasMoreElements()) {
138 Activity activity = (Activity) activities.nextElement();
139 try {
140 Activity.Status status = activity.getStatus();
141 if (status == Activity.Status.ACTIVE) activeActivities++;
142 if (status == Activity.Status.RUNNING) runningActivities++;
143 } catch (Exception e) {
144 logger.warn("Warning: Exception when updating status of activity " + activity.getID() + ": " + e);
145 }
146 }
147 }
148 if (handler.getStatus() == WorkflowStatus.STATUS_RUNNING) {
149 runningWorkflows++;
150 }
151 completedActivities += handler.getCompletedActivities();
152 terminatedActivities += handler.getTerminatedActivities();
153 }
154
155 if (lastTime > 0) {
156 completedActivitiesPerMinute = ((float) (completedActivities - lastCompletedActivities)) / ((float) (now - lastTime)) * 60000f;
157 if (completedActivitiesPerMinute < 0f) completedActivitiesPerMinute = 0f;
158 terminatedActivitiesPerMinute = ((float) (terminatedActivities - lastTerminatedActivities)) / ((float) (now - lastTime)) * 60000f;
159 if (terminatedActivitiesPerMinute < 0f) terminatedActivitiesPerMinute = 0f;
160 }
161
162 lastTime = now;
163 }
164
165 private static int countTotalThreads() {
166 ThreadGroup top = Thread.currentThread().getThreadGroup();
167 while (top.getParent() != null) {
168 top = top.getParent();
169 }
170 return top.activeCount();
171 }
172
173
174
175
176
177
178
179
180
181
182
183 protected String send(String name, String value, String type, String unit) throws IOException {
184 if (logger.isDebugEnabled()) {
185 logger.debug("Sending "+name+"="+ value+" to Ganglia...");
186 }
187
188 ArrayList<String> argsList = new ArrayList<String>();
189
190 if (gmetric != null && gmetric.length() > 0) {
191 argsList.add(gmetric);
192 } else {
193 throw new IOException("Path to executable not valid. Please set property \"gwes.ganglia.gmetric\".");
194 }
195
196 argsList.add("-n" + name.trim());
197 argsList.add("-v" + value.trim());
198 argsList.add("-t" + type);
199 argsList.add("-u" + unit);
200
201 if (multicastChannel != null && multicastChannel.length() > 0) {
202 argsList.add("-c" + multicastChannel);
203 }
204
205 if (multicastPort > 0) {
206 argsList.add("-p" + multicastPort);
207 }
208
209 if (multicastInterface != null && multicastInterface.length() > 0) {
210 argsList.add("-i" + multicastInterface);
211 }
212
213 String[] args = new String[argsList.size()];
214 for (int i = 0; i < argsList.size(); i++) {
215 args[i] = argsList.get(i);
216 }
217 Process p = Runtime.getRuntime().exec(args);
218 BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()));
219 StringBuffer ret = new StringBuffer();
220 String line;
221 while ((line = r.readLine()) != null) {
222 ret.append(line);
223 ret.append("\n");
224 }
225
226 closeProcessStreams(p);
227
228 if (ret.length() > 1) {
229 logger.warn("Output of " + gmetric + ": " + ret.toString());
230 }
231 return ret.toString();
232 }
233
234
235
236
237
238
239 public static void closeProcessStreams(Process p) throws IOException {
240 p.getErrorStream().close();
241 p.getInputStream().close();
242 p.getOutputStream().close();
243 p.destroy();
244 }
245
246
247 }