1
2
3
4
5
6
7
8 package net.kwfgrid.gwes;
9
10 import net.kwfgrid.gwes.exception.GWESException;
11 import org.apache.log4j.Logger;
12
13 import java.util.ArrayList;
14
15
16
17
18
19
20
21
22 final class WorkflowCleanUpManager extends Thread {
23
24
25
26
27 private static long DEFAULT_EXPIRYTIME_MEMORY;
28
29
30
31
32 private static long DEFAULT_EXPIRYTIME_DATABASE;
33
34 static final Logger logger = Logger.getLogger(WorkflowCleanUpManager.class);
35
36 private GWESEngine engine;
37
38 static int threadcount = 0;
39
40 private final String userID;
41
42
43
44
45 public WorkflowCleanUpManager(GWESEngine engine) {
46 super("WorkflowCleanUpManager#" + (++threadcount));
47 DEFAULT_EXPIRYTIME_MEMORY = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_EXPIRYTIME_MEMORY, "3600000"));
48 DEFAULT_EXPIRYTIME_DATABASE = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_GWES_WORKFLOW_EXPIRYTIME_DATABASE, "604800000"));
49 this.engine = engine;
50 this.userID = System.getProperty(Constants.PROP_SYSTEM_GWES_SERVICE_BASE_URL_EXTERNAL)+"?"+getName();
51 logger.debug("Acting with userID='"+userID+"'");
52 start();
53 }
54
55
56
57
58 public final void run() {
59 logger.info(this + " started ...");
60 float usedMemoryPercent = 0.0f;
61 ArrayList<String> expiredFromMemoryWorkflows = new ArrayList<String>();
62 ArrayList<String> expiredFromDatabaseWorkflows = new ArrayList<String>();
63 while (!isInterrupted()) {
64 try {
65 GenericWorkflowHandlerTable table = engine.getHandlerTable();
66 long now = System.currentTimeMillis();
67 long freeMemory = Runtime.getRuntime().freeMemory();
68 long maxMemory = Runtime.getRuntime().maxMemory();
69 long totalMemory = Runtime.getRuntime().totalMemory();
70 usedMemoryPercent = 100.0f * (float) (totalMemory - freeMemory ) / (float) maxMemory;
71
72 logger.info("Memory max/total/free: " + maxMemory + "/" + totalMemory + "/" + freeMemory);
73 logger.info("Memory used: " + usedMemoryPercent + "%");
74 logger.info("Number of VM threads: "+countTotalThreads());
75 logger.info("Number of workflows in memory: " + table.size());
76
77
78 for (GenericWorkflowHandler workflow : table.values()) {
79 if (workflow.getStatus() == WorkflowStatus.STATUS_COMPLETED || workflow.getStatus() == WorkflowStatus.STATUS_TERMINATED) {
80 if (now - workflow.getEndTime() > DEFAULT_EXPIRYTIME_MEMORY) {
81 expiredFromMemoryWorkflows.add(workflow.getID());
82 }
83 }
84 }
85
86
87 for (String workflowID : expiredFromMemoryWorkflows) {
88 table.remove(workflowID);
89 logger.info("Workflow '" + workflowID + "' removed from memory.");
90 }
91 expiredFromMemoryWorkflows.clear();
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 String[][] statusArray = engine.getWorkflowStatusArray(3, userID);
108 for (String[] workflowStatus : statusArray) {
109 if (workflowStatus[1].equals("status=COMPLETED") || workflowStatus[1].equals("status=TERMINATED")) {
110 long endTime = Long.parseLong(workflowStatus[9].substring(workflowStatus[9].indexOf("=")+1));
111 if (now - endTime > DEFAULT_EXPIRYTIME_DATABASE) {
112 expiredFromDatabaseWorkflows.add(workflowStatus[0].substring(workflowStatus[0].indexOf("ID=")+3));
113 }
114 }
115 }
116
117
118 for (String workflowID : expiredFromDatabaseWorkflows) {
119 try {
120 engine.remove(workflowID, 7, "WorkflowCleanUpManager");
121 logger.info("Workflow '" + workflowID + "' removed from database and file systems.");
122 } catch (Exception e) {
123 logger.warn("There was an exception removing workflow '"+workflowID+"', but I will continue:\n"+e);
124 }
125 }
126 expiredFromDatabaseWorkflows.clear();
127 } catch (GWESException e) {
128 logger.warn("There was an GWES exception, but I will continue: \n"+e,e);
129 } catch (Exception e) {
130 logger.warn("There was an exception, but I will continue: \n"+e,e);
131 }
132
133
134 if ( usedMemoryPercent > 50.0 ) {
135 logger.info("Used memory is more than 50% of max memory. Invoking garbage collection...");
136 System.gc();
137 }
138
139
140 try {
141 if (!isInterrupted()) {
142 logger.info("Next check in "+ DEFAULT_EXPIRYTIME_MEMORY /2+"ms");
143 Thread.sleep(DEFAULT_EXPIRYTIME_MEMORY /2);
144 }
145 } catch (InterruptedException e) {
146 logger.error("I have been interrupted: " + e, e);
147 interrupt();
148 }
149
150 }
151 }
152
153 public static int countTotalThreads() {
154 ThreadGroup top = Thread.currentThread().getThreadGroup();
155 while ( top.getParent() != null ) {
156 top = top.getParent();
157 }
158 return top.activeCount();
159 }
160
161 }