1
2
3
4
5
6 package net.kwfgrid.gwes.uiproxy;
7
8 import java.util.*;
9
10 import org.apache.log4j.Logger;
11
12
13
14
15
16
17 public class UIProxyImpl implements UIProxy {
18 protected class Lease extends Thread {
19 protected String _clientid;
20 protected boolean _valid;
21
22 public Lease(String clientid) {
23 super("UIProxy.Lease [" + clientid + "]");
24 _clientid = clientid;
25 _valid = true;
26 start();
27 }
28
29 public synchronized void renew() {
30 _valid = true;
31 notifyAll();
32 }
33
34 public synchronized void cancel() {
35 _valid = false;
36 notifyAll();
37 }
38
39 public synchronized void run() {
40 if (Thread.currentThread() == this) {
41 try {
42 while (_valid) {
43 _valid = false;
44 wait(UIProxyImpl.this.getLeaseTime());
45 }
46 } catch (InterruptedException x) {
47
48 }
49 UIProxyImpl.this.timeout(_clientid);
50 }
51 }
52 }
53
54 public static final int DEFAULT_LEASE_TIME = 60000;
55
56
57
58
59 protected HashMapOfMaps _clients;
60
61
62
63 protected HashMapOfMaps _subscriptions;
64
65
66
67 protected HashMap _leases;
68
69 protected BufferFactory _bfact;
70 protected SubscriptionHandler _shandler;
71 protected MessageCodec _codec;
72 protected int _leasetime;
73
74 public UIProxyImpl(BufferFactory bfact, MessageCodec codec, SubscriptionHandler shandler) {
75 _bfact = bfact;
76 _shandler = shandler;
77 _codec = codec;
78
79 _clients = new HashMapOfMaps();
80 _subscriptions = new HashMapOfMaps();
81 _leases = new HashMap();
82 _leasetime = DEFAULT_LEASE_TIME;
83 }
84
85 public void setLeaseTime(int time) {
86 _leasetime = time;
87 }
88
89 public int getLeaseTime() {
90 return _leasetime;
91 }
92
93 protected Buffer getBuffer(String clientid, String bufferid) {
94 return (Buffer) _clients.get(clientid, bufferid);
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public synchronized String subscribe(String clientid, String subscription, String buffertype, String[] propkeys, String[] propvalues) throws SubscriptionFailed {
110 Logger.getLogger(UIProxyImpl.class).debug("Subscribing buffer " + buffertype + " for client " + clientid + ".");
111
112 Map client = _clients.getMap(clientid);
113 if (client == null) {
114 Logger.getLogger(UIProxyImpl.class).debug("New client: " + clientid + ", creating lease.");
115
116 client = _clients.createKey(clientid);
117 _leases.put(clientid, new Lease(clientid));
118 } else {
119 renewLease(clientid);
120 }
121
122 if (propkeys.length != propvalues.length)
123 throw new SubscriptionFailed("Subscription failed. Different number of keys and values in properties.");
124
125 Buffer buffer = null;
126
127 Properties bufferprops = new Properties();
128 for (int i = 0; i < propkeys.length; i++) {
129 bufferprops.setProperty(propkeys[i], propvalues[i]);
130 }
131
132 try {
133 buffer = _bfact.createBuffer(buffertype, bufferprops);
134 } catch (InstantiationException x) {
135 Logger.getLogger(UIProxyImpl.class).warn("Could not create buffer " + buffertype + ": " + x + " " + x.getMessage());
136 throw new SubscriptionFailed("Could not create buffer " + buffertype + ": " + x + " " + x.getMessage());
137 }
138
139 client.put(buffer.getID(), buffer);
140
141 boolean exception = true;
142 try {
143 String sid = _shandler.subscribe(buffer, subscription);
144 _subscriptions.put(clientid, buffer.getID(), sid);
145 String bufid = buffer.getID();
146 exception = false;
147 return bufid;
148 } finally {
149 if (exception) {
150 Logger.getLogger(UIProxyImpl.class).warn("Subscription failed for client " + clientid + ".");
151 disposeBuffer(clientid, buffer.getID());
152 }
153 }
154 }
155
156
157
158
159
160
161
162 public synchronized void renewLease(String clientid) {
163 Logger.getLogger(UIProxyImpl.class).debug("Renewing lease for client " + clientid + ".");
164
165 Lease lease = (Lease) _leases.get(clientid);
166 if (lease != null) lease.renew();
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180 public synchronized void setBufferProperty(String clientid, String bufferid, String key, String value) throws BufferException {
181 Logger.getLogger(UIProxyImpl.class).debug("Setting buffer property " + key + " to " + value + " for client " + clientid + ", buffer " + bufferid + ".");
182
183 Buffer buffer = (Buffer) _clients.get(clientid, bufferid);
184 if (buffer != null) buffer.setProperty(key, value);
185 renewLease(clientid);
186 }
187
188
189
190
191
192
193
194
195
196 public synchronized void disposeBuffer(String clientid, String bufferid) {
197 Logger.getLogger(UIProxyImpl.class).debug("Disposing buffer " + bufferid + " for client " + clientid + ".");
198
199 Buffer buffer = (Buffer) _clients.remove(clientid, bufferid);
200 String subscription = (String) _subscriptions.remove(clientid, bufferid);
201 if (subscription != null) _shandler.cancel(subscription);
202 renewLease(clientid);
203 }
204
205
206
207
208
209
210
211 public synchronized void disposeClient(String clientid) {
212 Logger.getLogger(UIProxyImpl.class).debug("Disposing client " + clientid + ".");
213
214 Lease lease = (Lease) _leases.remove(clientid);
215
216 if (lease==null)
217 Logger.getLogger(UIProxyImpl.class).info("Client "+clientid+" has no lease.");
218 else
219 lease.cancel();
220
221 _clients.remove(clientid);
222
223 Map subscriptions = ((Map) _subscriptions.remove(clientid));
224 if (subscriptions==null) {
225 Logger.getLogger(UIProxyImpl.class).info("Client "+clientid+" has no subscriptions.");
226 } else {
227 Iterator i = subscriptions.values().iterator();
228 while (i.hasNext()) {
229 _shandler.cancel(i.next().toString());
230 }
231 }
232 }
233
234
235
236
237 protected synchronized void timeout(String clientid) {
238 Logger.getLogger(UIProxyImpl.class).warn("Lease timed out for client " + clientid + ".");
239
240 _clients.remove(clientid);
241 _leases.remove(clientid);
242 Map subscriptions = ((Map) _subscriptions.remove(clientid));
243 if (subscriptions!=null) {
244 Iterator i = subscriptions.values().iterator();
245 while (i.hasNext()) {
246 _shandler.cancel(i.next().toString());
247 }
248 }
249 }
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 public synchronized String[] poll(String clientid, String bufferid) throws CodecException, BufferException, UnknownClientException, UnknownBufferException {
266 Logger.getLogger(UIProxyImpl.class).debug("Client " + clientid + " polls buffer " + bufferid + ".");
267
268 Map client = _clients.getMap(clientid);
269 if (client == null) {
270 Logger.getLogger(UIProxyImpl.class).warn("Client " + clientid + " unknown.");
271 throw new UnknownClientException("unknown client: " + clientid);
272 }
273 renewLease(clientid);
274 Buffer buffer = (Buffer) client.get(bufferid);
275 if (buffer == null) {
276 Logger.getLogger(UIProxyImpl.class).warn("Buffer " + bufferid + " unknown.");
277 throw new UnknownBufferException("unknwon buffer: " + clientid + "/" + bufferid);
278 }
279 boolean exception = true;
280 try {
281 Object[] messages = buffer.read();
282 String[] spec = new String[messages.length];
283 for (int i = 0; i < messages.length; i++) {
284 spec[i] = _codec.marshal(messages[i]);
285 }
286 exception = false;
287 return spec;
288 } finally {
289 if (exception) {
290 Logger.getLogger(UIProxyImpl.class).warn("Reading buffer caused exception.");
291 disposeBuffer(clientid, bufferid);
292 }
293 }
294 }
295 }