1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Idlequeues are Gajim's network heartbeat. Transports can be plugged as idle
18 objects and be informed about possible IO
19 """
20
21 import os
22 import select
23 import logging
24 log = logging.getLogger('nbxmpp.idlequeue')
25
26
27 try:
28 import gobject
29 HAVE_GOBJECT = True
30 except ImportError:
31 HAVE_GOBJECT = False
32
33
34 if os.name == 'nt':
35 from subprocess import *
36 elif os.name == 'posix':
37 import fcntl
38
39 FLAG_WRITE = 20
40 FLAG_READ = 19
41 FLAG_READ_WRITE = 23
42 FLAG_CLOSE = 16
43
44 PENDING_READ = 3
45 PENDING_WRITE = 4
46 IS_CLOSED = 16
47
48
63
64
66 """
67 Idle listener interface. Listed methods are called by IdleQueue.
68 """
69
72
74 """
75 Called on stream failure
76 """
77 pass
78
80 """
81 Called on new read event
82 """
83 pass
84
86 """
87 Called on new write event (connect in sockets is a pollout)
88 """
89 pass
90
92 """
93 Called when timeout happened
94 """
95 pass
96
97
99 """
100 Can be subclassed to execute commands asynchronously by the idlequeue.
101 Result will be obtained via file descriptor of created pipe
102 """
103
105 IdleObject.__init__(self)
106
107
108 self.commandtimeout = 0
109
110 self.result_handler = on_result
111
112 self.canexecute = True
113 self.idlequeue = None
114 self.result =''
115
118
120 if self.result_handler:
121 self.result_handler(self.result)
122 self.result_handler = None
123
125 return ['echo', 'da']
126
128 """
129 Return one line representation of command and its arguments
130 """
131 return reduce(lambda left, right: left + ' ' + right,
132 self._compose_command_args())
133
150
152 if not self.canexecute:
153 self.result = ''
154 self._return_result()
155 return
156 if os.name == 'nt':
157 self._start_nt()
158 elif os.name == 'posix':
159 self._start_posix()
160
169
177
184
189
191 try:
192 res = self.pipe.read()
193 except Exception, e:
194 res = ''
195 if res == '':
196 return self.pollend()
197 else:
198 self.result += res
199
203
204
206 """
207 IdleQueue provides three distinct time based features. Uses select.poll()
208
209 1. Alarm timeout: Execute a callback after foo seconds
210 2. Timeout event: Call read_timeout() of a plugged object if a timeout
211 has been set, but not removed in time.
212 3. Check file descriptor of plugged objects for read, write and error
213 events
214
215 """
216
217
218
219 PROCESS_TIMEOUT = (100, False)
220
222 self.queue = {}
223
224
225
226
227
228
229 self.read_timeouts = {}
230
231
232 self.alarms = {}
233 self._init_idle()
234
236 """
237 Hook method for subclasses. Will be called by __init__
238 """
239 self.selector = select.poll()
240
242 """
243 Set up a new alarm. alarm_cb will be called after specified seconds.
244 """
245 alarm_time = self.current_time() + seconds
246
247 if alarm_time in self.alarms:
248 self.alarms[alarm_time].append(alarm_cb)
249 else:
250 self.alarms[alarm_time] = [alarm_cb]
251 return alarm_time
252
254 """
255 Remove alarm callback alarm_cb scheduled on alarm_time. Returns True if
256 it was removed successfully, otherwise False
257 """
258 if not alarm_time in self.alarms:
259 return False
260 i = -1
261 for i in range(len(self.alarms[alarm_time])):
262
263 if self.alarms[alarm_time][i] is alarm_cb:
264 break
265 if i != -1:
266 del self.alarms[alarm_time][i]
267 if self.alarms[alarm_time] == []:
268 del self.alarms[alarm_time]
269 return True
270 else:
271 return False
272
274 """
275 Remove the read timeout
276 """
277 log.info('read timeout removed for fd %s' % fd)
278 if fd in self.read_timeouts:
279 if timeout:
280 if timeout in self.read_timeouts[fd]:
281 del(self.read_timeouts[fd][timeout])
282 if len(self.read_timeouts[fd]) == 0:
283 del(self.read_timeouts[fd])
284 else:
285 del(self.read_timeouts[fd])
286
288 """
289 Set a new timeout. If it is not removed after specified seconds,
290 func or obj.read_timeout() will be called
291
292 A filedescriptor fd can have several timeouts.
293 """
294 log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
295 if func:
296 log_txt += ' with function ' + str(func)
297 log.info(log_txt)
298 timeout = self.current_time() + seconds
299 if fd in self.read_timeouts:
300 self.read_timeouts[fd][timeout] = func
301 else:
302 self.read_timeouts[fd] = {timeout: func}
303
305 """
306 Execute and remove alarm callbacks and execute func() or read_timeout()
307 for plugged objects if specified time has elapsed
308 """
309 current_time = self.current_time()
310
311 for fd, timeouts in self.read_timeouts.items():
312 if fd not in self.queue:
313 self.remove_timeout(fd)
314 continue
315 for timeout, func in timeouts.items():
316 if timeout > current_time:
317 continue
318 if func:
319 log.debug('Calling %s for fd %s' % (func, fd))
320 func()
321 else:
322 log.debug('Calling read_timeout for fd %s' % fd)
323 self.queue[fd].read_timeout()
324 self.remove_timeout(fd, timeout)
325
326 times = self.alarms.keys()
327 for alarm_time in times:
328 if alarm_time > current_time:
329 continue
330 if alarm_time in self.alarms:
331 for callback in self.alarms[alarm_time]:
332 callback()
333 if alarm_time in self.alarms:
334 del(self.alarms[alarm_time])
335
336 - def plug_idle(self, obj, writable=True, readable=True):
337 """
338 Plug an IdleObject into idlequeue. Filedescriptor fd must be set
339
340 :param obj: the IdleObject
341 :param writable: True if obj has data to send
342 :param readable: True if obj expects data to be received
343 """
344 if obj.fd == -1:
345 return
346 if obj.fd in self.queue:
347 self.unplug_idle(obj.fd)
348 self.queue[obj.fd] = obj
349 if writable:
350 if not readable:
351 flags = FLAG_WRITE
352 else:
353 flags = FLAG_READ_WRITE
354 else:
355 if readable:
356 flags = FLAG_READ
357 else:
358
359 flags = FLAG_CLOSE
360 self._add_idle(obj.fd, flags)
361
363 """
364 Hook method for subclasses, called by plug_idle
365 """
366 self.selector.register(fd, flags)
367
369 """
370 Remove plugged IdleObject, specified by filedescriptor fd
371 """
372 if fd in self.queue:
373 del(self.queue[fd])
374 self._remove_idle(fd)
375
377 from time import time
378 return time()
379
381 """
382 Hook method for subclasses, called by unplug_idle
383 """
384 self.selector.unregister(fd)
385
412
414 """
415 Process idlequeue. Check for any pending timeout or alarm events. Call
416 IdleObjects on possible and requested read, write and error events on
417 their file descriptors
418
419 Call this in regular intervals.
420 """
421 if not self.queue:
422
423 self._check_time_events()
424 return True
425 try:
426 waiting_descriptors = self.selector.poll(0)
427 except select.error, e:
428 waiting_descriptors = []
429 if e[0] != 4:
430 raise
431 for fd, flags in waiting_descriptors:
432 self._process_events(fd, flags)
433 self._check_time_events()
434 return True
435
436
438 """
439 Extends IdleQueue to use select.select() for polling
440
441 This class exists for the sake of gtk2.8 on windows, which doesn't seem to
442 support io_add_watch properly (yet)
443 """
444
446 """
447 Create the dicts which hold the descriptors watched for read/write/error
448 """
449 self.read_fds = {}
450 self.write_fds = {}
451 self.error_fds = {}
452
454 """
455 This method is called when we plug a new idle object. Add descriptor
456 to read/write/error lists, according to flags
457 """
458 if flags & 3:
459 self.read_fds[fd] = fd
460 if flags & 4:
461 self.write_fds[fd] = fd
462 self.error_fds[fd] = fd
463
465 """
466 This method is called when we unplug an idle object. Remove descriptor
467 from read/write/error lists
468 """
469 if fd in self.read_fds:
470 del(self.read_fds[fd])
471 if fd in self.write_fds:
472 del(self.write_fds[fd])
473 if fd in self.error_fds:
474 del(self.error_fds[fd])
475
477 if not self.write_fds and not self.read_fds:
478 self._check_time_events()
479 return True
480 try:
481 waiting_descriptors = select.select(self.read_fds.keys(),
482 self.write_fds.keys(), self.error_fds.keys(), 0)
483 except select.error, e:
484 waiting_descriptors = ((), (), ())
485 if e[0] != 4:
486 raise
487 for fd in waiting_descriptors[0]:
488 q = self.queue.get(fd)
489 if q:
490 q.pollin()
491 for fd in waiting_descriptors[1]:
492 q = self.queue.get(fd)
493 if q:
494 q.pollout()
495 for fd in waiting_descriptors[2]:
496 q = self.queue.get(fd)
497 if q:
498 q.pollend()
499 self._check_time_events()
500 return True
501
502
504 """
505 Extends IdleQueue to use glib io_add_watch, instead of select/poll. In another
506 'non gui' implementation of Gajim IdleQueue can be used safely
507 """
508
509
510
511 PROCESS_TIMEOUT = (2, True)
512
514 """
515 Creates a dict, which maps file/pipe/sock descriptor to glib event id
516 """
517 self.events = {}
518
519
520 self.current_time = gobject.get_current_time
521
523 """
524 This method is called when we plug a new idle object. Start listening for
525 events from fd
526 """
527 res = gobject.io_add_watch(fd, flags, self._process_events,
528 priority=gobject.PRIORITY_LOW)
529
530 self.events[fd] = res
531
539
541 """
542 This method is called when we unplug an idle object. Stop listening
543 for events from fd
544 """
545 if not fd in self.events:
546 return
547 gobject.source_remove(self.events[fd])
548 del(self.events[fd])
549
552