View Javadoc

1   /*
2    * Copyright (c) 2005, The K-Wf Grid Consortium
3    * Fraunhofer Institute for Computer Architecture and Software Technology
4    * See http://www.kwfgrid.eu and http://www.first.fraunhofer.de for more details.
5    */
6   package net.kwfgrid.gwes.uiproxy;
7   
8   import java.util.*;
9   
10  /**
11     An abstract buffer implementation to order messages with sequence numbers.
12     Subclasses must implement <code>getSequenceNumber(message)</code> to access the sequence number of an incoming message.
13     A <code>SequenceBuffer's</code> client must explicitly initialize the buffer by setting the <code>SequenceBuffer.first</code>
14     property to the first valid sequence number. The buffer will not return any messages on <code>read()</code> before
15     this property has been set.
16     This buffer supports the following properties:
17     <ul>
18     <li>Buffer.size</li>
19     <li>SequenceBuffer.first</li>
20     </ul>
21   */
22  public abstract class SequenceBuffer extends AbstractBuffer {
23      public static final String FIRST_PROPERTY_KEY = "SequenceBuffer.first";
24  
25      protected int _first;
26      protected boolean _initialized;
27      protected BufferException _exception;
28      protected LinkedList _content, _precontent;
29  
30      public SequenceBuffer() {
31  	super();
32  	_first = -1;
33  	_exception = null;
34  	_initialized = false;
35  	_precontent = new LinkedList();
36  	_content = new LinkedList();
37  	for (int i=0; i<getSize(); i++) _content.add(null); 
38      }
39  
40      /**
41         Generic method to access the sequence number of the specified message.
42         Sequence numbers are expected to be positive integers.
43       */
44      protected abstract int getSequenceNumber(Object message) throws Exception;
45      
46      protected synchronized void setSize(int size) throws BufferException {
47  	if (size>getSize()) {
48  	    for (int i=_content.size(); i<size; i++) {
49  		_content.add(null);
50  	    }
51  	} else if (size<getSize()) {
52  	    for (int i=0; i<getSize()-size; i++) {
53  		Object message = _content.removeLast();
54  		if (message!=null) _exception = new BufferOverflowException(this);
55  	    }	    
56  	}
57  	if (_exception!=null) throw _exception;
58  	super.setSize(size);
59      }
60  
61      protected synchronized void initialize(int first) throws BufferException {	
62  	try {
63  	    _first = first;
64  	    Iterator messages = _precontent.iterator(); while(messages.hasNext()) {
65  		Object message = messages.next();
66  		int seq = getSequenceNumber(message);
67  		if (seq-_first>=0) {
68  		    _content.set(seq-_first, message);
69  		}
70  	    }
71  	    _precontent.clear();
72  	    _initialized = true;
73  	} catch (Exception x) {
74  	    _exception = new BufferException(this, "Exception getting sequence number.", x);
75  	    throw _exception;
76  	}
77      }
78  
79      public synchronized void setProperty(String key, String value) throws BufferException {
80  	if (FIRST_PROPERTY_KEY.equals(key)) {
81  	    try {
82  		int first = Integer.parseInt(value);
83  		initialize(first);
84  	    } catch (NumberFormatException x) {
85  		_exception = new BufferException(this, "Exception getting sequence number.", x);
86  		throw _exception;
87  	    }	    
88  	} else {
89  	    super.setProperty(key, value);
90  	}
91      }
92  
93      public synchronized void handle(Object message) {
94  	try {
95  	    if (shouldHandle(message)) {
96  		if (_exception==null) {
97  		    if (!_initialized) {
98  			if (_precontent.size()>=getSize()) _exception = new BufferOverflowException(this);
99  			else _precontent.add(message);
100 		    } else {
101 			int seq = getSequenceNumber(message);
102 			if (seq-_first>=getSize()) _exception = new BufferOverflowException(this);
103 			else if (seq-_first<0) return; // discard old doubled messages
104 			else _content.set(seq-_first, message);
105 		    }
106 		}
107 	    }
108 	} catch (Exception x) {
109 	    _exception = new BufferException(this, "Exception getting sequence number.", x);
110 	}
111     }
112 
113     /**
114        Determine if this buffer should handle the specified message.
115        Subclasses must override this method in order to sort out messages of interest.
116      */
117     protected abstract boolean shouldHandle(Object message) throws Exception;
118 
119     /**
120        Will always return messages in correct order. 
121      */
122     public synchronized Object[] read() throws BufferException {
123 	if (_exception!=null) throw _exception;
124 	if (!_initialized) return new Object[0];
125 	List result = new LinkedList();
126 	while (true) {
127 	    if (_content.size()==0) break;
128 	    Object message = _content.get(0);
129 	    if (message==null) break;
130 	    result.add(message);
131 	    _content.removeFirst();
132 	    _content.addLast(null);
133 	    _first++;
134 	}
135 	return result.toArray();
136     }
137 }
138