View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes;
9   
10  import net.kwfgrid.gwes.exception.GWESException;
11  import org.apache.log4j.Logger;
12  
13  import java.util.ArrayList;
14  
15  /**
16   * Cleans old completed or terminated workflows from memory, database and file systems.
17   *
18   * @author Andreas Hoheisel
19   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
20   * @version $Id: WorkflowCleanUpManager.java 1537 2011-07-27 15:34:04Z hoheisel $
21   */
22  final class WorkflowCleanUpManager extends Thread {
23  
24      /**
25       * Default Timeout in milliseconds for the workflow to be removed from the memory.
26       */
27      private static long DEFAULT_EXPIRYTIME_MEMORY;
28  
29      /**
30       * Default Timeout in milliseconds for the workflow to be removed from the database.
31       */
32      private static long DEFAULT_EXPIRYTIME_DATABASE;
33  
34      static final Logger logger = Logger.getLogger(WorkflowCleanUpManager.class);
35  
36      private GWESEngine engine;
37  
38      static int threadcount = 0;
39  
40      private final String userID;
41  
42      /**
43       * Constructor.
44       */
45      public WorkflowCleanUpManager(GWESEngine engine) {
46          super("WorkflowCleanUpManager#" + (++threadcount));
47          DEFAULT_EXPIRYTIME_MEMORY = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_EXPIRYTIME_MEMORY, "3600000"));
48          DEFAULT_EXPIRYTIME_DATABASE = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_EXPIRYTIME_DATABASE, "604800000"));
49          this.engine = engine;
50          this.userID = System.getProperty(Constants.PROP_SYSTEM_GWES_SERVICE_BASE_URL_EXTERNAL)+"?"+getName();
51          logger.debug("Acting with userID='"+userID+"'");
52          start();
53      }
54  
55      /**
56       * Start the thread.
57       */
58      public final void run() {
59          logger.info(this + " started ...");
60          float usedMemoryPercent = 0.0f;
61          ArrayList<String> expiredFromMemoryWorkflows = new ArrayList<String>();
62          ArrayList<String> expiredFromDatabaseWorkflows = new ArrayList<String>();
63          while (!isInterrupted()) {
64              try {
65                  GenericWorkflowHandlerTable table = engine.getHandlerTable();
66                  long now = System.currentTimeMillis();
67                  long freeMemory = Runtime.getRuntime().freeMemory();
68                  long maxMemory = Runtime.getRuntime().maxMemory();
69                  long totalMemory = Runtime.getRuntime().totalMemory();
70                  usedMemoryPercent = 100.0f * (float) (totalMemory - freeMemory ) / (float) maxMemory;
71  
72                  logger.info("Memory max/total/free: " + maxMemory + "/" + totalMemory + "/" + freeMemory);
73                  logger.info("Memory used: " + usedMemoryPercent + "%");
74                  logger.info("Number of VM threads: "+countTotalThreads());
75                  logger.info("Number of workflows in memory: " + table.size());
76  
77                  // search for old workflows in memory
78                  for (GenericWorkflowHandler workflow : table.values()) {
79                      if (workflow.getStatus() == WorkflowStatus.STATUS_COMPLETED || workflow.getStatus() == WorkflowStatus.STATUS_TERMINATED) {
80                          if (now - workflow.getEndTime() > DEFAULT_EXPIRYTIME_MEMORY) {
81                              expiredFromMemoryWorkflows.add(workflow.getID());
82                          }
83                      }
84                  }
85  
86                  // remove old workflows from memory and table
87                  for (String workflowID : expiredFromMemoryWorkflows) {
88                      table.remove(workflowID);
89                      logger.info("Workflow '" + workflowID + "' removed from memory.");
90                  }
91                  expiredFromMemoryWorkflows.clear();
92  
93                  // search for old workflows in memory and database
94                  // example statusArray format:
95                  //  "ID=hoheisel_f2968050-1d6a-11db-bacc-ad353bc1f9b1"
96                  //  "status=COMPLETED"
97                  //  "birthdayMs=1154003111126"
98                  //  "durationUndefinedMs=527"
99                  //  "durationInitiatedMs=792"
100                 //  "durationRunningMs=0"
101                 //  "durationActiveMs=0"
102                 //  "durationSuspendedMs=0"
103                 //  "durationTotalMs=1351"
104                 //  "endTimeMs=1154003112477"
105                 //  "level=MEMORY"
106                 //  "description=test workflow"
107                 String[][] statusArray = engine.getWorkflowStatusArray(3, userID);
108                 for (String[] workflowStatus : statusArray) {
109                     if (workflowStatus[1].equals("status=COMPLETED") || workflowStatus[1].equals("status=TERMINATED")) {
110                         long endTime = Long.parseLong(workflowStatus[9].substring(workflowStatus[9].indexOf("=")+1));
111                         if (now - endTime > DEFAULT_EXPIRYTIME_DATABASE) {
112                             expiredFromDatabaseWorkflows.add(workflowStatus[0].substring(workflowStatus[0].indexOf("ID=")+3));
113                         }
114                     }
115                 }
116 
117                 // remove old workflows from database and filesystems
118                 for (String workflowID : expiredFromDatabaseWorkflows) {
119                     try {
120                         engine.remove(workflowID, 7, "WorkflowCleanUpManager");
121                         logger.info("Workflow '" + workflowID + "' removed from database and file systems.");
122                     } catch (Exception e) {
123                         logger.warn("There was an exception removing workflow '"+workflowID+"', but I will continue:\n"+e);
124                     }
125                 }
126                 expiredFromDatabaseWorkflows.clear();
127             } catch (GWESException e) {
128                 logger.warn("There was an GWES exception, but I will continue: \n"+e,e);
129             } catch (Exception e) {
130                 logger.warn("There was an exception, but I will continue: \n"+e,e);
131             }
132 
133             // run garbage collection
134             if ( usedMemoryPercent > 50.0 ) {
135                 logger.info("Used memory is more than 50% of max memory. Invoking garbage collection...");
136                 System.gc();
137             }
138 
139             // wait
140             try {
141                 if (!isInterrupted()) {
142                     logger.info("Next check in "+ DEFAULT_EXPIRYTIME_MEMORY /2+"ms");
143                     Thread.sleep(DEFAULT_EXPIRYTIME_MEMORY /2);
144                 }
145             } catch (InterruptedException e) {
146                 logger.error("I have been interrupted: " + e, e);
147                 interrupt();
148             }
149 
150         }
151     }
152 
153     public static int countTotalThreads() {
154         ThreadGroup top = Thread.currentThread().getThreadGroup();
155         while ( top.getParent() != null ) {
156             top = top.getParent();
157         }
158         return top.activeCount();
159     }
160 
161 }