1
2
3
4
5
6
7
8 package net.kwfgrid.gwes.prorater;
9
10 import net.kwfgrid.gworkflowdl.structure.Transition;
11 import net.kwfgrid.gworkflowdl.structure.OperationClass;
12 import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
13 import net.kwfgrid.gworkflowdl.structure.Operation;
14 import net.kwfgrid.gwes.exception.DatabaseException;
15 import net.kwfgrid.gwes.exception.OperationMapperException;
16 import net.kwfgrid.gwes.GenericWorkflowHandler;
17 import net.kwfgrid.gwes.Constants;
18 import net.kwfgrid.gwes.operationmapper.OperationMapper;
19
20 import java.util.*;
21 import java.io.IOException;
22
23 import org.apache.log4j.Logger;
24
25
26
27
28
29
30
31
32 public class RecurrentProrater extends OperationMapper implements Runnable {
33
34
35
36
37 static HashMap<String, Node> nodes;
38
39
40
41
42 static HashMap<String, Long> lastAssignments;
43
44
45
46
47 final static Logger logger = Logger.getLogger(SimpleProrater.class);
48
49
50
51
52 static float MIN_QUALITY;
53
54
55
56
57 static int WAIT_AFTER_LAST_ASSIGNMENT;
58
59
60
61
62 static long POLL_INTERVAL;
63
64
65
66
67 static RecurrentProrater p;
68 static Thread t;
69
70 final Object lock = new Object();
71
72
73
74
75 private static Vector<Transition> transitions;
76
77 static boolean abort;
78
79
80
81
82 public RecurrentProrater() {
83 if (transitions == null) {
84 transitions = new Vector<Transition>();
85 logger.debug("Initializing Recurrent Prorater...");
86
87 MIN_QUALITY = Float.parseFloat(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_MIN_QUALITY, "0.1"));
88 WAIT_AFTER_LAST_ASSIGNMENT = Integer.parseInt(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_WAIT, "12000"));
89 POLL_INTERVAL = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_INTERVAL,"1000"));
90
91 p = new RecurrentProrater();
92 abort = false;
93
94 t = new Thread(p);
95 t.start();
96 }
97 }
98
99 public boolean processTransition(GenericWorkflowHandler handler, Transition transition) throws OperationMapperException {
100 if (!transitions.contains(transition) && transition.getAbstractionLevel() == Operation.BLUE) {
101 synchronized(p.lock) {
102 transitions.add(transition);
103 }
104 }
105 return false;
106 }
107
108 public boolean processTransitions(GenericWorkflowHandler handler, ArrayList<Transition> transitions) throws OperationMapperException {
109 for (Transition t : transitions) {
110 processTransition(handler,t);
111 }
112 return false;
113 }
114
115
116
117
118 public void run() {
119 while(!abort) {
120 synchronized (p.lock) {
121 if (transitions.size()>0) {
122 try {
123 if (logger.isDebugEnabled()) {
124 logger.debug("Scheduling the following transitions:");
125 for (Transition t:transitions) logger.debug(t.getID());
126 }
127 initialize(transitions);
128 decide();
129 transitions.clear();
130 } catch (Exception e) {
131 logger.error("Exception during processTransitions(): " + e, e);
132 }
133 }
134 }
135
136 try {
137 Thread.sleep(POLL_INTERVAL);
138 } catch (InterruptedException e) {
139 logger.error("exception:\n" + e, e);
140 t.interrupt();
141 }
142 }
143 }
144
145 static void initialize(Vector<Transition> transitions) throws IOException {
146 logger.debug("Init:");
147 long now = System.currentTimeMillis();
148 if (nodes == null) nodes = new HashMap<String, Node>();
149 if (lastAssignments == null) lastAssignments = new HashMap<String, Long>();
150 nodes.clear();
151 int priority = 0;
152 for (Transition transition : transitions) {
153 synchronized (transition.getOperation()) {
154 Operation operation = transition.getOperation();
155 if (operation == null) {
156 logger.warn("No <operation> available for transition "+transition.getID());
157 continue;
158 }
159 Object oo = transition.getOperation().get();
160 if (oo instanceof OperationClass) {
161
162 OperationCandidate[] operationCandidates = ((OperationClass) oo).getOperationCandidates();
163 if (operationCandidates == null) {
164 logger.warn("No <operationCandidate> available for transition "+transition.getID());
165 continue;
166 }
167
168 for (OperationCandidate operationCandidate : operationCandidates) {
169 String resourceName = operationCandidate.getResourceName();
170 if (resourceName == null) {
171 logger.warn("No <resourceName> available for operationCandidate of transition "+transition.getID());
172 continue;
173 }
174
175 Long last = lastAssignments.get(resourceName);
176 if (last != null && now - last < WAIT_AFTER_LAST_ASSIGNMENT) continue;
177
178 String propStr = transition.getProperties().get(Constants.PROP_TRANSITION_PRIORITY);
179 if (propStr != null) {
180 try {
181 priority = Integer.parseInt(propStr);
182 } catch (NumberFormatException e) {
183
184 }
185 }
186 Alternative alternative = new Alternative(transition, operationCandidate, priority);
187 Node node = nodes.get(resourceName);
188
189 if (node == null) {
190 node = new Node(resourceName);
191 node.updateQuality(operationCandidate.getQuality());
192 nodes.put(resourceName, node);
193 }
194
195
196 if (node.quality > MIN_QUALITY) {
197 node.add(alternative);
198 }
199
200 else {
201 alternative.operationCandidate.setQuality(node.quality);
202 }
203
204 }
205 ++priority;
206 }
207 }
208 }
209
210 if (logger.isDebugEnabled()) {
211 logger.debug("--------- last resource assignments ---------");
212 for (String hw : lastAssignments.keySet()) {
213 logger.debug(hw + ": " + lastAssignments.get(hw));
214 }
215 }
216 }
217
218
219
220
221 static boolean decide() {
222 boolean ret = false;
223 if (nodes.size() == 0) return ret;
224 logger.debug("Decide:");
225 ArrayList<Transition> tdone = new ArrayList<Transition>();
226 List<Node> nc = generateClusteredRandomDistribution(nodes);
227 for (Node node : nc) {
228 int nr = node.alternatives.size();
229 if (nr == 0) continue;
230 if (logger.isDebugEnabled()) {
231 logger.debug("Node: " + node.hostname);
232 }
233 Collections.sort(node.alternatives);
234 Collections.reverse(node.alternatives);
235 for (int i = 0; i < nr; ++i) {
236 Alternative a = node.alternatives.get(i);
237 if (!tdone.contains(a.transition)) {
238
239 int tries, j = i;
240 while (j + 1 < nr && a.priority == node.alternatives.get(j + 1).priority) ++j;
241
242 if (j > i) {
243
244 tries = j - i;
245 while (tries-- > 0) {
246 Alternative b = node.alternatives.get(i + (int) ((j - i) * Math.random() + 0.5));
247 if (!tdone.contains(b.transition)) {
248 a = b;
249 break;
250 }
251 }
252 }
253 a.operationCandidate.setSelected(true);
254 ret = true;
255 tdone.add(a.transition);
256 lastAssignments.put(node.hostname, System.currentTimeMillis());
257 logger.debug(" " + a.priority + ": " + a.operationCandidate.getResourceName() + ": " + a.transition.getID());
258 break;
259 }
260 }
261 }
262 return ret;
263 }
264
265 public static List<Node> generateClusteredRandomDistribution(Map<String, Node> nodes) {
266
267
268 int nClusters = nodes.size() / 3;
269 if (nClusters < 1) nClusters = 1;
270 else if (nClusters > 5) nClusters = 5;
271 int nMembers = nodes.size() / nClusters;
272 if (logger.isDebugEnabled()) {
273 logger.debug("nodes.size="+nodes.size()+" nClusters="+nClusters+" nMembers="+nMembers);
274 }
275
276 List<Node> nc = new ArrayList<Node>();
277
278
279 if (nClusters == 1) {
280 nc.addAll(nodes.values());
281 Collections.shuffle(nc, new Random(System.currentTimeMillis()));
282 }
283
284 else {
285
286 List<Node> sortedNodes = new ArrayList<Node>();
287 sortedNodes.addAll(nodes.values());
288 Collections.sort(sortedNodes);
289 Collections.reverse(sortedNodes);
290
291
292 List<Node> clusterNodes = new ArrayList<Node>();
293 Iterator nodeIter = sortedNodes.iterator();
294 for (int iCluster = 0; iCluster < nClusters; iCluster++) {
295 clusterNodes.clear();
296
297 for (int iMember = 0; iMember < nMembers; iMember++) {
298 if (nodeIter.hasNext()) clusterNodes.add((Node)nodeIter.next());
299 }
300
301 Collections.shuffle(clusterNodes,new Random(System.currentTimeMillis()));
302
303 nc.addAll(clusterNodes);
304 }
305
306 while (nodeIter.hasNext()) {
307 nc.add((Node)nodeIter.next());
308 }
309 }
310 if (logger.isDebugEnabled()) {
311 for (Node node : nc) {
312 logger.debug("node quality="+node.quality+" host="+node.hostname);
313 }
314 }
315 return nc;
316 }
317
318 }