1
2
3
4
5
6
7
8 package net.kwfgrid.gwes.prorater;
9
10 import net.kwfgrid.gworkflowdl.structure.Transition;
11 import net.kwfgrid.gworkflowdl.structure.OperationClass;
12 import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
13 import net.kwfgrid.gworkflowdl.structure.Operation;
14 import net.kwfgrid.gwes.exception.OperationMapperException;
15 import net.kwfgrid.gwes.GenericWorkflowHandler;
16 import net.kwfgrid.gwes.Constants;
17 import net.kwfgrid.gwes.operationmapper.OperationMapper;
18
19 import java.util.*;
20 import java.io.IOException;
21
22 import org.apache.log4j.Logger;
23
24
25
26
27
28
29
30 public class TwoTierProrater extends OperationMapper implements Runnable {
31
32
33
34
35 private static HashMap<String, Node> nodes;
36
37
38
39
40 private static HashMap<String, Long> lastAssignments;
41
42
43
44
45 final static Logger logger = Logger.getLogger(TwoTierProrater.class);
46
47
48
49
50 private static float MIN_QUALITY;
51
52
53
54
55 private static int WAIT_AFTER_LAST_ASSIGNMENT;
56
57
58
59
60 private static long POLL_INTERVAL;
61
62
63
64
65 private static Vector<Transition> transitions;
66
67
68
69
70 private static Vector<Transition> transitionsDone;
71
72
73
74
75 private static HashMap<Transition, String> parents;
76
77
78
79
80 private static TwoTierProrater prorater;
81 private static Thread thread;
82
83 private static final Object lock = new Object();
84
85 static boolean abort;
86
87
88
89
90 public TwoTierProrater() {
91 if (transitions == null) {
92 transitions = new Vector<Transition>();
93 logger.debug("Initializing TwoTier Prorater...");
94
95 MIN_QUALITY = Float.parseFloat(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_MIN_QUALITY, "0.3"));
96 WAIT_AFTER_LAST_ASSIGNMENT = Integer.parseInt(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_WAIT, "12000"));
97 POLL_INTERVAL = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_INTERVAL,"1000"));
98
99 transitionsDone = new Vector<Transition>();
100 parents = new HashMap<Transition, String>();
101 prorater = new TwoTierProrater();
102 abort = false;
103
104 thread = new Thread(prorater);
105 thread.start();
106 }
107 }
108
109 public boolean processTransition(GenericWorkflowHandler handler, Transition transition) throws OperationMapperException {
110
111 if (!transitions.contains(transition) && !transitionsDone.contains(transition)) {
112 synchronized (lock) {
113 logger.debug("Adding transition to prorater queue: " + transition.getID() + "\tfrom workflow " + handler.getID());
114 transitions.add(transition);
115 parents.put(transition, handler.getID());
116 }
117 }
118 return false;
119 }
120
121 public boolean processTransitions(GenericWorkflowHandler handler, ArrayList<Transition> transitions) throws OperationMapperException {
122 for (Transition t : transitions) {
123 processTransition(handler,t);
124 }
125 return false;
126 }
127
128
129
130
131 public void run() {
132 while(!abort) {
133 synchronized (lock) {
134 transitionsDone.clear();
135 if (transitions.size() > 0) {
136 try {
137 if (logger.isDebugEnabled()) {
138 logger.debug("Scheduling the following transitions:");
139 for (Transition t:transitions) logger.debug(t.getID());
140 }
141 initialize();
142 decide();
143 if (logger.isDebugEnabled()) {
144 if (transitions.size() > 0) {
145 logger.debug("Unscheduled transitions:");
146 for (Transition t:transitions) logger.debug(t.getID());
147 }
148 }
149 } catch (Exception e) {
150 logger.error("Exception during processTransitions(): " + e, e);
151 }
152 }
153 }
154
155 try {
156 Thread.sleep(POLL_INTERVAL);
157 } catch (InterruptedException e) {
158 logger.error("exception:\n" + e, e);
159 thread.interrupt();
160 }
161 }
162 }
163
164 private void initialize() throws IOException {
165 logger.debug("Initialize:");
166 long now = System.currentTimeMillis();
167 if (nodes == null) nodes = new HashMap<String, Node>();
168 if (lastAssignments == null) lastAssignments = new HashMap<String, Long>();
169 nodes.clear();
170 for (Transition transition : transitions) {
171 synchronized (transition.getOperation()) {
172
173 Operation operation = transition.getOperation();
174 if (operation == null) {
175 logger.warn("No <operation> available for transition "+transition.getID());
176 continue;
177 }
178 Object oo = operation.get();
179
180 if (oo instanceof OperationClass) {
181
182 OperationCandidate[] operationCandidates = ((OperationClass) oo).getOperationCandidates();
183 if (operationCandidates == null) {
184 logger.warn("No <operationCandidate> available for transition "+transition.getID());
185 continue;
186 }
187
188 int priority = 0;
189
190 String propStr = transition.getProperties().get(Constants.PROP_TRANSITION_PRIORITY);
191 if (propStr != null) {
192 try {
193 priority = Integer.parseInt(propStr);
194 } catch (NumberFormatException e) {
195
196 }
197 }
198
199 for (OperationCandidate operationCandidate : operationCandidates) {
200 String resourceName = operationCandidate.getResourceName();
201 if (resourceName == null) {
202 logger.warn("No <resourceName> available for operationCandidate of transition "+transition.getID());
203 continue;
204 }
205
206 Long last = lastAssignments.get(resourceName);
207 if (last != null && now - last < WAIT_AFTER_LAST_ASSIGNMENT) {
208
209 continue;
210 }
211
212 Node node = nodes.get(resourceName);
213 if (node == null) {
214 node = new Node(resourceName);
215 node.updateQuality(operationCandidate.getQuality());
216 nodes.put(resourceName, node);
217 }
218
219
220 if (node.quality > MIN_QUALITY) {
221 node.add(new Alternative(transition, operationCandidate, priority));
222 }
223
224 else {
225 operationCandidate.setQuality(node.quality);
226 }
227
228 }
229 }
230 }
231 }
232
233 if (logger.isDebugEnabled()) {
234 logger.debug("--------- last resource assignments ---------");
235 for (String hw : lastAssignments.keySet()) {
236 logger.debug(hw + ":\t" + (now - lastAssignments.get(hw)) + "ms ago");
237 }
238 }
239 }
240
241 private void decide() {
242 if (nodes.size() == 0) return;
243 logger.debug("Decide:");
244 ArrayList<Node> nc = generateClusteredRandomDistribution(nodes);
245 for (Node node : nc) {
246 int nr = node.alternatives.size();
247 if (nr == 0) continue;
248
249 for (int i = 0; i < nr; i++) {
250 Alternative a = node.alternatives.get(i);
251 if (!transitionsDone.contains(a.transition)) {
252
253 for (int j = i + 1; j < nr; j++) {
254 Alternative b = node.alternatives.get(j);
255 if (!transitionsDone.contains(b.transition)) {
256 if (b.priority > a.priority && parents.get(a.transition).equals(parents.get(b.transition))) {
257 logger.debug("Priority: " + b.transition.getID() + " > " + a.transition.getID());
258 a = b;
259 }
260 }
261 }
262 transitionsDone.add(a.transition);
263 transitions.remove(a.transition);
264 parents.remove(a.transition);
265 lastAssignments.put(node.hostname, System.currentTimeMillis());
266 a.operationCandidate.setSelected(true);
267 logger.debug("Decision: " + a.transition.getID() + ", priority " + a.priority + ": " + a.operationCandidate.getResourceName());
268 break;
269 }
270 }
271 }
272 }
273
274 private ArrayList<Node> generateClusteredRandomDistribution(HashMap<String, Node> nodes) {
275
276
277 int nClusters = (int) Math.ceil((double) nodes.size() / 3d);
278 if (nClusters < 1) nClusters = 1;
279 else if (nClusters > 5) nClusters = 5;
280 int nMembers = (int) Math.ceil((double) nodes.size() / (double )nClusters);
281 if (logger.isDebugEnabled()) {
282 logger.debug(" generateClusteredRandomDistribution");
283 logger.debug(" nodes.size="+nodes.size()+" nClusters="+nClusters+" nMembers="+nMembers);
284 }
285
286 ArrayList<Node> nc = new ArrayList<Node>();
287
288
289 if (nClusters == 1) {
290 nc.addAll(nodes.values());
291 Collections.shuffle(nc);
292 }
293
294 else {
295
296 ArrayList<Node> sortedNodes = new ArrayList<Node>();
297 sortedNodes.addAll(nodes.values());
298 Collections.sort(sortedNodes);
299 Collections.reverse(sortedNodes);
300
301
302 ArrayList<Node> clusterNodes = new ArrayList<Node>();
303 Iterator<Node> nodeIter = sortedNodes.iterator();
304 for (int iCluster = 0; iCluster < nClusters; iCluster++) {
305 clusterNodes.clear();
306
307 for (int iMember = 0; iMember < nMembers; iMember++) {
308 if (nodeIter.hasNext()) clusterNodes.add(nodeIter.next());
309 }
310
311 Collections.shuffle(clusterNodes);
312
313 nc.addAll(clusterNodes);
314 }
315
316 while (nodeIter.hasNext()) {
317 nc.add(nodeIter.next());
318 }
319 }
320 if (logger.isDebugEnabled()) {
321 for (Node node : nc) {
322 logger.debug(" node.quality="+node.quality+" host="+node.hostname);
323 }
324 }
325 return nc;
326 }
327
328 }