View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes.prorater;
9   
10  import net.kwfgrid.gworkflowdl.structure.Transition;
11  import net.kwfgrid.gworkflowdl.structure.OperationClass;
12  import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
13  import net.kwfgrid.gworkflowdl.structure.Operation;
14  import net.kwfgrid.gwes.exception.OperationMapperException;
15  import net.kwfgrid.gwes.GenericWorkflowHandler;
16  import net.kwfgrid.gwes.Constants;
17  import net.kwfgrid.gwes.operationmapper.OperationMapper;
18  
19  import java.util.*;
20  import java.io.IOException;
21  
22  import org.apache.log4j.Logger;
23  
24  /**
25   * @author Andreas Hoheisel
26   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
27   * @author Dietmar Sommerfeld, GWDG
28   * @version $Id: RecurrentProrater.java 1419 2010-11-01 14:12:17Z hoheisel $
29   */
30  public class TwoTierProrater extends OperationMapper implements Runnable {
31  
32      /**
33       * key is hardware identifier, value is Node object.
34       */
35      private static HashMap<String, Node> nodes;
36  
37      /**
38       * key is hardware identifier, value is time of last assignment of this hardware
39       */
40      private static HashMap<String, Long> lastAssignments;
41  
42      /**
43       * log4j logger.
44       */
45      final static Logger logger = Logger.getLogger(TwoTierProrater.class);
46  
47      /**
48       * Nodes with a quality below MIN_QUALITY are skipped.
49       */
50      private static float MIN_QUALITY;
51  
52      /**
53       * Wait period in Milliseconds after last usage of hardware.
54       */
55      private static int WAIT_AFTER_LAST_ASSIGNMENT;
56  
57      /**
58       * Polling interval in Milliseconds.
59       */
60      private static long POLL_INTERVAL;
61  
62      /**
63       * Holds transitions to be processed.
64       */
65      private static Vector<Transition> transitions;
66  
67      /**
68       * Holds transitions that were recently processed.
69       */
70      private static Vector<Transition> transitionsDone;
71  
72      /**
73       * key is transition, value is parent workflow ID.
74       */
75      private static HashMap<Transition, String> parents;
76  
77      /**
78       * All processing steps are done by one single thread for all workflows.
79       */
80      private static TwoTierProrater prorater;
81      private static Thread thread;
82  
83      private static final Object lock = new Object();
84  
85      static boolean abort; 
86  
87      /**
88       * Constructor.
89       */
90      public TwoTierProrater() {
91          if (transitions == null) {
92              transitions = new Vector<Transition>();
93              logger.debug("Initializing TwoTier Prorater...");
94  
95              MIN_QUALITY = Float.parseFloat(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_MIN_QUALITY, "0.3"));
96              WAIT_AFTER_LAST_ASSIGNMENT = Integer.parseInt(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_WAIT, "12000"));
97              POLL_INTERVAL = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_INTERVAL,"1000"));
98  
99              transitionsDone = new Vector<Transition>();
100             parents = new HashMap<Transition, String>();
101             prorater = new TwoTierProrater();
102             abort = false;
103             // start singleton as thread
104             thread = new Thread(prorater);
105             thread.start();
106         }
107     }
108 
109     public boolean processTransition(GenericWorkflowHandler handler, Transition transition) throws OperationMapperException {
110        	// if a transition was scheduled in the previous round do not schedule it again right away
111         if (!transitions.contains(transition) && !transitionsDone.contains(transition)) {
112         	synchronized (lock) {
113             	logger.debug("Adding transition to prorater queue: " + transition.getID() + "\tfrom workflow " + handler.getID());
114             	transitions.add(transition);
115                 parents.put(transition, handler.getID());	// keep a reference to originating workflow for prioritization
116             }
117         }
118         return false;
119     }
120 
121     public boolean processTransitions(GenericWorkflowHandler handler, ArrayList<Transition> transitions) throws OperationMapperException {
122         for (Transition t : transitions) {
123             processTransition(handler,t);
124         }
125         return false;
126     }
127 
128     /**
129      * @see Thread#run()
130      */
131     public void run() {
132         while(!abort) {
133             synchronized (lock) {
134                 transitionsDone.clear(); 
135                 if (transitions.size() > 0) {
136                     try {
137                         if (logger.isDebugEnabled()) {
138                             logger.debug("Scheduling the following transitions:");
139                             for (Transition t:transitions) logger.debug(t.getID());
140                         }
141                         initialize();
142                         decide();
143                         if (logger.isDebugEnabled()) {
144                             if (transitions.size() > 0) {
145                                 logger.debug("Unscheduled transitions:");
146                                 for (Transition t:transitions) logger.debug(t.getID());
147                             }
148                         }
149                     } catch (Exception e) {
150                         logger.error("Exception during processTransitions(): " + e, e);
151                     }
152                 }
153             }
154 
155             try {
156                 Thread.sleep(POLL_INTERVAL);
157             } catch (InterruptedException e) {
158                 logger.error("exception:\n" + e, e);
159                 thread.interrupt();
160             }
161         }
162     }
163 
164     private void initialize() throws IOException {
165         logger.debug("Initialize:");
166         long now = System.currentTimeMillis();
167         if (nodes == null) nodes = new HashMap<String, Node>();
168         if (lastAssignments == null) lastAssignments = new HashMap<String, Long>();
169         nodes.clear();
170         for (Transition transition : transitions) {
171             synchronized (transition.getOperation()) {
172                 //logger.debug("entered synchronized block");
173                 Operation operation = transition.getOperation();
174                 if (operation == null) {
175                     logger.warn("No <operation> available for transition "+transition.getID());
176                     continue;
177                 }
178                 Object oo = operation.get();
179 
180                 if (oo instanceof OperationClass) {
181 
182                     OperationCandidate[] operationCandidates = ((OperationClass) oo).getOperationCandidates();
183                     if (operationCandidates == null) {
184                         logger.warn("No <operationCandidate> available for transition "+transition.getID());
185                         continue;
186                     }
187 
188                     int priority = 0;
189                     // <property name="priority">1</property>
190                     String propStr = transition.getProperties().get(Constants.PROP_TRANSITION_PRIORITY);
191                     if (propStr != null) {
192                         try {
193                             priority = Integer.parseInt(propStr);
194                         } catch (NumberFormatException e) {
195                             // do nothing.
196                         }
197                     }
198 
199                     for (OperationCandidate operationCandidate : operationCandidates) {
200                         String resourceName = operationCandidate.getResourceName();
201                         if (resourceName == null) {
202                             logger.warn("No <resourceName> available for operationCandidate of transition "+transition.getID());
203                             continue;
204                         }
205                         // skip hardware that has been recently assigned (wait for load update)
206                         Long last = lastAssignments.get(resourceName);
207                         if (last != null && now - last < WAIT_AFTER_LAST_ASSIGNMENT) {
208                             //logger.debug("Skipping resource  " + resourceName);
209                         	continue;
210                         }
211 
212                         Node node = nodes.get(resourceName);
213                         if (node == null) {
214                             node = new Node(resourceName);
215                             node.updateQuality(operationCandidate.getQuality());
216                             nodes.put(resourceName, node);
217                         }
218 
219                         // only operationCandidates with a high node quality get into the game
220                         if (node.quality > MIN_QUALITY) {
221                             node.add(new Alternative(transition, operationCandidate, priority));
222                         }
223                         // for the others set the quality
224                         else {
225                             operationCandidate.setQuality(node.quality);
226                         }
227 
228                     }
229                 }
230             }
231         }
232 
233         if (logger.isDebugEnabled()) {
234             logger.debug("--------- last resource assignments ---------");
235             for (String hw : lastAssignments.keySet()) {
236                 logger.debug(hw + ":\t" + (now - lastAssignments.get(hw)) + "ms ago");
237             }
238         }
239     }
240 
241     private void decide() {
242         if (nodes.size() == 0) return;
243         logger.debug("Decide:");
244         ArrayList<Node> nc = generateClusteredRandomDistribution(nodes);
245         for (Node node : nc) {
246             int nr = node.alternatives.size();	// number of alternative operationCandidates involving this host
247             if (nr == 0) continue;
248             //logger.debug("Scheduling node: " + node.hostname);
249             for (int i = 0; i < nr; i++) {
250                 Alternative a = node.alternatives.get(i);
251                 if (!transitionsDone.contains(a.transition)) {
252                 	// look for transitions from same workflow with higher priority
253                 	for (int j = i + 1; j < nr; j++) {
254                         Alternative b = node.alternatives.get(j);
255                         if (!transitionsDone.contains(b.transition)) {
256                         	if (b.priority > a.priority && parents.get(a.transition).equals(parents.get(b.transition))) {
257                                 logger.debug("Priority: " + b.transition.getID() + " > " + a.transition.getID());
258                                 a = b;
259                             }
260                 		}
261                 	}
262                     transitionsDone.add(a.transition);
263                     transitions.remove(a.transition);
264                     parents.remove(a.transition);
265                     lastAssignments.put(node.hostname, System.currentTimeMillis());
266                     a.operationCandidate.setSelected(true);
267                     logger.debug("Decision: " + a.transition.getID() + ", priority " + a.priority + ": " + a.operationCandidate.getResourceName());
268                     break;
269                 }
270             }
271         }
272     }
273 
274     private ArrayList<Node> generateClusteredRandomDistribution(HashMap<String, Node> nodes) {
275 
276         // at least 3 members per quality cluster, maximum 5 quality clusters
277         int nClusters = (int) Math.ceil((double) nodes.size() / 3d);
278         if (nClusters < 1) nClusters = 1;
279         else if (nClusters > 5) nClusters = 5;
280         int nMembers = (int) Math.ceil((double) nodes.size() / (double )nClusters);
281         if (logger.isDebugEnabled()) {
282             logger.debug(" generateClusteredRandomDistribution");
283             logger.debug(" nodes.size="+nodes.size()+" nClusters="+nClusters+" nMembers="+nMembers);
284         }
285 
286         ArrayList<Node> nc = new ArrayList<Node>();
287         
288         // not enough members for clustering
289         if (nClusters == 1) {
290             nc.addAll(nodes.values());
291             Collections.shuffle(nc);
292         }
293         // more than one cluster
294         else {
295             // sort nodes by quality
296             ArrayList<Node> sortedNodes = new ArrayList<Node>();
297             sortedNodes.addAll(nodes.values());
298             Collections.sort(sortedNodes);
299             Collections.reverse(sortedNodes);
300 
301             // loop clusters
302             ArrayList<Node> clusterNodes = new ArrayList<Node>();
303             Iterator<Node> nodeIter = sortedNodes.iterator();
304             for (int iCluster = 0; iCluster < nClusters; iCluster++) {
305                 clusterNodes.clear();
306                 // fill cluster
307                 for (int iMember = 0; iMember < nMembers; iMember++) {
308                     if (nodeIter.hasNext()) clusterNodes.add(nodeIter.next());
309                 }
310                 // randomize cluster
311                 Collections.shuffle(clusterNodes);
312                 // put cluster to result
313                 nc.addAll(clusterNodes);
314             }
315             // remaining nodes
316             while (nodeIter.hasNext()) {
317                 nc.add(nodeIter.next());
318             }
319         }
320         if (logger.isDebugEnabled()) {
321             for (Node node : nc) {
322                 logger.debug(" node.quality="+node.quality+" host="+node.hostname);
323             }
324         }
325         return nc;
326     }
327 
328 }