View Javadoc

1   /*
2    * Copyright 2010 Fraunhofer Gesellschaft, Munich, Germany,
3    * for its Fraunhofer Institute for Computer Architecture and Software
4    * Technology (FIRST), Berlin, Germany. All rights reserved.
5    * http://www.first.fraunhofer.de/
6    */
7   
8   package net.kwfgrid.gwes.prorater;
9   
10  import net.kwfgrid.gworkflowdl.structure.Transition;
11  import net.kwfgrid.gworkflowdl.structure.OperationClass;
12  import net.kwfgrid.gworkflowdl.structure.OperationCandidate;
13  import net.kwfgrid.gworkflowdl.structure.Operation;
14  import net.kwfgrid.gwes.exception.DatabaseException;
15  import net.kwfgrid.gwes.exception.OperationMapperException;
16  import net.kwfgrid.gwes.GenericWorkflowHandler;
17  import net.kwfgrid.gwes.Constants;
18  import net.kwfgrid.gwes.operationmapper.OperationMapper;
19  
20  import java.util.*;
21  import java.io.IOException;
22  
23  import org.apache.log4j.Logger;
24  
25  /**
26   * @author Andreas Hoheisel
27   *         (<a href="http://www.andreas-hoheisel.de">www.andreas-hoheisel.de</a>)
28   * @author Dietmar Sommerfeld
29   * @author Helge Rosé
30   * @version $Id: RecurrentProrater.java 1540 2011-08-17 13:30:37Z hoheisel $
31   */
32  public class RecurrentProrater extends OperationMapper implements Runnable {
33  
34      /**
35       * key is hardware identifier, value is Node object.
36       */
37      static HashMap<String, Node> nodes;
38  
39      /**
40       * key is hardware identifier, value is time of last assignment of this hardware
41       */
42      static HashMap<String, Long> lastAssignments;
43  
44      /**
45       * log4j logger.
46       */
47      final static Logger logger = Logger.getLogger(SimpleProrater.class);
48  
49      /**
50       * Nodes with a quality below MIN_QUALITY are skipped.
51       */
52      static float MIN_QUALITY;
53  
54      /**
55       * Wait period in Milliseconds after last usage of hardware.
56       */
57      static int WAIT_AFTER_LAST_ASSIGNMENT;
58  
59      /**
60       * Polling interval in Milliseconds.
61       */
62      static long POLL_INTERVAL;
63  
64      /**
65       * All processing steps are done by one single thread for all workflows.
66       */
67      static RecurrentProrater p;
68      static Thread t;
69  
70      final Object lock = new Object();
71  
72      /**
73       * Holds transitions to be processed.
74       */
75      private static Vector<Transition> transitions;
76  
77      static boolean abort; 
78  
79      /**
80       * Constructor.
81       */
82      public RecurrentProrater() {
83          if (transitions == null) {
84              transitions = new Vector<Transition>();
85              logger.debug("Initializing Recurrent Prorater...");
86  
87              MIN_QUALITY = Float.parseFloat(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_MIN_QUALITY, "0.1"));
88              WAIT_AFTER_LAST_ASSIGNMENT = Integer.parseInt(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_WAIT, "12000"));
89              POLL_INTERVAL = Long.parseLong(System.getProperty(Constants.PROP_SYSTEM_RESOURCE_PRORATER_INTERVAL,"1000"));
90  
91              p = new RecurrentProrater();
92              abort = false;
93              // start singleton as thread
94              t = new Thread(p);
95              t.start();
96          }
97      }
98  
99      public boolean processTransition(GenericWorkflowHandler handler, Transition transition) throws OperationMapperException {
100         if (!transitions.contains(transition) && transition.getAbstractionLevel() == Operation.BLUE) {
101             synchronized(p.lock) {
102                 transitions.add(transition);
103             }
104         }
105         return false;
106     }
107 
108     public boolean processTransitions(GenericWorkflowHandler handler, ArrayList<Transition> transitions) throws OperationMapperException {
109         for (Transition t : transitions) {
110             processTransition(handler,t);
111         }
112         return false;
113     }
114 
115     /**
116      * @see Thread#run()
117      */
118     public void run() {
119         while(!abort) {
120             synchronized (p.lock) {
121                 if (transitions.size()>0) {
122                     try {
123                         if (logger.isDebugEnabled()) {
124                             logger.debug("Scheduling the following transitions:");
125                             for (Transition t:transitions) logger.debug(t.getID());
126                         }
127                         initialize(transitions);
128                         decide();
129                         transitions.clear();
130                     } catch (Exception e) {
131                         logger.error("Exception during processTransitions(): " + e, e);
132                     }
133                 }
134             }
135 
136             try {
137                 Thread.sleep(POLL_INTERVAL);
138             } catch (InterruptedException e) {
139                 logger.error("exception:\n" + e, e);
140                 t.interrupt();
141             }
142         }
143     }
144 
145     static void initialize(Vector<Transition> transitions) throws IOException {
146         logger.debug("Init:");
147         long now = System.currentTimeMillis();
148         if (nodes == null) nodes = new HashMap<String, Node>();
149         if (lastAssignments == null) lastAssignments = new HashMap<String, Long>();
150         nodes.clear();
151         int priority = 0;
152         for (Transition transition : transitions) {
153             synchronized (transition.getOperation()) {
154                 Operation operation = transition.getOperation();
155                 if (operation == null) {
156                     logger.warn("No <operation> available for transition "+transition.getID());
157                     continue;
158                 }
159                 Object oo = transition.getOperation().get();
160                 if (oo instanceof OperationClass) {
161 
162                     OperationCandidate[] operationCandidates = ((OperationClass) oo).getOperationCandidates();
163                     if (operationCandidates == null) {
164                         logger.warn("No <operationCandidate> available for transition "+transition.getID());
165                         continue;
166                     }
167 
168                     for (OperationCandidate operationCandidate : operationCandidates) {
169                         String resourceName = operationCandidate.getResourceName();
170                         if (resourceName == null) {
171                             logger.warn("No <resourceName> available for operationCandidate of transition "+transition.getID());
172                             continue;
173                         }
174                         // skip hardware that has been recently assigned (wait for load update)
175                         Long last = lastAssignments.get(resourceName);
176                         if (last != null && now - last < WAIT_AFTER_LAST_ASSIGNMENT) continue;
177                         // <property name="priority">1</property>
178                         String propStr = transition.getProperties().get(Constants.PROP_TRANSITION_PRIORITY);
179                         if (propStr != null) {
180                             try {
181                                 priority = Integer.parseInt(propStr);
182                             } catch (NumberFormatException e) {
183                                 // do nothing.
184                             }
185                         }
186                         Alternative alternative = new Alternative(transition, operationCandidate, priority);
187                         Node node = nodes.get(resourceName);
188 
189                         if (node == null) {
190                             node = new Node(resourceName);
191                             node.updateQuality(operationCandidate.getQuality());
192                             nodes.put(resourceName, node);
193                         }
194 
195                         // Only nodes with a high quality get into the game
196                         if (node.quality > MIN_QUALITY) {
197                             node.add(alternative);
198                         }
199                         // for the others set the quality
200                         else {
201                             alternative.operationCandidate.setQuality(node.quality);
202                         }
203 
204                     }
205                     ++priority;
206                 }
207             }
208         }
209 
210         if (logger.isDebugEnabled()) {
211             logger.debug("--------- last resource assignments ---------");
212             for (String hw : lastAssignments.keySet()) {
213                 logger.debug(hw + ": " + lastAssignments.get(hw));
214             }
215         }
216     }
217 
218     /**
219      * @return True if there has been a decision, false if not.
220      */
221     static boolean decide() {
222         boolean ret = false;
223         if (nodes.size() == 0) return ret;
224         logger.debug("Decide:");
225         ArrayList<Transition> tdone = new ArrayList<Transition>();
226         List<Node> nc = generateClusteredRandomDistribution(nodes);
227         for (Node node : nc) {
228             int nr = node.alternatives.size();	// number of alternative operationCandidates involving this host
229             if (nr == 0) continue;
230             if (logger.isDebugEnabled()) {
231                 logger.debug("Node: " + node.hostname);
232             }
233             Collections.sort(node.alternatives);
234             Collections.reverse(node.alternatives);	// transitions with high priority first
235             for (int i = 0; i < nr; ++i) {
236                 Alternative a = node.alternatives.get(i);
237                 if (!tdone.contains(a.transition)) {
238                     // find equal priorities
239                     int tries, j = i;
240                     while (j + 1 < nr && a.priority == node.alternatives.get(j + 1).priority) ++j;
241                     // choose random transition in case of equal priorities
242                     if (j > i) {
243                         //System.out.println("node: "+node.hostname+" equal priorities: "+i+"-"+j);
244                         tries = j - i;
245                         while (tries-- > 0) {
246                             Alternative b = node.alternatives.get(i + (int) ((j - i) * Math.random() + 0.5));
247                             if (!tdone.contains(b.transition)) {
248                                 a = b;
249                                 break;
250                             }
251                         }
252                     }
253                     a.operationCandidate.setSelected(true);
254                     ret = true;
255                     tdone.add(a.transition);
256                     lastAssignments.put(node.hostname, System.currentTimeMillis());
257                     logger.debug("  " + a.priority + ": " + a.operationCandidate.getResourceName() + ": " + a.transition.getID());
258                     break;
259                 }
260             }
261         }
262         return ret;
263     }
264 
265     public static List<Node> generateClusteredRandomDistribution(Map<String, Node> nodes) {
266 
267         // at least 3 members per quality cluster, maximum 5 quality clusters
268         int nClusters = nodes.size() / 3;
269         if (nClusters < 1) nClusters = 1;
270         else if (nClusters > 5) nClusters = 5;
271         int nMembers = nodes.size() / nClusters;
272         if (logger.isDebugEnabled()) {
273             logger.debug("nodes.size="+nodes.size()+" nClusters="+nClusters+" nMembers="+nMembers);
274         }
275 
276         List<Node> nc = new ArrayList<Node>();
277 
278         // not enough members for clustering
279         if (nClusters == 1) {
280             nc.addAll(nodes.values());
281             Collections.shuffle(nc, new Random(System.currentTimeMillis()));
282         }
283         // more than one cluster
284         else {
285             // sort nodes by quality
286             List<Node> sortedNodes = new ArrayList<Node>();
287             sortedNodes.addAll(nodes.values());
288             Collections.sort(sortedNodes);
289             Collections.reverse(sortedNodes);
290 
291             // loop clusters
292             List<Node> clusterNodes = new ArrayList<Node>();
293             Iterator nodeIter = sortedNodes.iterator();
294             for (int iCluster = 0; iCluster < nClusters; iCluster++) {
295                 clusterNodes.clear();
296                 // fill cluster
297                 for (int iMember = 0; iMember < nMembers; iMember++) {
298                     if (nodeIter.hasNext()) clusterNodes.add((Node)nodeIter.next());
299                 }
300                 // randomize cluster
301                 Collections.shuffle(clusterNodes,new Random(System.currentTimeMillis()));
302                 // put cluster to result
303                 nc.addAll(clusterNodes);
304             }
305             // remaining nodes
306             while (nodeIter.hasNext()) {
307                 nc.add((Node)nodeIter.next());
308             }
309         }
310         if (logger.isDebugEnabled()) {
311             for (Node node : nc) {
312                 logger.debug("node quality="+node.quality+" host="+node.hostname);
313             }
314         }
315         return nc;
316     }
317 
318 }