Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mrDoctorWho/xmpppy.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'xmpp/dispatcher.py')
-rw-r--r--xmpp/dispatcher.py976
1 files changed, 488 insertions, 488 deletions
diff --git a/xmpp/dispatcher.py b/xmpp/dispatcher.py
index f873ffd..4674969 100644
--- a/xmpp/dispatcher.py
+++ b/xmpp/dispatcher.py
@@ -1,488 +1,488 @@
-## transports.py
-##
-## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-# $Id: dispatcher.py, v1.45 2014/02/16 alkorgun Exp $
-
-"""
-Main xmpppy mechanism. Provides library with methods to assign different handlers
-to different XMPP stanzas.
-Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that
-Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
-"""
-
-import sys
-import time
-from . import simplexml
-
-from .plugin import PlugIn
-from .protocol import *
-from select import select
-from xml.parsers.expat import ExpatError
-
-DefaultTimeout = 25
-ID = 0
-
-DBG_LINE = "dispatcher"
-
-if sys.hexversion >= 0x30000F0:
-
- def deferredRaise(e):
- raise e[0](e[1]).with_traceback(e[2])
-
-else:
-
- def deferredRaise(e):
- raise e[0], e[1], e[2]
-
-class Dispatcher(PlugIn):
- """
- Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
- Can be plugged out/in to restart these headers (used for SASL f.e.).
- """
- def __init__(self):
- PlugIn.__init__(self)
- self.handlers = {}
- self._expected = {}
- self._defaultHandler = None
- self._pendingExceptions = []
- self._eventHandler = None
- self._cycleHandlers = []
- self._exported_methods = [
- self.Process,
- self.RegisterHandler,
-# self.RegisterDefaultHandler,
- self.RegisterEventHandler,
- self.UnregisterCycleHandler,
- self.RegisterCycleHandler,
- self.RegisterHandlerOnce,
- self.UnregisterHandler,
- self.RegisterProtocol,
- self.WaitForResponse,
- self.SendAndWaitForResponse,
- self.send,
- self.SendAndCallForResponse,
- self.disconnect,
- self.iter
- ]
-
- def dumpHandlers(self):
- """
- Return set of user-registered callbacks in it's internal format.
- Used within the library to carry user handlers set over Dispatcher replugins.
- """
- return self.handlers
-
- def restoreHandlers(self, handlers):
- """
- Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
- Used within the library to carry user handlers set over Dispatcher replugins.
- """
- self.handlers = handlers
-
- def _init(self):
- """
- Registers default namespaces/protocols/handlers. Used internally.
- """
- self.RegisterNamespace("unknown")
- self.RegisterNamespace(NS_STREAMS)
- self.RegisterNamespace(self._owner.defaultNamespace)
- self.RegisterProtocol("iq", Iq)
- self.RegisterProtocol("presence", Presence)
- self.RegisterProtocol("message", Message)
-# self.RegisterDefaultHandler(self.returnStanzaHandler)
- self.RegisterHandler("error", self.streamErrorHandler, xmlns=NS_STREAMS)
-
- def plugin(self, owner):
- """
- Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.
- """
- self._init()
- for method in self._old_owners_methods:
- if method.__name__ == "send":
- self._owner_send = method
- break
- self._owner.lastErrNode = None
- self._owner.lastErr = None
- self._owner.lastErrCode = None
- self.StreamInit()
-
- def plugout(self):
- """
- Prepares instance to be destructed.
- """
- self.Stream.dispatch = None
- self.Stream.DEBUG = None
- self.Stream.features = None
- self.Stream.destroy()
-
- def StreamInit(self):
- """
- Send an initial stream header.
- """
- self.Stream = simplexml.NodeBuilder()
- self.Stream._dispatch_depth = 2
- self.Stream.dispatch = self.dispatch
- self.Stream.stream_header_received = self._check_stream_start
- self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
- self.Stream.DEBUG = self._owner.DEBUG
- self.Stream.features = None
- self._metastream = Node("stream:stream")
- self._metastream.setNamespace(self._owner.Namespace)
- self._metastream.setAttr("version", "1.0")
- self._metastream.setAttr("xmlns:stream", NS_STREAMS)
- self._metastream.setAttr("to", self._owner.Server)
- self._owner.send("<?xml version=\"1.0\"?>%s>" % str(self._metastream)[:-2])
-
- def _check_stream_start(self, ns, tag, attrs):
- if ns != NS_STREAMS or tag != "stream":
- raise ValueError("Incorrect stream start: (%s,%s). Terminating." % (tag, ns))
-
- def Process(self, timeout=8):
- """
- Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
- Returns:
- 1) length of processed data if some data were processed;
- 2) "0" string if no data were processed but link is alive;
- 3) 0 (zero) if underlying connection is closed.
- Take note that in case of disconnection detect during Process() call
- disconnect handlers are called automatically.
- """
- for handler in self._cycleHandlers:
- handler(self)
- if self._pendingExceptions:
- deferredRaise(self._pendingExceptions.pop())
- if self._owner.Connection.pending_data(timeout):
- try:
- data = self._owner.Connection.receive()
- except IOError:
- return None
- try:
- self.Stream.Parse(data)
- except ExpatError:
- pass
- if self._pendingExceptions:
- deferredRaise(self._pendingExceptions.pop())
- if data:
- return len(data)
- return "0"
-
- def RegisterNamespace(self, xmlns, order="info"):
- """
- Creates internal structures for newly registered namespace.
- You can register handlers for this namespace afterwards. By default one namespace
- already registered (jabber:client or jabber:component:accept depending on context.
- """
- self.DEBUG("Registering namespace \"%s\"" % xmlns, order)
- self.handlers[xmlns] = {}
- self.RegisterProtocol("unknown", Protocol, xmlns=xmlns)
- self.RegisterProtocol("default", Protocol, xmlns=xmlns)
-
- def RegisterProtocol(self, tag_name, Proto, xmlns=None, order="info"):
- """
- Used to declare some top-level stanza name to dispatcher.
- Needed to start registering handlers for such stanzas.
- Iq, message and presence protocols are registered by default.
- """
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- self.DEBUG("Registering protocol \"%s\" as %s(%s)" % (tag_name, Proto, xmlns), order)
- self.handlers[xmlns][tag_name] = {"type": Proto, "default": []}
-
- def RegisterNamespaceHandler(self, xmlns, handler, typ="", ns="", makefirst=0, system=0):
- """
- Register handler for processing all stanzas for specified namespace.
- """
- self.RegisterHandler("default", handler, typ, ns, xmlns, makefirst, system)
-
- def RegisterHandler(self, name, handler, typ="", ns="", xmlns=None, makefirst=0, system=0):
- """Register user callback as stanzas handler of declared type. Callback must take
- (if chained, see later) arguments: dispatcher instance (for replying), incomed
- return of previous handlers.
- The callback must raise xmpp.NodeProcessed just before return if it want preven
- callbacks to be called with the same stanza as argument _and_, more importantly
- library from returning stanza to sender with error set (to be enabled in 0.2 ve
- Arguments:
- "name" - name of stanza. F.e. "iq".
- "handler" - user callback.
- "typ" - value of stanza's "type" attribute. If not specified any value match
- "ns" - namespace of child that stanza must contain.
- "chained" - chain together output of several handlers.
- "makefirst" - insert handler in the beginning of handlers list instead of
- adding it to the end. Note that more common handlers (i.e. w/o "typ" and
- will be called first nevertheless).
- "system" - call handler even if NodeProcessed Exception were raised already.
- """
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- self.DEBUG("Registering handler %s for \"%s\" type->%s ns->%s(%s)" % (handler, name, typ, ns, xmlns), "info")
- if not typ and not ns:
- typ = "default"
- if xmlns not in self.handlers:
- self.RegisterNamespace(xmlns, "warn")
- if name not in self.handlers[xmlns]:
- self.RegisterProtocol(name, Protocol, xmlns, "warn")
- if typ + ns not in self.handlers[xmlns][name]:
- self.handlers[xmlns][name][typ + ns] = []
- if makefirst:
- self.handlers[xmlns][name][typ + ns].insert(0, {"func": handler, "system": system})
- else:
- self.handlers[xmlns][name][typ + ns].append({"func": handler, "system": system})
-
- def RegisterHandlerOnce(self, name, handler, typ="", ns="", xmlns=None, makefirst=0, system=0):
- """
- Unregister handler after first call (not implemented yet).
- """
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
-
- def UnregisterHandler(self, name, handler, typ="", ns="", xmlns=None):
- """
- Unregister handler. "typ" and "ns" must be specified exactly the same as with registering.
- """
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- if xmlns not in self.handlers:
- return None
- if not typ and not ns:
- typ = "default"
- for pack in self.handlers[xmlns][name][typ + ns]:
- if handler == pack["func"]:
- break
- else:
- pack = None
- try:
- self.handlers[xmlns][name][typ + ns].remove(pack)
- except ValueError:
- pass
-
- def RegisterDefaultHandler(self, handler):
- """
- Specify the handler that will be used if no NodeProcessed exception were raised.
- This is returnStanzaHandler by default.
- """
- self._defaultHandler = handler
-
- def RegisterEventHandler(self, handler):
- """
- Register handler that will process events. F.e. "FILERECEIVED" event.
- """
- self._eventHandler = handler
-
- def returnStanzaHandler(self, conn, stanza):
- """
- Return stanza back to the sender with <feature-not-implemennted/> error set.
- """
- if stanza.getType() in ("get", "set"):
- conn.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
-
- def streamErrorHandler(self, conn, error):
- name, text = "error", error.getData()
- for tag in error.getChildren():
- if tag.getNamespace() == NS_XMPP_STREAMS:
- if tag.getName() == "text":
- text = tag.getData()
- else:
- name = tag.getName()
- if name in stream_exceptions.keys():
- exc = stream_exceptions[name]
- else:
- exc = StreamError
- raise exc((name, text))
-
- def RegisterCycleHandler(self, handler):
- """
- Register handler that will be called on every Dispatcher.Process() call.
- """
- if handler not in self._cycleHandlers:
- self._cycleHandlers.append(handler)
-
- def UnregisterCycleHandler(self, handler):
- """
- Unregister handler that will is called on every Dispatcher.Process() call.
- """
- if handler in self._cycleHandlers:
- self._cycleHandlers.remove(handler)
-
- def Event(self, realm, event, data):
- """
- Raise some event. Takes three arguments:
- 1) "realm" - scope of event. Usually a namespace.
- 2) "event" - the event itself. F.e. "SUCESSFULL SEND".
- 3) data that comes along with event. Depends on event.
- """
- if self._eventHandler:
- self._eventHandler(realm, event, data)
-
- def dispatch(self, stanza, session=None, direct=0):
- """
- Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
- Called internally.
- """
- if not session:
- session = self
- session.Stream._mini_dom = None
- name = stanza.getName()
- if not direct and self._owner._route:
- if name == "route":
- if stanza.getAttr("error") == None:
- if len(stanza.getChildren()) == 1:
- stanza = stanza.getChildren()[0]
- name = stanza.getName()
- else:
- for each in stanza.getChildren():
- self.dispatch(each, session, direct=1)
- return None
- elif name == "presence":
- return None
- elif name in ("features", "bind"):
- pass
- else:
- raise UnsupportedStanzaType(name)
- if name == "features":
- session.Stream.features = stanza
- xmlns = stanza.getNamespace()
- if xmlns not in self.handlers:
- self.DEBUG("Unknown namespace: " + xmlns, "warn")
- xmlns = "unknown"
- if name not in self.handlers[xmlns]:
- self.DEBUG("Unknown stanza: " + name, "warn")
- name = "unknown"
- else:
- self.DEBUG("Got %s/%s stanza" % (xmlns, name), "ok")
- if isinstance(stanza, Node):
- stanza = self.handlers[xmlns][name]["type"](node=stanza)
- typ = stanza.getType()
- if not typ:
- typ = ""
- stanza.props = stanza.getProperties()
- ID = stanza.getID()
- session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s" % (name, typ, stanza.props, ID), "ok")
- ls = ["default"] # we will use all handlers:
- if typ in self.handlers[xmlns][name]:
- ls.append(typ) # from very common...
- for prop in stanza.props:
- if prop in self.handlers[xmlns][name]:
- ls.append(prop)
- if typ and (typ + prop) in self.handlers[xmlns][name]:
- ls.append(typ + prop) # ...to very particular
- chain = self.handlers[xmlns]["default"]["default"]
- for key in ls:
- if key:
- chain = chain + self.handlers[xmlns][name][key]
- output = ""
- if ID in session._expected:
- user = 0
- if isinstance(session._expected[ID], tuple):
- cb, args = session._expected.pop(ID)
- session.DEBUG("Expected stanza arrived. Callback %s(%s) found!" % (cb, args), "ok")
- try:
- cb(session, stanza, **args)
- except NodeProcessed:
- pass
- else:
- session.DEBUG("Expected stanza arrived!", "ok")
- session._expected[ID] = stanza
- else:
- user = 1
- for handler in chain:
- if user or handler["system"]:
- try:
- handler["func"](session, stanza)
- except NodeProcessed:
- user = 0
- except Exception:
- self._pendingExceptions.insert(0, sys.exc_info())
- if user and self._defaultHandler:
- self._defaultHandler(session, stanza)
-
- def WaitForResponse(self, ID, timeout=DefaultTimeout):
- """
- Block and wait until stanza with specific "id" attribute will come.
- If no such stanza is arrived within timeout, return None.
- If operation failed for some reason then owner's attributes
- lastErrNode, lastErr and lastErrCode are set accordingly.
- """
- self._expected[ID] = None
- abort_time = time.time() + timeout
- self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID, timeout), "wait")
- while not self._expected[ID]:
- if not self.Process(0.04):
- self._owner.lastErr = "Disconnect"
- return None
- if time.time() > abort_time:
- self._owner.lastErr = "Timeout"
- return None
- resp = self._expected.pop(ID)
- if resp.getErrorCode():
- self._owner.lastErrNode = resp
- self._owner.lastErr = resp.getError()
- self._owner.lastErrCode = resp.getErrorCode()
- return resp
-
- def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
- """
- Put stanza on the wire and wait for recipient's response to it.
- """
- return self.WaitForResponse(self.send(stanza), timeout)
-
- def SendAndCallForResponse(self, stanza, func, args={}):
- """
- Put stanza on the wire and call back when recipient replies.
- Additional callback arguments can be specified in args.
- """
- self._expected[self.send(stanza)] = (func, args)
-
- def send(self, stanza):
- """
- Serialize stanza and put it on the wire. Assign an unique ID to it before send.
- Returns assigned ID.
- """
- if isinstance(stanza, basestring):
- return self._owner_send(stanza)
- if not isinstance(stanza, Protocol):
- id = None
- elif not stanza.getID():
- global ID
- ID += 1
- id = repr(ID)
- stanza.setID(id)
- else:
- id = stanza.getID()
- if self._owner._registered_name and not stanza.getAttr("from"):
- stanza.setAttr("from", self._owner._registered_name)
- if self._owner._route and stanza.getName() != "bind":
- to = self._owner.Server
- if stanza.getTo() and stanza.getTo().getDomain():
- to = stanza.getTo().getDomain()
- frm = stanza.getFrom()
- if frm.getDomain():
- frm = frm.getDomain()
- route = Protocol("route", to=to, frm=frm, payload=[stanza])
- stanza = route
- stanza.setNamespace(self._owner.Namespace)
- stanza.setParent(self._metastream)
- self._owner_send(stanza)
- return id
-
- def disconnect(self):
- """
- Send a stream terminator and and handle all incoming stanzas before stream closure.
- """
- if self._owner.connected:
- self._owner_send("</stream:stream>")
- while self.Process(1):
- pass
-
- iter = type(send)(Process.__code__, Process.__globals__, name = "iter", argdefs = Process.__defaults__, closure = Process.__closure__)
+## transports.py
+##
+## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2, or (at your option)
+## any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+
+# $Id: dispatcher.py, v1.45 2014/02/16 alkorgun Exp $
+
+"""
+Main xmpppy mechanism. Provides library with methods to assign different handlers
+to different XMPP stanzas.
+Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that
+Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
+"""
+
+import sys
+import time
+from . import simplexml
+
+from .plugin import PlugIn
+from .protocol import *
+from select import select
+from xml.parsers.expat import ExpatError
+
+DefaultTimeout = 25
+ID = 0
+
+DBG_LINE = "dispatcher"
+
+if sys.hexversion >= 0x30000F0:
+
+ def deferredRaise(e):
+ raise e[0](e[1]).with_traceback(e[2])
+
+else:
+
+ def deferredRaise(e):
+ raise e[0], e[1], e[2]
+
+class Dispatcher(PlugIn):
+ """
+ Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
+ Can be plugged out/in to restart these headers (used for SASL f.e.).
+ """
+ def __init__(self):
+ PlugIn.__init__(self)
+ self.handlers = {}
+ self._expected = {}
+ self._defaultHandler = None
+ self._pendingExceptions = []
+ self._eventHandler = None
+ self._cycleHandlers = []
+ self._exported_methods = [
+ self.Process,
+ self.RegisterHandler,
+# self.RegisterDefaultHandler,
+ self.RegisterEventHandler,
+ self.UnregisterCycleHandler,
+ self.RegisterCycleHandler,
+ self.RegisterHandlerOnce,
+ self.UnregisterHandler,
+ self.RegisterProtocol,
+ self.WaitForResponse,
+ self.SendAndWaitForResponse,
+ self.send,
+ self.SendAndCallForResponse,
+ self.disconnect,
+ self.iter
+ ]
+
+ def dumpHandlers(self):
+ """
+ Return set of user-registered callbacks in it's internal format.
+ Used within the library to carry user handlers set over Dispatcher replugins.
+ """
+ return self.handlers
+
+ def restoreHandlers(self, handlers):
+ """
+ Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
+ Used within the library to carry user handlers set over Dispatcher replugins.
+ """
+ self.handlers = handlers
+
+ def _init(self):
+ """
+ Registers default namespaces/protocols/handlers. Used internally.
+ """
+ self.RegisterNamespace("unknown")
+ self.RegisterNamespace(NS_STREAMS)
+ self.RegisterNamespace(self._owner.defaultNamespace)
+ self.RegisterProtocol("iq", Iq)
+ self.RegisterProtocol("presence", Presence)
+ self.RegisterProtocol("message", Message)
+# self.RegisterDefaultHandler(self.returnStanzaHandler)
+ self.RegisterHandler("error", self.streamErrorHandler, xmlns=NS_STREAMS)
+
+ def plugin(self, owner):
+ """
+ Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.
+ """
+ self._init()
+ for method in self._old_owners_methods:
+ if method.__name__ == "send":
+ self._owner_send = method
+ break
+ self._owner.lastErrNode = None
+ self._owner.lastErr = None
+ self._owner.lastErrCode = None
+ self.StreamInit()
+
+ def plugout(self):
+ """
+ Prepares instance to be destructed.
+ """
+ self.Stream.dispatch = None
+ self.Stream.DEBUG = None
+ self.Stream.features = None
+ self.Stream.destroy()
+
+ def StreamInit(self):
+ """
+ Send an initial stream header.
+ """
+ self.Stream = simplexml.NodeBuilder()
+ self.Stream._dispatch_depth = 2
+ self.Stream.dispatch = self.dispatch
+ self.Stream.stream_header_received = self._check_stream_start
+ self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
+ self.Stream.DEBUG = self._owner.DEBUG
+ self.Stream.features = None
+ self._metastream = Node("stream:stream")
+ self._metastream.setNamespace(self._owner.Namespace)
+ self._metastream.setAttr("version", "1.0")
+ self._metastream.setAttr("xmlns:stream", NS_STREAMS)
+ self._metastream.setAttr("to", self._owner.Server)
+ self._owner.send("<?xml version=\"1.0\"?>%s>" % str(self._metastream)[:-2])
+
+ def _check_stream_start(self, ns, tag, attrs):
+ if ns != NS_STREAMS or tag != "stream":
+ raise ValueError("Incorrect stream start: (%s,%s). Terminating." % (tag, ns))
+
+ def Process(self, timeout=8):
+ """
+ Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
+ Returns:
+ 1) length of processed data if some data were processed;
+ 2) "0" string if no data were processed but link is alive;
+ 3) 0 (zero) if underlying connection is closed.
+ Take note that in case of disconnection detect during Process() call
+ disconnect handlers are called automatically.
+ """
+ for handler in self._cycleHandlers:
+ handler(self)
+ if self._pendingExceptions:
+ deferredRaise(self._pendingExceptions.pop())
+ if self._owner.Connection.pending_data(timeout):
+ try:
+ data = self._owner.Connection.receive()
+ except IOError:
+ return None
+ try:
+ self.Stream.Parse(data)
+ except ExpatError:
+ pass
+ if self._pendingExceptions:
+ deferredRaise(self._pendingExceptions.pop())
+ if data:
+ return len(data)
+ return "0"
+
+ def RegisterNamespace(self, xmlns, order="info"):
+ """
+ Creates internal structures for newly registered namespace.
+ You can register handlers for this namespace afterwards. By default one namespace
+ already registered (jabber:client or jabber:component:accept depending on context.
+ """
+ self.DEBUG("Registering namespace \"%s\"" % xmlns, order)
+ self.handlers[xmlns] = {}
+ self.RegisterProtocol("unknown", Protocol, xmlns=xmlns)
+ self.RegisterProtocol("default", Protocol, xmlns=xmlns)
+
+ def RegisterProtocol(self, tag_name, Proto, xmlns=None, order="info"):
+ """
+ Used to declare some top-level stanza name to dispatcher.
+ Needed to start registering handlers for such stanzas.
+ Iq, message and presence protocols are registered by default.
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ self.DEBUG("Registering protocol \"%s\" as %s(%s)" % (tag_name, Proto, xmlns), order)
+ self.handlers[xmlns][tag_name] = {"type": Proto, "default": []}
+
+ def RegisterNamespaceHandler(self, xmlns, handler, typ="", ns="", makefirst=0, system=0):
+ """
+ Register handler for processing all stanzas for specified namespace.
+ """
+ self.RegisterHandler("default", handler, typ, ns, xmlns, makefirst, system)
+
+ def RegisterHandler(self, name, handler, typ="", ns="", xmlns=None, makefirst=0, system=0):
+ """Register user callback as stanzas handler of declared type. Callback must take
+ (if chained, see later) arguments: dispatcher instance (for replying), incomed
+ return of previous handlers.
+ The callback must raise xmpp.NodeProcessed just before return if it want preven
+ callbacks to be called with the same stanza as argument _and_, more importantly
+ library from returning stanza to sender with error set (to be enabled in 0.2 ve
+ Arguments:
+ "name" - name of stanza. F.e. "iq".
+ "handler" - user callback.
+ "typ" - value of stanza's "type" attribute. If not specified any value match
+ "ns" - namespace of child that stanza must contain.
+ "chained" - chain together output of several handlers.
+ "makefirst" - insert handler in the beginning of handlers list instead of
+ adding it to the end. Note that more common handlers (i.e. w/o "typ" and
+ will be called first nevertheless).
+ "system" - call handler even if NodeProcessed Exception were raised already.
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ self.DEBUG("Registering handler %s for \"%s\" type->%s ns->%s(%s)" % (handler, name, typ, ns, xmlns), "info")
+ if not typ and not ns:
+ typ = "default"
+ if xmlns not in self.handlers:
+ self.RegisterNamespace(xmlns, "warn")
+ if name not in self.handlers[xmlns]:
+ self.RegisterProtocol(name, Protocol, xmlns, "warn")
+ if typ + ns not in self.handlers[xmlns][name]:
+ self.handlers[xmlns][name][typ + ns] = []
+ if makefirst:
+ self.handlers[xmlns][name][typ + ns].insert(0, {"func": handler, "system": system})
+ else:
+ self.handlers[xmlns][name][typ + ns].append({"func": handler, "system": system})
+
+ def RegisterHandlerOnce(self, name, handler, typ="", ns="", xmlns=None, makefirst=0, system=0):
+ """
+ Unregister handler after first call (not implemented yet).
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
+
+ def UnregisterHandler(self, name, handler, typ="", ns="", xmlns=None):
+ """
+ Unregister handler. "typ" and "ns" must be specified exactly the same as with registering.
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ if xmlns not in self.handlers:
+ return None
+ if not typ and not ns:
+ typ = "default"
+ for pack in self.handlers[xmlns][name][typ + ns]:
+ if handler == pack["func"]:
+ break
+ else:
+ pack = None
+ try:
+ self.handlers[xmlns][name][typ + ns].remove(pack)
+ except ValueError:
+ pass
+
+ def RegisterDefaultHandler(self, handler):
+ """
+ Specify the handler that will be used if no NodeProcessed exception were raised.
+ This is returnStanzaHandler by default.
+ """
+ self._defaultHandler = handler
+
+ def RegisterEventHandler(self, handler):
+ """
+ Register handler that will process events. F.e. "FILERECEIVED" event.
+ """
+ self._eventHandler = handler
+
+ def returnStanzaHandler(self, conn, stanza):
+ """
+ Return stanza back to the sender with <feature-not-implemennted/> error set.
+ """
+ if stanza.getType() in ("get", "set"):
+ conn.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
+
+ def streamErrorHandler(self, conn, error):
+ name, text = "error", error.getData()
+ for tag in error.getChildren():
+ if tag.getNamespace() == NS_XMPP_STREAMS:
+ if tag.getName() == "text":
+ text = tag.getData()
+ else:
+ name = tag.getName()
+ if name in stream_exceptions.keys():
+ exc = stream_exceptions[name]
+ else:
+ exc = StreamError
+ raise exc((name, text))
+
+ def RegisterCycleHandler(self, handler):
+ """
+ Register handler that will be called on every Dispatcher.Process() call.
+ """
+ if handler not in self._cycleHandlers:
+ self._cycleHandlers.append(handler)
+
+ def UnregisterCycleHandler(self, handler):
+ """
+ Unregister handler that will is called on every Dispatcher.Process() call.
+ """
+ if handler in self._cycleHandlers:
+ self._cycleHandlers.remove(handler)
+
+ def Event(self, realm, event, data):
+ """
+ Raise some event. Takes three arguments:
+ 1) "realm" - scope of event. Usually a namespace.
+ 2) "event" - the event itself. F.e. "SUCESSFULL SEND".
+ 3) data that comes along with event. Depends on event.
+ """
+ if self._eventHandler:
+ self._eventHandler(realm, event, data)
+
+ def dispatch(self, stanza, session=None, direct=0):
+ """
+ Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
+ Called internally.
+ """
+ if not session:
+ session = self
+ session.Stream._mini_dom = None
+ name = stanza.getName()
+ if not direct and self._owner._route:
+ if name == "route":
+ if stanza.getAttr("error") == None:
+ if len(stanza.getChildren()) == 1:
+ stanza = stanza.getChildren()[0]
+ name = stanza.getName()
+ else:
+ for each in stanza.getChildren():
+ self.dispatch(each, session, direct=1)
+ return None
+ elif name == "presence":
+ return None
+ elif name in ("features", "bind"):
+ pass
+ else:
+ raise UnsupportedStanzaType(name)
+ if name == "features":
+ session.Stream.features = stanza
+ xmlns = stanza.getNamespace()
+ if xmlns not in self.handlers:
+ self.DEBUG("Unknown namespace: " + xmlns, "warn")
+ xmlns = "unknown"
+ if name not in self.handlers[xmlns]:
+ self.DEBUG("Unknown stanza: " + name, "warn")
+ name = "unknown"
+ else:
+ self.DEBUG("Got %s/%s stanza" % (xmlns, name), "ok")
+ if isinstance(stanza, Node):
+ stanza = self.handlers[xmlns][name]["type"](node=stanza)
+ typ = stanza.getType()
+ if not typ:
+ typ = ""
+ stanza.props = stanza.getProperties()
+ ID = stanza.getID()
+ session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s" % (name, typ, stanza.props, ID), "ok")
+ ls = ["default"] # we will use all handlers:
+ if typ in self.handlers[xmlns][name]:
+ ls.append(typ) # from very common...
+ for prop in stanza.props:
+ if prop in self.handlers[xmlns][name]:
+ ls.append(prop)
+ if typ and (typ + prop) in self.handlers[xmlns][name]:
+ ls.append(typ + prop) # ...to very particular
+ chain = self.handlers[xmlns]["default"]["default"]
+ for key in ls:
+ if key:
+ chain = chain + self.handlers[xmlns][name][key]
+ output = ""
+ if ID in session._expected:
+ user = 0
+ if isinstance(session._expected[ID], tuple):
+ cb, args = session._expected.pop(ID)
+ session.DEBUG("Expected stanza arrived. Callback %s(%s) found!" % (cb, args), "ok")
+ try:
+ cb(session, stanza, **args)
+ except NodeProcessed:
+ pass
+ else:
+ session.DEBUG("Expected stanza arrived!", "ok")
+ session._expected[ID] = stanza
+ else:
+ user = 1
+ for handler in chain:
+ if user or handler["system"]:
+ try:
+ handler["func"](session, stanza)
+ except NodeProcessed:
+ user = 0
+ except Exception:
+ self._pendingExceptions.insert(0, sys.exc_info())
+ if user and self._defaultHandler:
+ self._defaultHandler(session, stanza)
+
+ def WaitForResponse(self, ID, timeout=DefaultTimeout):
+ """
+ Block and wait until stanza with specific "id" attribute will come.
+ If no such stanza is arrived within timeout, return None.
+ If operation failed for some reason then owner's attributes
+ lastErrNode, lastErr and lastErrCode are set accordingly.
+ """
+ self._expected[ID] = None
+ abort_time = time.time() + timeout
+ self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID, timeout), "wait")
+ while not self._expected[ID]:
+ if not self.Process(0.04):
+ self._owner.lastErr = "Disconnect"
+ return None
+ if time.time() > abort_time:
+ self._owner.lastErr = "Timeout"
+ return None
+ resp = self._expected.pop(ID)
+ if resp.getErrorCode():
+ self._owner.lastErrNode = resp
+ self._owner.lastErr = resp.getError()
+ self._owner.lastErrCode = resp.getErrorCode()
+ return resp
+
+ def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
+ """
+ Put stanza on the wire and wait for recipient's response to it.
+ """
+ return self.WaitForResponse(self.send(stanza), timeout)
+
+ def SendAndCallForResponse(self, stanza, func, args={}):
+ """
+ Put stanza on the wire and call back when recipient replies.
+ Additional callback arguments can be specified in args.
+ """
+ self._expected[self.send(stanza)] = (func, args)
+
+ def send(self, stanza):
+ """
+ Serialize stanza and put it on the wire. Assign an unique ID to it before send.
+ Returns assigned ID.
+ """
+ if isinstance(stanza, basestring):
+ return self._owner_send(stanza)
+ if not isinstance(stanza, Protocol):
+ id = None
+ elif not stanza.getID():
+ global ID
+ ID += 1
+ id = repr(ID)
+ stanza.setID(id)
+ else:
+ id = stanza.getID()
+ if self._owner._registered_name and not stanza.getAttr("from"):
+ stanza.setAttr("from", self._owner._registered_name)
+ if self._owner._route and stanza.getName() != "bind":
+ to = self._owner.Server
+ if stanza.getTo() and stanza.getTo().getDomain():
+ to = stanza.getTo().getDomain()
+ frm = stanza.getFrom()
+ if frm.getDomain():
+ frm = frm.getDomain()
+ route = Protocol("route", to=to, frm=frm, payload=[stanza])
+ stanza = route
+ stanza.setNamespace(self._owner.Namespace)
+ stanza.setParent(self._metastream)
+ self._owner_send(stanza)
+ return id
+
+ def disconnect(self):
+ """
+ Send a stream terminator and and handle all incoming stanzas before stream closure.
+ """
+ if self._owner.connected:
+ self._owner_send("</stream:stream>")
+ while self.Process(1):
+ pass
+
+ iter = type(send)(Process.__code__, Process.__globals__, name = "iter", argdefs = Process.__defaults__, closure = Process.__closure__)