diff options
author | Philipp Hörist <philipp@hoerist.com> | 2019-11-23 21:50:49 +0300 |
---|---|---|
committer | Philipp Hörist <philipp@hoerist.com> | 2019-11-23 21:50:49 +0300 |
commit | f3469e7ddf78ff260960038486e5e428d13a3d3a (patch) | |
tree | 19d90a9cc973608149555f764d94975c6b598a0c /nbxmpp | |
parent | 7e700683e9f08a5095a2075d953570bac7c80885 (diff) |
Refactor IQ callbacks
- Remove SendAndWaitForResponse(), there is no need for a timeout that
ends the stream. We have other means of checking if the connection to
the server was lost.
- Don't wrap ProcessNonBlocking(), a server can send multiple stanzas
in one packet, wraping this methods means we process callbacks after
processing all stanzas in the packet. Which can lead to processing stanzas
not in arrival order. Instead process callbacks inside dispatch().
Diffstat (limited to 'nbxmpp')
-rw-r--r-- | nbxmpp/bind.py | 4 | ||||
-rw-r--r-- | nbxmpp/dispatcher.py | 82 |
2 files changed, 18 insertions, 68 deletions
diff --git a/nbxmpp/bind.py b/nbxmpp/bind.py index 3cb4a51..c780aa1 100644 --- a/nbxmpp/bind.py +++ b/nbxmpp/bind.py @@ -87,7 +87,7 @@ class NonBlockingBind(PlugIn): payload = Node('bind', attrs={'xmlns': NS_BIND}, payload=resource) node = Protocol('iq', typ='set', payload=[payload]) - self._owner.Dispatcher.SendAndWaitForResponse(node, func=self._on_bind) + self._owner.Dispatcher.SendAndCallForResponse(node, func=self._on_bind) def _on_bind(self, stanza): if isResultNode(stanza): @@ -104,7 +104,7 @@ class NonBlockingBind(PlugIn): else: node = Node('session', attrs={'xmlns':NS_SESSION}) iq = Protocol('iq', typ='set', payload=[node]) - self._owner.SendAndWaitForResponse( + self._owner.SendAndCallForResponse( iq, func=self._on_session) return if stanza: diff --git a/nbxmpp/dispatcher.py b/nbxmpp/dispatcher.py index 7ffaf5d..b81f66b 100644 --- a/nbxmpp/dispatcher.py +++ b/nbxmpp/dispatcher.py @@ -145,7 +145,7 @@ class XMPPDispatcher(PlugIn): self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, - self.SendAndWaitForResponse, self.SendAndCallForResponse, + self.SendAndCallForResponse, self.getAnID, self.Event, self.send, self.get_module] # \ufddo -> \ufdef range @@ -245,7 +245,6 @@ class XMPPDispatcher(PlugIn): self.RegisterDefaultHandler(self.returnStanzaHandler) self.RegisterEventHandler(self._owner._caller._event_dispatcher) self._register_modules() - self.on_responses = {} def plugin(self, owner): """ @@ -608,21 +607,18 @@ class XMPPDispatcher(PlugIn): _id = stanza.getID() processed = False - if _id in session._expected: - if isinstance(session._expected[_id], tuple): - cb, args = session._expected[_id] - log.debug('Expected stanza arrived. Callback %s(%s) found', - cb, args) - try: - cb(session, stanza, **args) - except NodeProcessed: - pass - except Exception: - raise - else: - log.debug('Expected stanza arrived') - session._expected[_id] = stanza - processed = True + if _id in self._expected: + cb, args = self._expected[_id] + log.debug('Expected stanza arrived. Callback %s(%s) found', + cb, args) + try: + if args is None: + cb(stanza) + else: + cb(self, stanza, **args) + except NodeProcessed: + pass + return # Gather specifics depending on stanza properties specifics = ['default'] @@ -664,60 +660,14 @@ class XMPPDispatcher(PlugIn): if not processed and self._defaultHandler: self._defaultHandler(session, stanza) - def _WaitForData(self, data): - """ - Internal wrapper around ProcessNonBlocking. Will check for - """ - if data is None: - return - res = self.ProcessNonBlocking(data) - # 0 result indicates that we have closed the connection, e.g. - # we have released dispatcher, so self._owner has no methods - if not res: - return - for (_id, _iq) in list(self._expected.items()): - if _iq is None: - # If the expected Stanza would have arrived, ProcessNonBlocking - # would have placed the reply stanza in there - continue - if _id in self.on_responses: - if len(self._expected) == 1: - if hasattr(self._owner, 'onreceive'): - # With BOSH we get a terminating body with multiple stanzas - # in it, we unplug BOSH before we parse the stanzas - self._owner.onreceive(None) - resp, args = self.on_responses[_id] - del self.on_responses[_id] - if args is None: - resp(_iq) - else: - resp(self._owner, _iq, **args) - del self._expected[_id] - - def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None): - """ - Send stanza and wait for recipient's response to it. Will call transports - on_timeout callback if response is not retrieved in time - - Be aware: Only timeout of latest call of SendAndWait is active. - """ - if timeout is None: - timeout = DEFAULT_TIMEOUT_SECONDS - _waitid = self.send(stanza) - if func: - self.on_responses[_waitid] = (func, args) - if timeout: - self._owner.set_timeout(timeout) - self._owner.onreceive(self._WaitForData) - self._expected[_waitid] = None - return _waitid - def SendAndCallForResponse(self, stanza, func=None, args=None): """ Put stanza on the wire and call back when recipient replies. Additional callback arguments can be specified in args """ - self.SendAndWaitForResponse(stanza, 0, func, args) + _waitid = self.send(stanza) + self._expected[_waitid] = (func, args) + return _waitid def send(self, stanza, now=False): """ |