View Javadoc

1   /*
2    * Copyright (c) 2005, The K-Wf Grid Consortium
3    * Fraunhofer Institute for Computer Architecture and Software Technology
4    * See http://www.kwfgrid.eu and http://www.first.fraunhofer.de for more details.
5    */
6   package net.kwfgrid.gwes.uiproxy;
7   
8   import java.util.*;
9   
10  import org.apache.log4j.Logger;
11  
12  /**
13   * The UIProxy. Facade for buffers, subscription handler and leasing system.
14   * The current implementation has no sophisticating synchronization management. This may lead to performance issues with
15   * multiple clients and buffers and should be fixed in the future.
16   */
17  public class UIProxyImpl implements UIProxy {
18      protected class Lease extends Thread {
19          protected String _clientid;
20          protected boolean _valid;
21  
22          public Lease(String clientid) {
23              super("UIProxy.Lease [" + clientid + "]");
24              _clientid = clientid;
25              _valid = true;
26              start();
27          }
28  
29          public synchronized void renew() {
30              _valid = true;
31              notifyAll();
32          }
33  
34          public synchronized void cancel() {
35              _valid = false;
36              notifyAll();
37          }
38  
39          public synchronized void run() {
40              if (Thread.currentThread() == this) {
41                  try {
42                      while (_valid) {
43                          _valid = false;
44                          wait(UIProxyImpl.this.getLeaseTime());
45                      }
46                  } catch (InterruptedException x) {
47                      // LOG
48                  }
49                  UIProxyImpl.this.timeout(_clientid);
50              }
51          }
52      }
53  
54      public static final int DEFAULT_LEASE_TIME = 60000;//ms
55  
56      /**
57       * clientid -> bufferids -> buffer.
58       */
59      protected HashMapOfMaps _clients;
60      /**
61       * clientid -> bufferids -> subscription
62       */
63      protected HashMapOfMaps _subscriptions;
64      /**
65       * clientid -> lease
66       */
67      protected HashMap _leases;
68  
69      protected BufferFactory _bfact;
70      protected SubscriptionHandler _shandler;
71      protected MessageCodec _codec;
72      protected int _leasetime;
73  
74      public UIProxyImpl(BufferFactory bfact, MessageCodec codec, SubscriptionHandler shandler) {
75          _bfact = bfact;
76          _shandler = shandler;
77          _codec = codec;
78  
79          _clients = new HashMapOfMaps();
80          _subscriptions = new HashMapOfMaps();
81          _leases = new HashMap();
82          _leasetime = DEFAULT_LEASE_TIME;
83      }
84  
85      public void setLeaseTime(int time) {
86          _leasetime = time;
87      }
88  
89      public int getLeaseTime() {
90          return _leasetime;
91      }
92  
93      protected Buffer getBuffer(String clientid, String bufferid) {
94          return (Buffer) _clients.get(clientid, bufferid);
95      }
96  
97      /**
98       * Subscribe for messages at a publisher. This will instantiate a buffer which stores the notification provided by the publisher.
99       * The specification of the subuscription must conform to the format which is defined by the SubscriptionHandler of the UIProxy.
100      *
101      * @param clientid     The UUID of the client.
102      * @param subscription Specification of the subscription.
103      * @param buffertype   The type of the buffer to be created.
104      * @param propkeys     The keys of the properties of the buffer, same index as <code>propvalues</code>.
105      * @param propvalues   The values of the properties of the buffer, same index as <code>propkeys</code>.
106      * @return The UUID of the newly created buffer. Needed to poll notifications from that buffer.
107      * @throws SubscriptionFailed If the subscription failed.
108      */
109     public synchronized String subscribe(String clientid, String subscription, String buffertype, String[] propkeys, String[] propvalues) throws SubscriptionFailed {
110         Logger.getLogger(UIProxyImpl.class).debug("Subscribing buffer " + buffertype + " for client " + clientid + ".");
111 
112         Map client = _clients.getMap(clientid);
113         if (client == null) {
114             Logger.getLogger(UIProxyImpl.class).debug("New client: " + clientid + ", creating lease.");
115 
116             client = _clients.createKey(clientid);
117             _leases.put(clientid, new Lease(clientid));
118         } else {
119             renewLease(clientid);
120         }
121 
122         if (propkeys.length != propvalues.length)
123             throw new SubscriptionFailed("Subscription failed. Different number of keys and values in properties.");
124 
125         Buffer buffer = null;
126 
127         Properties bufferprops = new Properties();
128         for (int i = 0; i < propkeys.length; i++) {
129             bufferprops.setProperty(propkeys[i], propvalues[i]);
130         }
131 
132         try {
133             buffer = _bfact.createBuffer(buffertype, bufferprops);
134         } catch (InstantiationException x) {
135             Logger.getLogger(UIProxyImpl.class).warn("Could not create buffer " + buffertype + ": " + x + " " + x.getMessage());
136             throw new SubscriptionFailed("Could not create buffer " + buffertype + ": " + x + " " + x.getMessage());
137         }
138 
139         client.put(buffer.getID(), buffer);
140 
141         boolean exception = true;
142         try {
143             String sid = _shandler.subscribe(buffer, subscription);
144             _subscriptions.put(clientid, buffer.getID(), sid);
145             String bufid = buffer.getID();
146             exception = false;
147             return bufid;
148         } finally {
149             if (exception) {
150                 Logger.getLogger(UIProxyImpl.class).warn("Subscription failed for client " + clientid + ".");
151                 disposeBuffer(clientid, buffer.getID());
152             }
153         }
154     }
155 
156     /**
157      * Renew the lease for the specified client.
158      * If the client is unknown the request is silently discarded.
159      *
160      * @param clientid The UUID of the client.
161      */
162     public synchronized void renewLease(String clientid) {
163         Logger.getLogger(UIProxyImpl.class).debug("Renewing lease for client " + clientid + ".");
164 
165         Lease lease = (Lease) _leases.get(clientid);
166         if (lease != null) lease.renew();
167     }
168 
169     /**
170      * Set a property of a buffer.
171      * Calling this method will also renew the lease for the specified client.
172      * If the client is not the creator of the buffer, this method will silently discard the request.
173      *
174      * @param clientid The UUID of the client.
175      * @param bufferid The UUID of the buffer.
176      * @param key      The name of the property.
177      * @param value    The value of the property
178      * @throws BufferException If an exception occured when setting the property.
179      */
180     public synchronized void setBufferProperty(String clientid, String bufferid, String key, String value) throws BufferException {
181         Logger.getLogger(UIProxyImpl.class).debug("Setting buffer property " + key + " to " + value + " for client " + clientid + ", buffer " + bufferid + ".");
182 
183         Buffer buffer = (Buffer) _clients.get(clientid, bufferid);
184         if (buffer != null) buffer.setProperty(key, value);
185         renewLease(clientid);
186     }
187 
188     /**
189      * Dispose a buffer.
190      * Calling this method will also renew the lease for the specified client.
191      * If the client is not the creator of the buffer, this method will silently discard the request.
192      *
193      * @param clientid The UUID of the client.
194      * @param bufferid The UUID of the buffer.
195      */
196     public synchronized void disposeBuffer(String clientid, String bufferid) {
197         Logger.getLogger(UIProxyImpl.class).debug("Disposing buffer " + bufferid + " for client " + clientid + ".");
198 
199         Buffer buffer = (Buffer) _clients.remove(clientid, bufferid);
200         String subscription = (String) _subscriptions.remove(clientid, bufferid);
201         if (subscription != null) _shandler.cancel(subscription);
202         renewLease(clientid);
203     }
204 
205     /**
206      * Dispose a whole client. This will free all resources associated with the client.
207      * If the client is unknown the request will be silently discarded.
208      *
209      * @param clientid The UUID of the client.
210      */
211     public synchronized void disposeClient(String clientid) {
212         Logger.getLogger(UIProxyImpl.class).debug("Disposing client " + clientid + ".");
213 
214         Lease lease = (Lease) _leases.remove(clientid);
215 
216 	if (lease==null) 
217 	    Logger.getLogger(UIProxyImpl.class).info("Client "+clientid+" has no lease.");
218 	else
219 	    lease.cancel();
220 
221         _clients.remove(clientid);
222 
223         Map subscriptions = ((Map) _subscriptions.remove(clientid));
224 	if (subscriptions==null) {
225 	    Logger.getLogger(UIProxyImpl.class).info("Client "+clientid+" has no subscriptions.");
226 	} else {
227 	    Iterator i = subscriptions.values().iterator();
228 	    while (i.hasNext()) {
229 		_shandler.cancel(i.next().toString());
230 	    }
231 	}
232     }
233 
234     /**
235      * Called if a lease has timed out. Disposes the client.
236      */
237     protected synchronized void timeout(String clientid) {
238         Logger.getLogger(UIProxyImpl.class).warn("Lease timed out for client " + clientid + ".");
239 
240         _clients.remove(clientid);
241         _leases.remove(clientid);
242         Map subscriptions = ((Map) _subscriptions.remove(clientid));
243 	if (subscriptions!=null) {
244 	    Iterator i = subscriptions.values().iterator();
245 	    while (i.hasNext()) {
246 		_shandler.cancel(i.next().toString());
247 	    }
248 	}
249     }
250 
251     /**
252      * Poll all buffered notifications for the specified client and buffer.
253      * Calling this method will also renew the lease for the specified client.
254      * If the client is not the creator of the buffer, this method will silently discard the request and return an empty collection.
255      *
256      * @return A collection of all buffered notifications from the specified buffer.
257      * @throws BufferException        If a exception occured at the buffer when handling a message. In that case the client will have to create a new
258      *                                Buffer.
259      * @throws UnknownClientException If the specified client is unknown to the UIProxy. This may also happen if the lease has timed out for the
260      *                                specified client.
261      * @throws UnknownBufferException If the specified buffer is unknown to the UIProxy. This may also happen if the buffer previously encountered
262      *                                a <code>BufferException</code>.
263      *                                specified client.
264      */
265     public synchronized String[] poll(String clientid, String bufferid) throws CodecException, BufferException, UnknownClientException, UnknownBufferException {
266         Logger.getLogger(UIProxyImpl.class).debug("Client " + clientid + " polls buffer " + bufferid + ".");
267 
268         Map client = _clients.getMap(clientid);
269         if (client == null) {
270             Logger.getLogger(UIProxyImpl.class).warn("Client " + clientid + " unknown.");
271             throw new UnknownClientException("unknown client: " + clientid);
272         }
273         renewLease(clientid);
274         Buffer buffer = (Buffer) client.get(bufferid);
275         if (buffer == null) {
276             Logger.getLogger(UIProxyImpl.class).warn("Buffer " + bufferid + " unknown.");
277             throw new UnknownBufferException("unknwon buffer: " + clientid + "/" + bufferid);
278         }
279         boolean exception = true;
280         try {
281             Object[] messages = buffer.read();
282             String[] spec = new String[messages.length];
283             for (int i = 0; i < messages.length; i++) {
284                 spec[i] = _codec.marshal(messages[i]);
285             }
286             exception = false;
287             return spec;
288         } finally {
289             if (exception) {
290                 Logger.getLogger(UIProxyImpl.class).warn("Reading buffer caused exception.");
291                 disposeBuffer(clientid, bufferid);
292             }
293         }
294     }
295 }