View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes.monitor;
9   
10  import net.kwfgrid.gwes.*;
11  import org.apache.log4j.Logger;
12  
13  import java.io.BufferedReader;
14  import java.io.IOException;
15  import java.io.InputStreamReader;
16  import java.util.ArrayList;
17  import java.util.Enumeration;
18  
19  /**
20   * @author Andreas Hoheisel
21   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
22   * @version $Id: GangliaCustomMetricUpdater.java 1490 2011-02-18 13:20:36Z hoheisel $
23   */
24  public class GangliaCustomMetricUpdater extends Thread {
25  
26      // 5 Minutes update interval
27      private final static int UPDATE_INTERVAL = 1000 * 60 * 5;
28      // 1 Day maximum update interval 
29      private final static int UPDATE_INTERVAL_MAX = 1000 * 60 * 60 * 24;
30  
31      private GWESEngine engine;
32      static Logger logger = Logger.getLogger(GangliaCustomMetricUpdater.class);
33      static int threadcount = 0;
34      private String gmetric;
35      private String multicastChannel;
36      private int multicastPort;
37      private String multicastInterface;
38      private int activeWorkflows;
39      private int runningWorkflows;
40      private int activeActivities;
41      private int runningActivities;
42      private int completedActivities;
43      private int terminatedActivities;
44      private float completedActivitiesPerMinute;
45      private float terminatedActivitiesPerMinute;
46      long lastTime;
47  
48      /**
49       * Constructor
50       *
51       * @param engine
52       */
53      public GangliaCustomMetricUpdater(GWESEngine engine) {
54          super("GangliaCustomMetricUpdater#" + (++threadcount));
55          gmetric = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_GMETRIC, "/usr/bin/gmetric");
56          multicastChannel = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_CHANNEL);
57          String portStr = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_PORT);
58          if (portStr != null) {
59              try {
60                  multicastPort = Integer.parseInt(portStr);
61              } catch (NumberFormatException e) {
62                  logger.error("The property "+ Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_PORT + " has wrong format: " + e);
63              }
64          }
65          multicastInterface = System.getProperty(Constants.PROP_SYSTEM_GWES_GANGLIA_MULTICAST_INTERFACE);
66          this.engine = engine;
67          start();
68      }
69  
70      /**
71       * Start the thread.
72       */
73      public final void run() {
74          logger.info(this + " started ...");
75          int interval = UPDATE_INTERVAL;
76  
77          while (!isInterrupted()) {
78              try {
79                  GenericWorkflowHandlerTable table = engine.getHandlerTable();
80                  updateActionCount(table);
81                  long freeMemory = Runtime.getRuntime().freeMemory();
82                  long maxMemory = Runtime.getRuntime().maxMemory();
83                  long totalMemory = Runtime.getRuntime().totalMemory();
84                  float usedMemoryPercent = 100.0f * (float) (totalMemory - freeMemory ) / (float) maxMemory;
85  
86                  if ( usedMemoryPercent > 80.0 ) {
87                      logger.info("Memory max/total/free: " + maxMemory + "/" + totalMemory + "/" + freeMemory);
88                      logger.info("Used memory is more than 80% of max memory. Invoking garbage collection...");
89                      System.gc();
90                  }
91  
92                  send("tomcat_used_memory", "" + usedMemoryPercent, "float", "%");
93                  send("tomcat_vm_threads", "" + countTotalThreads(), "int8", "threads");
94                  send("gwes_workflows_in_memory", "" + table.size(), "int16", "workflows");
95                  send("gwes_running_or_active_workflows", "" + (runningWorkflows + activeWorkflows), "int16", "workflows");
96                  send("gwes_running_or_active_activities", "" + (runningActivities + activeActivities), "int16", "activities");
97                  send("gwes_completed_activities_per_minute", "" + completedActivitiesPerMinute, "float", "activities/min");
98                  send("gwes_terminated_activities_per_minute", "" + terminatedActivitiesPerMinute, "float", "activities/min");
99  
100                 interval = UPDATE_INTERVAL;
101             } catch (Exception e) {
102                 if (interval < UPDATE_INTERVAL_MAX) interval *= 2;
103                 logger.error("There was an exception, so I set the update interval to " + interval + "ms: " + e, e);
104             }
105 
106             try {
107                 // wait
108                 if (!isInterrupted()) {
109                     Thread.sleep(interval);
110                 }
111             } catch (InterruptedException e) {
112                 logger.error("I have been interrupted: " + e, e);
113                 interrupt();
114             }
115         }
116     }
117 
118     private void updateActionCount(GenericWorkflowHandlerTable handlerTable) {
119         activeWorkflows = 0;
120         runningWorkflows = 0;
121         activeActivities = 0;
122         runningActivities = 0;
123 
124         int lastCompletedActivities = completedActivities;
125         int lastTerminatedActivities = terminatedActivities;
126         completedActivities = 0;
127         terminatedActivities = 0;
128 
129         long now = System.currentTimeMillis();
130         Enumeration handlers = handlerTable.elements();
131         while (handlers.hasMoreElements()) {
132             GenericWorkflowHandler handler = (GenericWorkflowHandler) handlers.nextElement();
133             if (handler.getStatus() == WorkflowStatus.STATUS_ACTIVE) {
134                 activeWorkflows++;
135                 // only active workflows have running or active activities
136                 Enumeration activities = handler.getActivityTable().elements();
137                 while (activities.hasMoreElements()) {
138                     Activity activity = (Activity) activities.nextElement();
139                     try {
140                         Activity.Status status = activity.getStatus();
141                         if (status == Activity.Status.ACTIVE) activeActivities++;
142                         if (status == Activity.Status.RUNNING) runningActivities++;
143                     } catch (Exception e) {
144                         logger.warn("Warning: Exception when updating status of activity " + activity.getID() + ": " + e);
145                     }
146                 }
147             }
148             if (handler.getStatus() == WorkflowStatus.STATUS_RUNNING) {
149                 runningWorkflows++;
150             }
151             completedActivities += handler.getCompletedActivities();
152             terminatedActivities += handler.getTerminatedActivities();
153         }
154 
155         if (lastTime > 0) {
156             completedActivitiesPerMinute = ((float) (completedActivities - lastCompletedActivities)) / ((float) (now - lastTime)) * 60000f;
157             if (completedActivitiesPerMinute < 0f) completedActivitiesPerMinute = 0f;
158             terminatedActivitiesPerMinute = ((float) (terminatedActivities - lastTerminatedActivities)) / ((float) (now - lastTime)) * 60000f;
159             if (terminatedActivitiesPerMinute < 0f) terminatedActivitiesPerMinute = 0f;
160         }
161 
162         lastTime = now;
163     }
164 
165     private static int countTotalThreads() {
166         ThreadGroup top = Thread.currentThread().getThreadGroup();
167         while (top.getParent() != null) {
168             top = top.getParent();
169         }
170         return top.activeCount();
171     }
172 
173     /**
174      * Send value to ganglia
175      *
176      * @param name
177      * @param value
178      * @param type
179      * @param unit
180      * @return
181      * @throws IOException
182      */
183     protected String send(String name, String value, String type, String unit) throws IOException {
184         if (logger.isDebugEnabled()) {
185             logger.debug("Sending "+name+"="+ value+" to Ganglia...");
186         }
187 
188         ArrayList<String> argsList = new ArrayList<String>();
189 
190         if (gmetric != null && gmetric.length() > 0) {
191             argsList.add(gmetric);
192         } else {
193             throw new IOException("Path to executable not valid. Please set property \"gwes.ganglia.gmetric\".");
194         }
195 
196         argsList.add("-n" + name.trim());
197         argsList.add("-v" + value.trim());
198         argsList.add("-t" + type);
199         argsList.add("-u" + unit);
200 
201         if (multicastChannel != null && multicastChannel.length() > 0) {
202             argsList.add("-c" + multicastChannel);
203         }
204 
205         if (multicastPort > 0) {
206             argsList.add("-p" + multicastPort);
207         }
208 
209         if (multicastInterface != null && multicastInterface.length() > 0) {
210             argsList.add("-i" + multicastInterface);
211         }
212 
213         String[] args = new String[argsList.size()];
214         for (int i = 0; i < argsList.size(); i++) {
215             args[i] = argsList.get(i);
216         }
217         Process p = Runtime.getRuntime().exec(args);
218         BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()));
219         StringBuffer ret = new StringBuffer();
220         String line;
221         while ((line = r.readLine()) != null) {
222             ret.append(line);
223             ret.append("\n");
224         }
225 
226         closeProcessStreams(p);
227 
228         if (ret.length() > 1) {
229             logger.warn("Output of " + gmetric + ": " + ret.toString());
230         }
231         return ret.toString();
232     }
233 
234     /**
235      * Close process streams to avoid "too many open files" issue.
236      * @param p
237      * @throws IOException
238      */
239     public static void closeProcessStreams(Process p) throws IOException {
240         p.getErrorStream().close();
241         p.getInputStream().close();
242         p.getOutputStream().close();
243         p.destroy();
244    }
245 
246 
247 }