Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/nbxmpp
diff options
context:
space:
mode:
authorPhilipp Hörist <philipp@hoerist.com>2019-11-23 21:50:49 +0300
committerPhilipp Hörist <philipp@hoerist.com>2019-11-23 21:50:49 +0300
commitf3469e7ddf78ff260960038486e5e428d13a3d3a (patch)
tree19d90a9cc973608149555f764d94975c6b598a0c /nbxmpp
parent7e700683e9f08a5095a2075d953570bac7c80885 (diff)
Refactor IQ callbacks
- Remove SendAndWaitForResponse(), there is no need for a timeout that ends the stream. We have other means of checking if the connection to the server was lost. - Don't wrap ProcessNonBlocking(), a server can send multiple stanzas in one packet, wraping this methods means we process callbacks after processing all stanzas in the packet. Which can lead to processing stanzas not in arrival order. Instead process callbacks inside dispatch().
Diffstat (limited to 'nbxmpp')
-rw-r--r--nbxmpp/bind.py4
-rw-r--r--nbxmpp/dispatcher.py82
2 files changed, 18 insertions, 68 deletions
diff --git a/nbxmpp/bind.py b/nbxmpp/bind.py
index 3cb4a51..c780aa1 100644
--- a/nbxmpp/bind.py
+++ b/nbxmpp/bind.py
@@ -87,7 +87,7 @@ class NonBlockingBind(PlugIn):
payload = Node('bind', attrs={'xmlns': NS_BIND}, payload=resource)
node = Protocol('iq', typ='set', payload=[payload])
- self._owner.Dispatcher.SendAndWaitForResponse(node, func=self._on_bind)
+ self._owner.Dispatcher.SendAndCallForResponse(node, func=self._on_bind)
def _on_bind(self, stanza):
if isResultNode(stanza):
@@ -104,7 +104,7 @@ class NonBlockingBind(PlugIn):
else:
node = Node('session', attrs={'xmlns':NS_SESSION})
iq = Protocol('iq', typ='set', payload=[node])
- self._owner.SendAndWaitForResponse(
+ self._owner.SendAndCallForResponse(
iq, func=self._on_session)
return
if stanza:
diff --git a/nbxmpp/dispatcher.py b/nbxmpp/dispatcher.py
index 7ffaf5d..b81f66b 100644
--- a/nbxmpp/dispatcher.py
+++ b/nbxmpp/dispatcher.py
@@ -145,7 +145,7 @@ class XMPPDispatcher(PlugIn):
self.RegisterEventHandler, self.UnregisterCycleHandler,
self.RegisterCycleHandler, self.RegisterHandlerOnce,
self.UnregisterHandler, self.RegisterProtocol,
- self.SendAndWaitForResponse, self.SendAndCallForResponse,
+ self.SendAndCallForResponse,
self.getAnID, self.Event, self.send, self.get_module]
# \ufddo -> \ufdef range
@@ -245,7 +245,6 @@ class XMPPDispatcher(PlugIn):
self.RegisterDefaultHandler(self.returnStanzaHandler)
self.RegisterEventHandler(self._owner._caller._event_dispatcher)
self._register_modules()
- self.on_responses = {}
def plugin(self, owner):
"""
@@ -608,21 +607,18 @@ class XMPPDispatcher(PlugIn):
_id = stanza.getID()
processed = False
- if _id in session._expected:
- if isinstance(session._expected[_id], tuple):
- cb, args = session._expected[_id]
- log.debug('Expected stanza arrived. Callback %s(%s) found',
- cb, args)
- try:
- cb(session, stanza, **args)
- except NodeProcessed:
- pass
- except Exception:
- raise
- else:
- log.debug('Expected stanza arrived')
- session._expected[_id] = stanza
- processed = True
+ if _id in self._expected:
+ cb, args = self._expected[_id]
+ log.debug('Expected stanza arrived. Callback %s(%s) found',
+ cb, args)
+ try:
+ if args is None:
+ cb(stanza)
+ else:
+ cb(self, stanza, **args)
+ except NodeProcessed:
+ pass
+ return
# Gather specifics depending on stanza properties
specifics = ['default']
@@ -664,60 +660,14 @@ class XMPPDispatcher(PlugIn):
if not processed and self._defaultHandler:
self._defaultHandler(session, stanza)
- def _WaitForData(self, data):
- """
- Internal wrapper around ProcessNonBlocking. Will check for
- """
- if data is None:
- return
- res = self.ProcessNonBlocking(data)
- # 0 result indicates that we have closed the connection, e.g.
- # we have released dispatcher, so self._owner has no methods
- if not res:
- return
- for (_id, _iq) in list(self._expected.items()):
- if _iq is None:
- # If the expected Stanza would have arrived, ProcessNonBlocking
- # would have placed the reply stanza in there
- continue
- if _id in self.on_responses:
- if len(self._expected) == 1:
- if hasattr(self._owner, 'onreceive'):
- # With BOSH we get a terminating body with multiple stanzas
- # in it, we unplug BOSH before we parse the stanzas
- self._owner.onreceive(None)
- resp, args = self.on_responses[_id]
- del self.on_responses[_id]
- if args is None:
- resp(_iq)
- else:
- resp(self._owner, _iq, **args)
- del self._expected[_id]
-
- def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
- """
- Send stanza and wait for recipient's response to it. Will call transports
- on_timeout callback if response is not retrieved in time
-
- Be aware: Only timeout of latest call of SendAndWait is active.
- """
- if timeout is None:
- timeout = DEFAULT_TIMEOUT_SECONDS
- _waitid = self.send(stanza)
- if func:
- self.on_responses[_waitid] = (func, args)
- if timeout:
- self._owner.set_timeout(timeout)
- self._owner.onreceive(self._WaitForData)
- self._expected[_waitid] = None
- return _waitid
-
def SendAndCallForResponse(self, stanza, func=None, args=None):
"""
Put stanza on the wire and call back when recipient replies. Additional
callback arguments can be specified in args
"""
- self.SendAndWaitForResponse(stanza, 0, func, args)
+ _waitid = self.send(stanza)
+ self._expected[_waitid] = (func, args)
+ return _waitid
def send(self, stanza, now=False):
"""