Package nbxmpp :: Module idlequeue
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.idlequeue

  1  ##   idlequeue.py 
  2  ## 
  3  ##   Copyright (C) 2006 Dimitur Kirov <dkirov@gmail.com> 
  4  ## 
  5  ##   This program is free software; you can redistribute it and/or modify 
  6  ##   it under the terms of the GNU General Public License as published by 
  7  ##   the Free Software Foundation; either version 2, or (at your option) 
  8  ##   any later version. 
  9  ## 
 10  ##   This program is distributed in the hope that it will be useful, 
 11  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  ##   GNU General Public License for more details. 
 14   
 15   
 16  """ 
 17  Idlequeues are Gajim's network heartbeat. Transports can be plugged as idle 
 18  objects and be informed about possible IO 
 19  """ 
 20   
 21  import os 
 22  import select 
 23  import logging 
 24  log = logging.getLogger('nbxmpp.idlequeue') 
 25   
 26  # needed for get_idlequeue 
 27  try: 
 28      import gobject 
 29      HAVE_GOBJECT = True 
 30  except ImportError: 
 31      HAVE_GOBJECT = False 
 32   
 33  # needed for idlecommand 
 34  if os.name == 'nt': 
 35      from subprocess import * # python24 only. we ask this for Windows 
 36  elif os.name == 'posix': 
 37      import fcntl 
 38   
 39  FLAG_WRITE                      = 20 # write only 
 40  FLAG_READ                       = 19 # read only 
 41  FLAG_READ_WRITE = 23 # read and write 
 42  FLAG_CLOSE                      = 16 # wait for close 
 43   
 44  PENDING_READ            = 3 # waiting read event 
 45  PENDING_WRITE           = 4 # waiting write event 
 46  IS_CLOSED                       = 16 # channel closed 
 47   
 48   
def get_idlequeue():
    """
    Return a new idlequeue suited to the current platform

    GlibIdleQueue is used when gobject could be imported and we are not on
    Windows; in every other case SelectIdleQueue is returned.
    """
    # gobject.io_add_watch does not work on windows, so glib is only an
    # option elsewhere (Gajim's default Idlequeue)
    if os.name != 'nt' and HAVE_GOBJECT:
        return GlibIdleQueue()
    # Windows, or GUI less implementation
    return SelectIdleQueue()
63 64
class IdleObject:
    """
    Idle listener interface. The methods below are invoked by IdleQueue;
    all of them do nothing here and are meant to be overridden.
    """

    def __init__(self):
        #: filedescriptor, must be unique for each IdleObject
        self.fd = -1

    def pollend(self):
        """
        Called on stream failure
        """

    def pollin(self):
        """
        Called on new read event
        """

    def pollout(self):
        """
        Called on new write event (connect in sockets is a pollout)
        """

    def read_timeout(self):
        """
        Called when timeout happened
        """
96 97
class IdleCommand(IdleObject):
    """
    Can be subclassed to execute commands asynchronously by the idlequeue.
    Result will be obtained via file descriptor of created pipe

    Subclasses normally override _compose_command_args(). set_idlequeue()
    has to be called before start(), because start() registers the command
    with the idlequeue.
    """

    def __init__(self, on_result):
        """
        :param on_result: callable receiving the collected output. It is
                called at most once, with whatever has been gathered so far.
        """
        IdleObject.__init__(self)
        # How long (sec.) to wait for the result. It is an instance attribute
        # instead of a constant, so it can be overridden.
        # NOTE(review): an earlier comment claimed "0 - forever", but the
        # _start_* methods arm the timeout for every value >= 0; only a
        # negative value skips it. Confirm the intended semantics.
        self.commandtimeout = 0
        # when we have some kind of result (valid, or not) we call this handler
        self.result_handler = on_result
        # if it is True, we can safely execute the command
        self.canexecute = True
        self.idlequeue = None
        self.result = ''

    def set_idlequeue(self, idlequeue):
        """
        Set the idlequeue used to schedule and watch the command
        """
        self.idlequeue = idlequeue

    def _return_result(self):
        """
        Hand self.result to the result handler, then drop the handler so it
        cannot be called a second time
        """
        if self.result_handler:
            self.result_handler(self.result)
        self.result_handler = None

    def _compose_command_args(self):
        """
        Return the command and its arguments as a list of strings. Meant to
        be overridden; the default is a harmless echo.
        """
        return ['echo', 'da']

    def _compose_command_line(self):
        """
        Return one line representation of command and its arguments

        NOTE(review): the arguments are joined without any quoting and the
        line is handed to os.popen() in _start_posix(), so subclasses should
        not put untrusted text into the arguments.
        """
        # join() also copes with an empty argument list, where the former
        # reduce() call raised TypeError
        return ' '.join(self._compose_command_args())

    def wait_child(self):
        """
        Alarm callback scheduled by _start_nt(). Polls the child process and
        either re-arms the alarm, or reports the result and closes the pipes.
        """
        if self.pipe.poll() is None:
            if self.endtime >= self.idlequeue.current_time():
                # child is still active, continue to wait
                self.idlequeue.set_alarm(self.wait_child, 0.1)
                return
            # result timeout: report what we have without reading the pipe
        else:
            # child has quit
            self.result = self.pipe.stdout.read()
        self._return_result()
        self.pipe.stdout.close()
        self.pipe.stdin.close()

    def start(self):
        """
        Run the command, or report an empty result right away when
        canexecute is False. Nothing is started on platforms other than
        'nt' and 'posix'.
        """
        if not self.canexecute:
            self.result = ''
            self._return_result()
            return
        if os.name == 'nt':
            self._start_nt()
        elif os.name == 'posix':
            self._start_posix()

    def _start_nt(self):
        """
        Start the command with subprocess.Popen and poll it through alarms
        """
        # if program is started from noninteractive shells stdin is closed and
        # cannot be forwarded, so we have to keep it open
        self.pipe = Popen(self._compose_command_args(), stdout=PIPE,
                bufsize=1024, shell=True, stderr=STDOUT, stdin=PIPE)
        if self.commandtimeout >= 0:
            self.endtime = self.idlequeue.current_time() + self.commandtimeout
            self.idlequeue.set_alarm(self.wait_child, 0.1)

    def _start_posix(self):
        """
        Start the command with os.popen and plug its non-blocking pipe into
        the idlequeue for reading
        """
        self.pipe = os.popen(self._compose_command_line())
        self.fd = self.pipe.fileno()
        fcntl.fcntl(self.pipe, fcntl.F_SETFL, os.O_NONBLOCK)
        self.idlequeue.plug_idle(self, False, True)
        if self.commandtimeout >= 0:
            self.idlequeue.set_read_timeout(self.fd, self.commandtimeout)

    def end(self):
        """
        Unplug from the idlequeue and close the pipe (best effort)
        """
        self.idlequeue.unplug_idle(self.fd)
        try:
            self.pipe.close()
        except Exception:
            # closing is best effort; keep going, but leave a trace
            log.debug('Error while closing command pipe', exc_info=True)

    def pollend(self):
        """
        Called on stream failure or end of output: clean up and report
        """
        self.idlequeue.remove_timeout(self.fd)
        self.end()
        self._return_result()

    def pollin(self):
        """
        Called on new read event: collect output. An empty or failed read is
        treated as the end of the stream.
        """
        try:
            res = self.pipe.read()
        except Exception:
            res = ''
        if res == '':
            return self.pollend()
        self.result += res

    def read_timeout(self):
        """
        Called when the read timeout expired: clean up and report what has
        been collected so far
        """
        self.end()
        self._return_result()
203 204
class IdleQueue:
    """
    IdleQueue provides three distinct time based features. Uses select.poll()

        1. Alarm timeout: Execute a callback after foo seconds
        2. Timeout event: Call read_timeout() of a plugged object if a timeout
           has been set, but not removed in time.
        3. Check file descriptor of plugged objects for read, write and error
           events
    """

    # (timeout, boolean)
    # Boolean is True if timeout is specified in seconds, False means
    # milliseconds
    PROCESS_TIMEOUT = (100, False)

    def __init__(self):
        # {fd: IdleObject} of all plugged objects
        self.queue = {}

        # when there is a timeout it executes obj.read_timeout()
        # {fd1: {timeout1: func1, timeout2: func2}}
        # timeouts are unique (timeout1 must be != timeout2)
        # If func1 is None, read_timeout() of the plugged object is called.
        # A timeout is dropped by _check_time_events() once it has fired.
        self.read_timeouts = {}

        # {alarm_time: [cb1, cb2]}, callbacks which are executed after XX
        # sec.; alarms are removed automatically
        self.alarms = {}
        self._init_idle()

    def _init_idle(self):
        """
        Hook method for subclasses. Will be called by __init__
        """
        self.selector = select.poll()

    def set_alarm(self, alarm_cb, seconds):
        """
        Set up a new alarm. alarm_cb will be called after specified seconds.

        Returns the absolute alarm time, which is needed for remove_alarm().
        """
        alarm_time = self.current_time() + seconds
        # almost impossible, but in case we have another alarm_cb at this time
        self.alarms.setdefault(alarm_time, []).append(alarm_cb)
        return alarm_time

    def remove_alarm(self, alarm_cb, alarm_time):
        """
        Remove alarm callback alarm_cb scheduled on alarm_time. Returns True if
        it was removed successfully, otherwise False
        """
        if alarm_time not in self.alarms:
            return False
        callbacks = self.alarms[alarm_time]
        try:
            # list.remove() compares by equality, so a bound method created
            # anew by the caller (obj.method) still matches the stored one.
            # A callback that is not scheduled leaves the list untouched.
            callbacks.remove(alarm_cb)
        except ValueError:
            return False
        if not callbacks:
            del self.alarms[alarm_time]
        return True

    def remove_timeout(self, fd, timeout=None):
        """
        Remove the read timeout

        Without timeout (or with a false value) all timeouts of fd are
        removed, otherwise only the given one.
        """
        log.info('read timeout removed for fd %s', fd)
        if fd not in self.read_timeouts:
            return
        if timeout:
            fd_timeouts = self.read_timeouts[fd]
            if timeout in fd_timeouts:
                del fd_timeouts[timeout]
            if not fd_timeouts:
                del self.read_timeouts[fd]
        else:
            del self.read_timeouts[fd]

    def set_read_timeout(self, fd, seconds, func=None):
        """
        Set a new timeout. If it is not removed after specified seconds,
        func or obj.read_timeout() will be called

        A filedescriptor fd can have several timeouts.
        """
        log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
        if func:
            log_txt += ' with function ' + str(func)
        log.info(log_txt)
        timeout = self.current_time() + seconds
        self.read_timeouts.setdefault(fd, {})[timeout] = func

    def _check_time_events(self):
        """
        Execute and remove alarm callbacks and execute func() or read_timeout()
        for plugged objects if specified time has elapsed
        """
        current_time = self.current_time()

        # Iterate over snapshots: remove_timeout() and the callbacks modify
        # the dicts while we walk them.
        for fd, timeouts in list(self.read_timeouts.items()):
            if fd not in self.queue:
                # object is gone, its timeouts are meaningless
                self.remove_timeout(fd)
                continue
            for timeout, func in list(timeouts.items()):
                if timeout > current_time:
                    continue
                if func:
                    log.debug('Calling %s for fd %s', func, fd)
                    func()
                else:
                    log.debug('Calling read_timeout for fd %s', fd)
                    self.queue[fd].read_timeout()
                self.remove_timeout(fd, timeout)

        # snapshot again: callbacks may schedule or remove alarms
        for alarm_time in list(self.alarms):
            if alarm_time > current_time:
                continue
            if alarm_time in self.alarms:
                for callback in self.alarms[alarm_time]:
                    callback()
                # a callback may already have removed the entry
                if alarm_time in self.alarms:
                    del self.alarms[alarm_time]

    def plug_idle(self, obj, writable=True, readable=True):
        """
        Plug an IdleObject into idlequeue. Filedescriptor fd must be set

        :param obj: the IdleObject
        :param writable: True if obj has data to send
        :param readable: True if obj expects data to be received
        """
        if obj.fd == -1:
            return
        if obj.fd in self.queue:
            self.unplug_idle(obj.fd)
        self.queue[obj.fd] = obj
        if writable and readable:
            flags = FLAG_READ_WRITE
        elif writable:
            flags = FLAG_WRITE
        elif readable:
            flags = FLAG_READ
        else:
            # when we paused a FT, we expect only a close event
            flags = FLAG_CLOSE
        self._add_idle(obj.fd, flags)

    def _add_idle(self, fd, flags):
        """
        Hook method for subclasses, called by plug_idle
        """
        self.selector.register(fd, flags)

    def unplug_idle(self, fd):
        """
        Remove plugged IdleObject, specified by filedescriptor fd
        """
        if fd in self.queue:
            del self.queue[fd]
            self._remove_idle(fd)

    def current_time(self):
        """
        Return the current time in seconds, as given by time.time()
        """
        from time import time
        return time()

    def _remove_idle(self, fd):
        """
        Hook method for subclasses, called by unplug_idle
        """
        self.selector.unregister(fd)

    def _process_events(self, fd, flags):
        """
        Dispatch the events in flags to the object plugged on fd

        Returns True if a read or write event was handled and the channel is
        not closed, otherwise False.
        """
        obj = self.queue.get(fd)
        if obj is None:
            self.unplug_idle(fd)
            return False

        read_write = False
        if flags & PENDING_READ:
            obj.pollin()
            read_write = True
        elif flags & PENDING_WRITE and not flags & IS_CLOSED:
            obj.pollout()
            read_write = True

        if flags & IS_CLOSED:
            # io error, don't expect more events
            self.remove_timeout(obj.fd)
            self.unplug_idle(obj.fd)
            obj.pollend()
            return False

        return read_write

    def process(self):
        """
        Process idlequeue. Check for any pending timeout or alarm events. Call
        IdleObjects on possible and requested read, write and error events on
        their file descriptors

        Call this in regular intervals.
        """
        if not self.queue:
            # check for timeouts/alert also when there are no active fds
            self._check_time_events()
            return True
        try:
            waiting_descriptors = self.selector.poll(0)
        except select.error as e:
            waiting_descriptors = []
            if e.args[0] != 4: # interrupt
                raise
        for fd, flags in waiting_descriptors:
            self._process_events(fd, flags)
        self._check_time_events()
        return True
435 436
class SelectIdleQueue(IdleQueue):
    """
    Extends IdleQueue to use select.select() for polling

    This class exists for the sake of gtk2.8 on windows, which doesn't seem to
    support io_add_watch properly (yet)
    """

    def _init_idle(self):
        """
        Create the dicts holding the descriptors which are watched for read,
        write and error events
        """
        self.read_fds = {}
        self.write_fds = {}
        self.error_fds = {}

    def _add_idle(self, fd, flags):
        """
        This method is called when we plug a new idle object. Add descriptor
        to read/write/error lists, according to flags
        """
        if flags & PENDING_READ:
            self.read_fds[fd] = fd
        if flags & PENDING_WRITE:
            self.write_fds[fd] = fd
        # errors are watched for every plugged descriptor
        self.error_fds[fd] = fd

    def _remove_idle(self, fd):
        """
        This method is called when we unplug an idle object. Remove descriptor
        from read/write/error lists
        """
        self.read_fds.pop(fd, None)
        self.write_fds.pop(fd, None)
        self.error_fds.pop(fd, None)

    def process(self):
        """
        Poll the watched descriptors once without blocking, call pollin(),
        pollout() and pollend() of the plugged objects accordingly, then
        check for pending timeout and alarm events. Always returns True.
        """
        if not self.write_fds and not self.read_fds:
            self._check_time_events()
            return True
        try:
            waiting_descriptors = select.select(list(self.read_fds),
                    list(self.write_fds), list(self.error_fds), 0)
        except select.error as e:
            waiting_descriptors = ((), (), ())
            if e.args[0] != 4: # interrupt
                raise
        # an earlier handler may have unplugged an object, hence the lookups
        for fd in waiting_descriptors[0]:
            q = self.queue.get(fd)
            if q:
                q.pollin()
        for fd in waiting_descriptors[1]:
            q = self.queue.get(fd)
            if q:
                q.pollout()
        for fd in waiting_descriptors[2]:
            q = self.queue.get(fd)
            if q:
                q.pollend()
        self._check_time_events()
        return True
501 502
class GlibIdleQueue(IdleQueue):
    """
    Extends IdleQueue to use glib io_add_watch, instead of select/poll. In
    another 'non gui' implementation of Gajim IdleQueue can be used safely
    """

    # (timeout, boolean)
    # Boolean is True if timeout is specified in seconds, False means
    # milliseconds
    PROCESS_TIMEOUT = (2, True)

    def _init_idle(self):
        """
        Create the dict which maps file/pipe/sock descriptor to glib event id
        """
        self.events = {}
        # overrides IdleQueue.current_time(): time() is already called in
        # glib, we just get the last value
        self.current_time = gobject.get_current_time

    def _add_idle(self, fd, flags):
        """
        Called when a new idle object is plugged. Start listening for events
        from fd
        """
        # remember the id of the watch, so that we can remove it on unplug
        self.events[fd] = gobject.io_add_watch(fd, flags,
                self._process_events, priority=gobject.PRIORITY_LOW)

    def _process_events(self, fd, flags):
        """
        Watch callback registered by _add_idle(). Delegates to
        IdleQueue._process_events; on an exception the watch is registered
        anew before the exception is passed on.
        """
        try:
            handled = IdleQueue._process_events(self, fd, flags)
        except Exception:
            self._remove_idle(fd)
            self._add_idle(fd, flags)
            raise
        return handled

    def _remove_idle(self, fd):
        """
        Called when an idle object is unplugged. Stop listening for events
        from fd
        """
        if fd in self.events:
            gobject.source_remove(self.events[fd])
            del self.events[fd]

    def process(self):
        """
        Only timeout and alarm events are checked here; descriptor events
        are delivered through the watches added in _add_idle()
        """
        self._check_time_events()
552