1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Main xmpp decision making logic. Provides library with methods to assign
20 different handlers to different XMPP stanzas and namespaces
21 """
22
23 import simplexml, sys, locale
24 import re
25 from xml.parsers.expat import ExpatError
26 from plugin import PlugIn
27 from protocol import (NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence,
28 Message, Protocol, Node, Error, ERR_FEATURE_NOT_IMPLEMENTED, StreamError)
29 import logging
30 log = logging.getLogger('nbxmpp.dispatcher_nb')
31
32
33 DEFAULT_TIMEOUT_SECONDS = 25
34 outgoingID = 0
35
36 XML_DECLARATION = '<?xml version=\'1.0\'?>'
40 """
41 Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
42 was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
43 is that reference used to access dispatcher instance is in Client attribute
44 named by __class__.__name__ of the dispatcher instance .. long story short:
45
46 I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp
47
48 If having two kinds of dispatcher will go well, I will rewrite the dispatcher
49 references in other scripts
50 """
51
52 - def PlugIn(self, client_obj, after_SASL=False, old_features=None):
53 if client_obj.protocol_type == 'XMPP':
54 XMPPDispatcher().PlugIn(client_obj)
55 elif client_obj.protocol_type == 'BOSH':
56 BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features)
57 else:
58 assert False
59
60 @classmethod
62 """
63 Factory Method for object creation
64
65 Use this instead of directly initializing the class in order to make
66 unit testing much easier.
67 """
68 return cls(*args, **kwargs)
69
72 """
73 Handles XMPP stream and is the first who takes control over a fresh stanza
74
75 Is plugged into NonBlockingClient but can be replugged to restart handled
76 stream headers (used by SASL f.e.).
77 """
78
80 PlugIn.__init__(self)
81 self.handlers = {}
82 self._expected = {}
83 self._defaultHandler = None
84 self._pendingExceptions = []
85 self._eventHandler = None
86 self._cycleHandlers = []
87 self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler,
88 self.RegisterEventHandler, self.UnregisterCycleHandler,
89 self.RegisterCycleHandler, self.RegisterHandlerOnce,
90 self.UnregisterHandler, self.RegisterProtocol,
91 self.SendAndWaitForResponse, self.SendAndCallForResponse,
92 self.getAnID, self.Event, self.send]
93
94
95 self.sm = None
96
97
98 c = u'\ufdd0'
99 r = c.encode('utf8')
100 while (c < u'\ufdef'):
101 c = unichr(ord(c) + 1)
102 r += '|' + c.encode('utf8')
103
104
105 c = u'\ufffe'
106 r += '|' + c.encode('utf8')
107 r += '|' + unichr(ord(c) + 1).encode('utf8')
108 while (c < u'\U0010fffe'):
109 c = unichr(ord(c) + 0x10000)
110 r += '|' + c.encode('utf8')
111 r += '|' + unichr(ord(c) + 1).encode('utf8')
112
113 self.invalid_chars_re = re.compile(r)
114
119
121 """
122 Return set of user-registered callbacks in it's internal format. Used
123 within the library to carry user handlers set over Dispatcher replugins
124 """
125 return self.handlers
126
128 """
129 Restore user-registered callbacks structure from dump previously obtained
130 via dumpHandlers. Used within the library to carry user handlers set over
131 Dispatcher replugins.
132 """
133 self.handlers = handlers
134
150
152 """
153 Plug the Dispatcher instance into Client class instance and send initial
154 stream header. Used internally
155 """
156 self._init()
157 self._owner.lastErrNode = None
158 self._owner.lastErr = None
159 self._owner.lastErrCode = None
160 if hasattr(self._owner, 'StreamInit'):
161 self._owner.StreamInit()
162 else:
163 self.StreamInit()
164
166 """
167 Prepare instance to be destructed
168 """
169 self.Stream.dispatch = None
170 self.Stream.features = None
171 self.Stream.destroy()
172 self._owner = None
173 self.Stream = None
174
176 """
177 Send an initial stream header
178 """
179 self._owner.Connection.sendqueue = []
180 self.Stream = simplexml.NodeBuilder()
181 self.Stream.dispatch = self.dispatch
182 self.Stream._dispatch_depth = 2
183 self.Stream.stream_header_received = self._check_stream_start
184 self.Stream.features = None
185 self._metastream = Node('stream:stream')
186 self._metastream.setNamespace(self._owner.Namespace)
187 self._metastream.setAttr('version', '1.0')
188 self._metastream.setAttr('xmlns:stream', NS_STREAMS)
189 self._metastream.setAttr('to', self._owner.Server)
190 if locale.getdefaultlocale()[0]:
191 self._metastream.setAttr('xml:lang',
192 locale.getdefaultlocale()[0].split('_')[0])
193 self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2]))
194
196 if ns != NS_STREAMS or tag!='stream':
197 raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
198 % (tag, ns))
199
201 return re.sub(self.invalid_chars_re, u'\ufffd'.encode('utf-8'), data)
202
204 """
205 Check incoming stream for data waiting
206
207 :param data: data received from transports/IO sockets
208 :return:
209 1) length of processed data if some data were processed;
210 2) '0' string if no data were processed but link is alive;
211 3) 0 (zero) if underlying connection is closed.
212 """
213
214
215
216
217
218 data = self.replace_non_character(data)
219 for handler in self._cycleHandlers:
220 handler(self)
221 if len(self._pendingExceptions) > 0:
222 _pendingException = self._pendingExceptions.pop()
223 raise _pendingException[0], _pendingException[1], _pendingException[2]
224 try:
225 self.Stream.Parse(data)
226
227 if self.Stream and self.Stream.has_received_endtag():
228 self._owner.disconnect(self.Stream.streamError)
229 return 0
230 except ExpatError:
231 log.error('Invalid XML received from server. Forcing disconnect.')
232 self._owner.Connection.disconnect()
233 return 0
234 except ValueError, e:
235 log.debug('ValueError: %s' % str(e))
236 self._owner.Connection.pollend()
237 return 0
238 if len(self._pendingExceptions) > 0:
239 _pendingException = self._pendingExceptions.pop()
240 raise _pendingException[0], _pendingException[1], _pendingException[2]
241 if len(data) == 0:
242 return '0'
243 return len(data)
244
246 """
247 Create internal structures for newly registered namespace
248
249 You can register handlers for this namespace afterwards. By default
250 one namespace is already registered
251 (jabber:client or jabber:component:accept depending on context.
252 """
253 log.debug('Registering namespace "%s"' % xmlns)
254 self.handlers[xmlns] = {}
255 self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
256 self.RegisterProtocol('default', Protocol, xmlns=xmlns)
257
259 """
260 Used to declare some top-level stanza name to dispatcher
261
262 Needed to start registering handlers for such stanzas. Iq, message and
263 presence protocols are registered by default.
264 """
265 if not xmlns:
266 xmlns=self._owner.defaultNamespace
267 log.debug('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns))
268 self.handlers[xmlns][tag_name] = {type:Proto, 'default':[]}
269
272 """
273 Register handler for processing all stanzas for specified namespace
274 """
275 self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst,
276 system)
277
278 - def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None,
279 makefirst=False, system=False):
280 """
281 Register user callback as stanzas handler of declared type
282
283 Callback arguments:
284 dispatcher instance (for replying), incoming return of previous handlers.
285 The callback must raise xmpp.NodeProcessed just before return if it wants
286 to prevent other callbacks to be called with the same stanza as argument
287 _and_, more importantly library from returning stanza to sender with error set.
288
289 :param name: name of stanza. F.e. "iq".
290 :param handler: user callback.
291 :param typ: value of stanza's "type" attribute. If not specified any
292 value will match
293 :param ns: namespace of child that stanza must contain.
294 :param makefirst: insert handler in the beginning of handlers list instead
295 of adding it to the end. Note that more common handlers i.e. w/o "typ"
296 and " will be called first nevertheless.
297 :param system: call handler even if NodeProcessed Exception were raised
298 already.
299 """
300 if not xmlns:
301 xmlns=self._owner.defaultNamespace
302 log.debug('Registering handler %s for "%s" type->%s ns->%s(%s)' %
303 (handler, name, typ, ns, xmlns))
304 if not typ and not ns:
305 typ='default'
306 if xmlns not in self.handlers:
307 self.RegisterNamespace(xmlns, 'warn')
308 if name not in self.handlers[xmlns]:
309 self.RegisterProtocol(name, Protocol, xmlns, 'warn')
310 if typ+ns not in self.handlers[xmlns][name]:
311 self.handlers[xmlns][name][typ+ns]=[]
312 if makefirst:
313 self.handlers[xmlns][name][typ+ns].insert(0, {'func':handler,
314 'system':system})
315 else:
316 self.handlers[xmlns][name][typ+ns].append({'func':handler,
317 'system':system})
318
319 - def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None,
320 makefirst=0, system=0):
321 """
322 Unregister handler after first call (not implemented yet)
323 """
324
325 if not xmlns:
326 xmlns = self._owner.defaultNamespace
327 self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
328
330 """
331 Unregister handler. "typ" and "ns" must be specified exactly the same as
332 with registering.
333 """
334 if not xmlns:
335 xmlns = self._owner.defaultNamespace
336 if not typ and not ns:
337 typ='default'
338 if xmlns not in self.handlers:
339 return
340 if name not in self.handlers[xmlns]:
341 return
342 if typ+ns not in self.handlers[xmlns][name]:
343 return
344 for pack in self.handlers[xmlns][name][typ+ns]:
345 if pack['func'] == handler:
346 try:
347 self.handlers[xmlns][name][typ+ns].remove(pack)
348 except ValueError:
349 pass
350
352 """
353 Specify the handler that will be used if no NodeProcessed exception were
354 raised. This is returnStanzaHandler by default.
355 """
356 self._defaultHandler = handler
357
359 """
360 Register handler that will process events. F.e. "FILERECEIVED" event. See
361 common/connection: _event_dispatcher()
362 """
363 self._eventHandler = handler
364
372
374 """
375 Register handler that will be called on every Dispatcher.Process() call
376 """
377 if handler not in self._cycleHandlers:
378 self._cycleHandlers.append(handler)
379
381 """
382 Unregister handler that will is called on every Dispatcher.Process() call
383 """
384 if handler in self._cycleHandlers:
385 self._cycleHandlers.remove(handler)
386
387 - def Event(self, realm, event, data):
388 """
389 Raise some event
390
391 :param realm: scope of event. Usually a namespace.
392 :param event: the event itself. F.e. "SUCCESSFUL SEND".
393 :param data: data that comes along with event. Depends on event.
394 """
395 if self._eventHandler:
396 self._eventHandler(realm, event, data)
397 else:
398 log.warning('Received unhandled event: %s' % event)
399
400 - def dispatch(self, stanza, session=None, direct=0):
401 """
402 Main procedure that performs XMPP stanza recognition and calling
403 apppropriate handlers for it. Called by simplexml
404 """
405
406
407
408
409
410 if not session:
411 session = self
412 session.Stream._mini_dom = None
413 name = stanza.getName()
414
415 if name == 'features':
416 self._owner.got_features = True
417 session.Stream.features = stanza
418 elif name == 'error':
419 if stanza.getTag('see-other-host'):
420 self._owner.got_see_other_host = stanza
421
422 xmlns = stanza.getNamespace()
423
424
425
426 if xmlns not in self.handlers:
427 log.warn("Unknown namespace: " + xmlns)
428 xmlns = 'unknown'
429
430 if name not in self.handlers[xmlns]:
431 if name != 'features':
432 log.warn("Unknown stanza: " + name)
433 else:
434 log.debug("Got %s/%s stanza" % (xmlns, name))
435 name='unknown'
436 else:
437 log.debug("Got %s/%s stanza" % (xmlns, name))
438
439 if stanza.__class__.__name__ == 'Node':
440
441 stanza=self.handlers[xmlns][name][type](node=stanza)
442
443 typ = stanza.getType()
444 if not typ:
445 typ = ''
446 stanza.props = stanza.getProperties()
447 ID = stanza.getID()
448
449
450 if self.sm and self.sm.enabled and (stanza.getName() != 'r' and
451 stanza.getName() != 'a' and stanza.getName() != 'enabled' and
452 stanza.getName() != 'resumed'):
453
454 self.sm.in_h = self.sm.in_h + 1
455 list_ = ['default']
456 if typ in self.handlers[xmlns][name]:
457 list_.append(typ)
458 for prop in stanza.props:
459 if prop in self.handlers[xmlns][name]:
460 list_.append(prop)
461 if typ and typ+prop in self.handlers[xmlns][name]:
462 list_.append(typ+prop)
463
464 chain = self.handlers[xmlns]['default']['default']
465 for key in list_:
466 if key:
467 chain = chain + self.handlers[xmlns][name][key]
468
469 if ID in session._expected:
470 user = 0
471 if isinstance(session._expected[ID], tuple):
472 cb, args = session._expected[ID]
473 log.debug("Expected stanza arrived. Callback %s(%s) found!" %
474 (cb, args))
475 try:
476 cb(session,stanza,**args)
477 except Exception, typ:
478 if typ.__class__.__name__ != 'NodeProcessed':
479 raise
480 else:
481 log.debug("Expected stanza arrived!")
482 session._expected[ID] = stanza
483 else:
484 user = 1
485 for handler in chain:
486 if user or handler['system']:
487 try:
488 handler['func'](session, stanza)
489 except Exception, typ:
490 if typ.__class__.__name__ != 'NodeProcessed':
491 self._pendingExceptions.insert(0, sys.exc_info())
492 return
493 user=0
494 if user and self._defaultHandler:
495 self._defaultHandler(session, stanza)
496
498 """
499 Internal wrapper around ProcessNonBlocking. Will check for
500 """
501 if data is None:
502 return
503 res = self.ProcessNonBlocking(data)
504
505
506 if not res:
507 return
508 for (_id, _iq) in self._expected.items():
509 if _iq is None:
510
511
512 continue
513 if _id in self.on_responses:
514 if len(self._expected) == 1:
515 self._owner.onreceive(None)
516 resp, args = self.on_responses[_id]
517 del self.on_responses[_id]
518 if args is None:
519 resp(_iq)
520 else:
521 resp(self._owner, _iq, **args)
522 del self._expected[_id]
523
525 """
526 Send stanza and wait for recipient's response to it. Will call transports
527 on_timeout callback if response is not retrieved in time
528
529 Be aware: Only timeout of latest call of SendAndWait is active.
530 """
531 if timeout is None:
532 timeout = DEFAULT_TIMEOUT_SECONDS
533 _waitid = self.send(stanza)
534 if func:
535 self.on_responses[_waitid] = (func, args)
536 if timeout:
537 self._owner.set_timeout(timeout)
538 self._owner.onreceive(self._WaitForData)
539 self._expected[_waitid] = None
540 return _waitid
541
543 """
544 Put stanza on the wire and call back when recipient replies. Additional
545 callback arguments can be specified in args
546 """
547 self.SendAndWaitForResponse(stanza, 0, func, args)
548
549 - def send(self, stanza, now=False):
550 """
551 Wrap transports send method when plugged into NonBlockingClient. Makes
552 sure stanzas get ID and from tag.
553 """
554 ID = None
555 if type(stanza) not in [type(''), type(u'')]:
556 if isinstance(stanza, Protocol):
557 ID = stanza.getID()
558 if ID is None:
559 stanza.setID(self.getAnID())
560 ID = stanza.getID()
561 if self._owner._registered_name and not stanza.getAttr('from'):
562 stanza.setAttr('from', self._owner._registered_name)
563
564
565 if self.sm and self.sm.enabled and ID:
566 self.sm.uqueue.append(stanza)
567 self.sm.out_h = self.sm.out_h + 1
568 if len(self.sm.uqueue) > self.sm.max_queue:
569 self.sm.request_ack()
570
571 self._owner.Connection.send(stanza, now)
572 return ID
573
576
577 - def PlugIn(self, owner, after_SASL=False, old_features=None):
578 self.old_features = old_features
579 self.after_SASL = after_SASL
580 XMPPDispatcher.PlugIn(self, owner)
581
583 """
584 Send an initial stream header
585 """
586 self.Stream = simplexml.NodeBuilder()
587 self.Stream.dispatch = self.dispatch
588 self.Stream._dispatch_depth = 2
589 self.Stream.stream_header_received = self._check_stream_start
590 self.Stream.features = self.old_features
591
592 self._metastream = Node('stream:stream')
593 self._metastream.setNamespace(self._owner.Namespace)
594 self._metastream.setAttr('version', '1.0')
595 self._metastream.setAttr('xmlns:stream', NS_STREAMS)
596 self._metastream.setAttr('to', self._owner.Server)
597 if locale.getdefaultlocale()[0]:
598 self._metastream.setAttr('xml:lang',
599 locale.getdefaultlocale()[0].split('_')[0])
600
601 self.restart = True
602 self._owner.Connection.send_init(after_SASL=self.after_SASL)
603
605 """
606 Send a stream terminator
607 """
608 self._owner.Connection.send_terminator()
609
618
619 - def dispatch(self, stanza, session=None, direct=0):
640