Package nbxmpp :: Module dispatcher_nb
[hide private]
[frames | no frames]

Source Code for Module nbxmpp.dispatcher_nb

  1  ##   dispatcher_nb.py 
  2  ##       based on dispatcher.py 
  3  ## 
  4  ##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov 
  5  ##       modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ## 
  7  ##   This program is free software; you can redistribute it and/or modify 
  8  ##   it under the terms of the GNU General Public License as published by 
  9  ##   the Free Software Foundation; either version 2, or (at your option) 
 10  ##   any later version. 
 11  ## 
 12  ##   This program is distributed in the hope that it will be useful, 
 13  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ##   GNU General Public License for more details. 
 16   
 17   
 18  """ 
 19  Main xmpp decision making logic. Provides library with methods to assign 
 20  different handlers to different XMPP stanzas and namespaces 
 21  """ 
 22   
 23  import simplexml, sys, locale 
 24  import re 
 25  from xml.parsers.expat import ExpatError 
 26  from plugin import PlugIn 
 27  from protocol import (NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence, 
 28          Message, Protocol, Node, Error, ERR_FEATURE_NOT_IMPLEMENTED, StreamError) 
 29  import logging 
 30  log = logging.getLogger('nbxmpp.dispatcher_nb') 
 31   
 32  #: default timeout to wait for response for our id 
 33  DEFAULT_TIMEOUT_SECONDS = 25 
 34  outgoingID = 0 
 35   
 36  XML_DECLARATION = '<?xml version=\'1.0\'?>' 
# FIXME: ugly
class Dispatcher():
    """
    Facade that plugs the matching concrete dispatcher into a client

    Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
    was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
    is that reference used to access dispatcher instance is in Client attribute
    named by __class__.__name__ of the dispatcher instance .. long story short:

    I wrote following to avoid changing each client.Dispatcher.whatever() in
    xmpp

    If having two kinds of dispatcher will go well, I will rewrite the
    dispatcher references in other scripts
    """

    def PlugIn(self, client_obj, after_SASL=False, old_features=None):
        """
        Create the dispatcher matching the client's protocol and plug it in

        :param client_obj: client instance; its "protocol_type" attribute
            ('XMPP' or 'BOSH') selects the dispatcher class.
        :param after_SASL: forwarded to BOSHDispatcher.PlugIn only.
        :param old_features: forwarded to BOSHDispatcher.PlugIn only.
        :raise AssertionError: if protocol_type is neither 'XMPP' nor 'BOSH'.
        """
        protocol_type = client_obj.protocol_type
        if protocol_type == 'XMPP':
            XMPPDispatcher().PlugIn(client_obj)
        elif protocol_type == 'BOSH':
            BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features)
        else:
            # Should never be reached. Raised explicitly instead of using an
            # "assert" statement so the check is not stripped under python -O.
            raise AssertionError('Unknown protocol type: %r' % (protocol_type,))

    @classmethod
    def get_instance(cls, *args, **kwargs):
        """
        Factory Method for object creation

        Use this instead of directly initializing the class in order to make
        unit testing much easier.
        """
        return cls(*args, **kwargs)
69
70 71 -class XMPPDispatcher(PlugIn):
72 """ 73 Handles XMPP stream and is the first who takes control over a fresh stanza 74 75 Is plugged into NonBlockingClient but can be replugged to restart handled 76 stream headers (used by SASL f.e.). 77 """ 78
79 - def __init__(self):
80 PlugIn.__init__(self) 81 self.handlers = {} 82 self._expected = {} 83 self._defaultHandler = None 84 self._pendingExceptions = [] 85 self._eventHandler = None 86 self._cycleHandlers = [] 87 self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, 88 self.RegisterEventHandler, self.UnregisterCycleHandler, 89 self.RegisterCycleHandler, self.RegisterHandlerOnce, 90 self.UnregisterHandler, self.RegisterProtocol, 91 self.SendAndWaitForResponse, self.SendAndCallForResponse, 92 self.getAnID, self.Event, self.send] 93 94 # Let the dispatcher know if there is support for stream management 95 self.sm = None 96 97 # \ufddo -> \ufdef range 98 c = u'\ufdd0' 99 r = c.encode('utf8') 100 while (c < u'\ufdef'): 101 c = unichr(ord(c) + 1) 102 r += '|' + c.encode('utf8') 103 104 # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff 105 c = u'\ufffe' 106 r += '|' + c.encode('utf8') 107 r += '|' + unichr(ord(c) + 1).encode('utf8') 108 while (c < u'\U0010fffe'): 109 c = unichr(ord(c) + 0x10000) 110 r += '|' + c.encode('utf8') 111 r += '|' + unichr(ord(c) + 1).encode('utf8') 112 113 self.invalid_chars_re = re.compile(r)
114
115 - def getAnID(self):
116 global outgoingID 117 outgoingID += 1 118 return repr(outgoingID)
119
120 - def dumpHandlers(self):
121 """ 122 Return set of user-registered callbacks in it's internal format. Used 123 within the library to carry user handlers set over Dispatcher replugins 124 """ 125 return self.handlers
126
127 - def restoreHandlers(self, handlers):
128 """ 129 Restore user-registered callbacks structure from dump previously obtained 130 via dumpHandlers. Used within the library to carry user handlers set over 131 Dispatcher replugins. 132 """ 133 self.handlers = handlers
134
135 - def _init(self):
136 """ 137 Register default namespaces/protocols/handlers. Used internally 138 """ 139 # FIXME: inject dependencies, do not rely that they are defined by our 140 # owner 141 self.RegisterNamespace('unknown') 142 self.RegisterNamespace(NS_STREAMS) 143 self.RegisterNamespace(self._owner.defaultNamespace) 144 self.RegisterProtocol('iq', Iq) 145 self.RegisterProtocol('presence', Presence) 146 self.RegisterProtocol('message', Message) 147 self.RegisterDefaultHandler(self.returnStanzaHandler) 148 self.RegisterEventHandler(self._owner._caller._event_dispatcher) 149 self.on_responses = {}
150
151 - def plugin(self, owner):
152 """ 153 Plug the Dispatcher instance into Client class instance and send initial 154 stream header. Used internally 155 """ 156 self._init() 157 self._owner.lastErrNode = None 158 self._owner.lastErr = None 159 self._owner.lastErrCode = None 160 if hasattr(self._owner, 'StreamInit'): 161 self._owner.StreamInit() 162 else: 163 self.StreamInit()
164
165 - def plugout(self):
166 """ 167 Prepare instance to be destructed 168 """ 169 self.Stream.dispatch = None 170 self.Stream.features = None 171 self.Stream.destroy() 172 self._owner = None 173 self.Stream = None
174
175 - def StreamInit(self):
176 """ 177 Send an initial stream header 178 """ 179 self._owner.Connection.sendqueue = [] 180 self.Stream = simplexml.NodeBuilder() 181 self.Stream.dispatch = self.dispatch 182 self.Stream._dispatch_depth = 2 183 self.Stream.stream_header_received = self._check_stream_start 184 self.Stream.features = None 185 self._metastream = Node('stream:stream') 186 self._metastream.setNamespace(self._owner.Namespace) 187 self._metastream.setAttr('version', '1.0') 188 self._metastream.setAttr('xmlns:stream', NS_STREAMS) 189 self._metastream.setAttr('to', self._owner.Server) 190 if locale.getdefaultlocale()[0]: 191 self._metastream.setAttr('xml:lang', 192 locale.getdefaultlocale()[0].split('_')[0]) 193 self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2]))
194
195 - def _check_stream_start(self, ns, tag, attrs):
196 if ns != NS_STREAMS or tag!='stream': 197 raise ValueError('Incorrect stream start: (%s,%s). Terminating.' 198 % (tag, ns))
199
200 - def replace_non_character(self, data):
201 return re.sub(self.invalid_chars_re, u'\ufffd'.encode('utf-8'), data)
202
203 - def ProcessNonBlocking(self, data):
204 """ 205 Check incoming stream for data waiting 206 207 :param data: data received from transports/IO sockets 208 :return: 209 1) length of processed data if some data were processed; 210 2) '0' string if no data were processed but link is alive; 211 3) 0 (zero) if underlying connection is closed. 212 """ 213 # FIXME: 214 # When an error occurs we disconnect the transport directly. Client's 215 # disconnect method will never be called. 216 # Is this intended? 217 # also look at transports start_disconnect() 218 data = self.replace_non_character(data) 219 for handler in self._cycleHandlers: 220 handler(self) 221 if len(self._pendingExceptions) > 0: 222 _pendingException = self._pendingExceptions.pop() 223 raise _pendingException[0], _pendingException[1], _pendingException[2] 224 try: 225 self.Stream.Parse(data) 226 # end stream:stream tag received 227 if self.Stream and self.Stream.has_received_endtag(): 228 self._owner.disconnect(self.Stream.streamError) 229 return 0 230 except ExpatError: 231 log.error('Invalid XML received from server. Forcing disconnect.') 232 self._owner.Connection.disconnect() 233 return 0 234 except ValueError, e: 235 log.debug('ValueError: %s' % str(e)) 236 self._owner.Connection.pollend() 237 return 0 238 if len(self._pendingExceptions) > 0: 239 _pendingException = self._pendingExceptions.pop() 240 raise _pendingException[0], _pendingException[1], _pendingException[2] 241 if len(data) == 0: 242 return '0' 243 return len(data)
244
245 - def RegisterNamespace(self, xmlns, order='info'):
246 """ 247 Create internal structures for newly registered namespace 248 249 You can register handlers for this namespace afterwards. By default 250 one namespace is already registered 251 (jabber:client or jabber:component:accept depending on context. 252 """ 253 log.debug('Registering namespace "%s"' % xmlns) 254 self.handlers[xmlns] = {} 255 self.RegisterProtocol('unknown', Protocol, xmlns=xmlns) 256 self.RegisterProtocol('default', Protocol, xmlns=xmlns)
257
258 - def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'):
259 """ 260 Used to declare some top-level stanza name to dispatcher 261 262 Needed to start registering handlers for such stanzas. Iq, message and 263 presence protocols are registered by default. 264 """ 265 if not xmlns: 266 xmlns=self._owner.defaultNamespace 267 log.debug('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns)) 268 self.handlers[xmlns][tag_name] = {type:Proto, 'default':[]}
269
270 - def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', 271 makefirst=0, system=0):
272 """ 273 Register handler for processing all stanzas for specified namespace 274 """ 275 self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, 276 system)
277
278 - def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, 279 makefirst=False, system=False):
280 """ 281 Register user callback as stanzas handler of declared type 282 283 Callback arguments: 284 dispatcher instance (for replying), incoming return of previous handlers. 285 The callback must raise xmpp.NodeProcessed just before return if it wants 286 to prevent other callbacks to be called with the same stanza as argument 287 _and_, more importantly library from returning stanza to sender with error set. 288 289 :param name: name of stanza. F.e. "iq". 290 :param handler: user callback. 291 :param typ: value of stanza's "type" attribute. If not specified any 292 value will match 293 :param ns: namespace of child that stanza must contain. 294 :param makefirst: insert handler in the beginning of handlers list instead 295 of adding it to the end. Note that more common handlers i.e. w/o "typ" 296 and " will be called first nevertheless. 297 :param system: call handler even if NodeProcessed Exception were raised 298 already. 299 """ 300 if not xmlns: 301 xmlns=self._owner.defaultNamespace 302 log.debug('Registering handler %s for "%s" type->%s ns->%s(%s)' % 303 (handler, name, typ, ns, xmlns)) 304 if not typ and not ns: 305 typ='default' 306 if xmlns not in self.handlers: 307 self.RegisterNamespace(xmlns, 'warn') 308 if name not in self.handlers[xmlns]: 309 self.RegisterProtocol(name, Protocol, xmlns, 'warn') 310 if typ+ns not in self.handlers[xmlns][name]: 311 self.handlers[xmlns][name][typ+ns]=[] 312 if makefirst: 313 self.handlers[xmlns][name][typ+ns].insert(0, {'func':handler, 314 'system':system}) 315 else: 316 self.handlers[xmlns][name][typ+ns].append({'func':handler, 317 'system':system})
318
319 - def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, 320 makefirst=0, system=0):
321 """ 322 Unregister handler after first call (not implemented yet) 323 """ 324 # FIXME Drop or implement 325 if not xmlns: 326 xmlns = self._owner.defaultNamespace 327 self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
328
329 - def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
330 """ 331 Unregister handler. "typ" and "ns" must be specified exactly the same as 332 with registering. 333 """ 334 if not xmlns: 335 xmlns = self._owner.defaultNamespace 336 if not typ and not ns: 337 typ='default' 338 if xmlns not in self.handlers: 339 return 340 if name not in self.handlers[xmlns]: 341 return 342 if typ+ns not in self.handlers[xmlns][name]: 343 return 344 for pack in self.handlers[xmlns][name][typ+ns]: 345 if pack['func'] == handler: 346 try: 347 self.handlers[xmlns][name][typ+ns].remove(pack) 348 except ValueError: 349 pass
350
351 - def RegisterDefaultHandler(self, handler):
352 """ 353 Specify the handler that will be used if no NodeProcessed exception were 354 raised. This is returnStanzaHandler by default. 355 """ 356 self._defaultHandler = handler
357
358 - def RegisterEventHandler(self, handler):
359 """ 360 Register handler that will process events. F.e. "FILERECEIVED" event. See 361 common/connection: _event_dispatcher() 362 """ 363 self._eventHandler = handler
364
365 - def returnStanzaHandler(self, conn, stanza):
366 """ 367 Return stanza back to the sender with <feature-not-implemented/> error 368 set 369 """ 370 if stanza.getType() in ('get', 'set'): 371 conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
372
373 - def RegisterCycleHandler(self, handler):
374 """ 375 Register handler that will be called on every Dispatcher.Process() call 376 """ 377 if handler not in self._cycleHandlers: 378 self._cycleHandlers.append(handler)
379
380 - def UnregisterCycleHandler(self, handler):
381 """ 382 Unregister handler that will is called on every Dispatcher.Process() call 383 """ 384 if handler in self._cycleHandlers: 385 self._cycleHandlers.remove(handler)
386
387 - def Event(self, realm, event, data):
388 """ 389 Raise some event 390 391 :param realm: scope of event. Usually a namespace. 392 :param event: the event itself. F.e. "SUCCESSFUL SEND". 393 :param data: data that comes along with event. Depends on event. 394 """ 395 if self._eventHandler: 396 self._eventHandler(realm, event, data) 397 else: 398 log.warning('Received unhandled event: %s' % event)
399
400 - def dispatch(self, stanza, session=None, direct=0):
401 """ 402 Main procedure that performs XMPP stanza recognition and calling 403 apppropriate handlers for it. Called by simplexml 404 """ 405 # FIXME: Where do we set session and direct. Why? What are those intended 406 # to do? 407 408 #log.info('dispatch called: stanza = %s, session = %s, direct= %s' 409 # % (stanza, session, direct)) 410 if not session: 411 session = self 412 session.Stream._mini_dom = None 413 name = stanza.getName() 414 415 if name == 'features': 416 self._owner.got_features = True 417 session.Stream.features = stanza 418 elif name == 'error': 419 if stanza.getTag('see-other-host'): 420 self._owner.got_see_other_host = stanza 421 422 xmlns = stanza.getNamespace() 423 424 # log.info('in dispatch, getting ns for %s, and the ns is %s' 425 # % (stanza, xmlns)) 426 if xmlns not in self.handlers: 427 log.warn("Unknown namespace: " + xmlns) 428 xmlns = 'unknown' 429 # features stanza has been handled before 430 if name not in self.handlers[xmlns]: 431 if name != 'features': 432 log.warn("Unknown stanza: " + name) 433 else: 434 log.debug("Got %s/%s stanza" % (xmlns, name)) 435 name='unknown' 436 else: 437 log.debug("Got %s/%s stanza" % (xmlns, name)) 438 439 if stanza.__class__.__name__ == 'Node': 440 # FIXME: this cannot work 441 stanza=self.handlers[xmlns][name][type](node=stanza) 442 443 typ = stanza.getType() 444 if not typ: 445 typ = '' 446 stanza.props = stanza.getProperties() 447 ID = stanza.getID() 448 449 # If server supports stream management 450 if self.sm and self.sm.enabled and (stanza.getName() != 'r' and 451 stanza.getName() != 'a' and stanza.getName() != 'enabled' and 452 stanza.getName() != 'resumed'): 453 # increments the number of stanzas that has been handled 454 self.sm.in_h = self.sm.in_h + 1 455 list_ = ['default'] # we will use all handlers: 456 if typ in self.handlers[xmlns][name]: 457 list_.append(typ) # from very common... 
458 for prop in stanza.props: 459 if prop in self.handlers[xmlns][name]: 460 list_.append(prop) 461 if typ and typ+prop in self.handlers[xmlns][name]: 462 list_.append(typ+prop) # ...to very particular 463 464 chain = self.handlers[xmlns]['default']['default'] 465 for key in list_: 466 if key: 467 chain = chain + self.handlers[xmlns][name][key] 468 469 if ID in session._expected: 470 user = 0 471 if isinstance(session._expected[ID], tuple): 472 cb, args = session._expected[ID] 473 log.debug("Expected stanza arrived. Callback %s(%s) found!" % 474 (cb, args)) 475 try: 476 cb(session,stanza,**args) 477 except Exception, typ: 478 if typ.__class__.__name__ != 'NodeProcessed': 479 raise 480 else: 481 log.debug("Expected stanza arrived!") 482 session._expected[ID] = stanza 483 else: 484 user = 1 485 for handler in chain: 486 if user or handler['system']: 487 try: 488 handler['func'](session, stanza) 489 except Exception, typ: 490 if typ.__class__.__name__ != 'NodeProcessed': 491 self._pendingExceptions.insert(0, sys.exc_info()) 492 return 493 user=0 494 if user and self._defaultHandler: 495 self._defaultHandler(session, stanza)
496
497 - def _WaitForData(self, data):
498 """ 499 Internal wrapper around ProcessNonBlocking. Will check for 500 """ 501 if data is None: 502 return 503 res = self.ProcessNonBlocking(data) 504 # 0 result indicates that we have closed the connection, e.g. 505 # we have released dispatcher, so self._owner has no methods 506 if not res: 507 return 508 for (_id, _iq) in self._expected.items(): 509 if _iq is None: 510 # If the expected Stanza would have arrived, ProcessNonBlocking 511 # would have placed the reply stanza in there 512 continue 513 if _id in self.on_responses: 514 if len(self._expected) == 1: 515 self._owner.onreceive(None) 516 resp, args = self.on_responses[_id] 517 del self.on_responses[_id] 518 if args is None: 519 resp(_iq) 520 else: 521 resp(self._owner, _iq, **args) 522 del self._expected[_id]
523
524 - def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
525 """ 526 Send stanza and wait for recipient's response to it. Will call transports 527 on_timeout callback if response is not retrieved in time 528 529 Be aware: Only timeout of latest call of SendAndWait is active. 530 """ 531 if timeout is None: 532 timeout = DEFAULT_TIMEOUT_SECONDS 533 _waitid = self.send(stanza) 534 if func: 535 self.on_responses[_waitid] = (func, args) 536 if timeout: 537 self._owner.set_timeout(timeout) 538 self._owner.onreceive(self._WaitForData) 539 self._expected[_waitid] = None 540 return _waitid
541
542 - def SendAndCallForResponse(self, stanza, func=None, args=None):
543 """ 544 Put stanza on the wire and call back when recipient replies. Additional 545 callback arguments can be specified in args 546 """ 547 self.SendAndWaitForResponse(stanza, 0, func, args)
548
549 - def send(self, stanza, now=False):
550 """ 551 Wrap transports send method when plugged into NonBlockingClient. Makes 552 sure stanzas get ID and from tag. 553 """ 554 ID = None 555 if type(stanza) not in [type(''), type(u'')]: 556 if isinstance(stanza, Protocol): 557 ID = stanza.getID() 558 if ID is None: 559 stanza.setID(self.getAnID()) 560 ID = stanza.getID() 561 if self._owner._registered_name and not stanza.getAttr('from'): 562 stanza.setAttr('from', self._owner._registered_name) 563 564 # If no ID then it is a whitespace 565 if self.sm and self.sm.enabled and ID: 566 self.sm.uqueue.append(stanza) 567 self.sm.out_h = self.sm.out_h + 1 568 if len(self.sm.uqueue) > self.sm.max_queue: 569 self.sm.request_ack() 570 571 self._owner.Connection.send(stanza, now) 572 return ID
573
class BOSHDispatcher(XMPPDispatcher):
    """
    XMPPDispatcher variant for BOSH connections

    Hands stream initialization and termination to the connection object and
    unwraps BOSH <body/> elements before normal stanza dispatching.
    """

    def PlugIn(self, owner, after_SASL=False, old_features=None):
        """
        Remember the restart parameters, then plug in like XMPPDispatcher

        :param owner: client instance to plug into.
        :param after_SASL: passed to the connection's send_init by StreamInit.
        :param old_features: used by StreamInit as the initial value of
            Stream.features.
        """
        self.old_features = old_features
        self.after_SASL = after_SASL
        XMPPDispatcher.PlugIn(self, owner)

    def StreamInit(self):
        """
        Send an initial stream header

        Creates a fresh parser and the stream header node, flags that the
        header has to be fed to the parser on the next ProcessNonBlocking
        call, and asks the connection to send its init request.
        """
        self.Stream = simplexml.NodeBuilder()
        self.Stream.dispatch = self.dispatch
        self.Stream._dispatch_depth = 2
        self.Stream.stream_header_received = self._check_stream_start
        self.Stream.features = self.old_features

        self._metastream = Node('stream:stream')
        self._metastream.setNamespace(self._owner.Namespace)
        self._metastream.setAttr('version', '1.0')
        self._metastream.setAttr('xmlns:stream', NS_STREAMS)
        self._metastream.setAttr('to', self._owner.Server)
        lang = locale.getdefaultlocale()[0]
        if lang:
            self._metastream.setAttr('xml:lang', lang.split('_')[0])

        self.restart = True
        self._owner.Connection.send_init(after_SASL=self.after_SASL)

    def StreamTerminate(self):
        """
        Send a stream terminator
        """
        self._owner.Connection.send_terminator()

    def ProcessNonBlocking(self, data=None):
        """
        Feed received data to the parser, preceded by a stream header on the
        first call after StreamInit

        :param data: received data; None is treated as an empty string.
        :return: whatever XMPPDispatcher.ProcessNonBlocking returns for the
            (possibly prefixed) data.
        """
        if data is None:
            # Without this, None would be formatted into the stream as the
            # text "None" or passed on to the regex substitution.
            data = ''
        if self.restart:
            # Turn the header built in StreamInit into the one expected from
            # the other side: its "to" becomes "from".
            fromstream = self._metastream
            fromstream.setAttr('from', fromstream.getAttr('to'))
            fromstream.delAttr('to')
            data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data)
            self.restart = False
        return XMPPDispatcher.ProcessNonBlocking(self, data)

    def dispatch(self, stanza, session=None, direct=0):
        """
        Unwrap a BOSH <body/> element and dispatch each of its children;
        anything else is dispatched unchanged
        """
        is_bosh_body = (stanza.getName() == 'body' and
            stanza.getNamespace() == NS_HTTP_BIND)
        if not is_bosh_body:
            XMPPDispatcher.dispatch(self, stanza, session, direct)
            return

        stanza_attrs = stanza.getAttrs()
        if 'authid' in stanza_attrs:
            # should be only in init response
            # auth module expects id of stream in document attributes
            self.Stream._document_attrs['id'] = stanza_attrs['authid']
        self._owner.Connection.handle_body_attrs(stanza_attrs)

        children = stanza.getChildren()
        if children:
            for child in children:
                # if child doesn't have any ns specified, simplexml (or expat)
                # thinks it's of parent's (BOSH body) namespace, so we have to
                # rewrite it to jabber:client
                if child.getNamespace() == NS_HTTP_BIND:
                    child.setNamespace(self._owner.defaultNamespace)
                XMPPDispatcher.dispatch(self, child, session, direct)
640