Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nbxmpp/__init__.py15
-rw-r--r--nbxmpp/addresses.py233
-rw-r--r--nbxmpp/auth.py273
-rw-r--r--nbxmpp/client.py757
-rw-r--r--nbxmpp/connection.py130
-rw-r--r--nbxmpp/const.py124
-rw-r--r--nbxmpp/dispatcher.py814
-rw-r--r--nbxmpp/examples/client.py281
-rw-r--r--nbxmpp/examples/client.ui469
-rw-r--r--nbxmpp/exceptions.py24
-rw-r--r--nbxmpp/idlequeue.py4
-rw-r--r--nbxmpp/modules/activity.py2
-rw-r--r--nbxmpp/modules/attention.py2
-rw-r--r--nbxmpp/modules/bookmarks.py6
-rw-r--r--nbxmpp/modules/captcha.py2
-rw-r--r--nbxmpp/modules/chat_markers.py2
-rw-r--r--nbxmpp/modules/chatstates.py2
-rw-r--r--nbxmpp/modules/correction.py2
-rw-r--r--nbxmpp/modules/delay.py4
-rw-r--r--nbxmpp/modules/eme.py2
-rw-r--r--nbxmpp/modules/entity_caps.py2
-rw-r--r--nbxmpp/modules/http_auth.py2
-rw-r--r--nbxmpp/modules/ibb.py21
-rw-r--r--nbxmpp/modules/idle.py2
-rw-r--r--nbxmpp/modules/iq.py4
-rw-r--r--nbxmpp/modules/location.py2
-rw-r--r--nbxmpp/modules/message.py4
-rw-r--r--nbxmpp/modules/mood.py2
-rw-r--r--nbxmpp/modules/muc.py26
-rw-r--r--nbxmpp/modules/muclumbus.py26
-rw-r--r--nbxmpp/modules/nickname.py4
-rw-r--r--nbxmpp/modules/omemo.py4
-rw-r--r--nbxmpp/modules/oob.py2
-rw-r--r--nbxmpp/modules/openpgp.py4
-rw-r--r--nbxmpp/modules/pgplegacy.py4
-rw-r--r--nbxmpp/modules/presence.py4
-rw-r--r--nbxmpp/modules/pubsub.py2
-rw-r--r--nbxmpp/modules/receipts.py2
-rw-r--r--nbxmpp/modules/security_labels.py2
-rw-r--r--nbxmpp/modules/software_version.py2
-rw-r--r--nbxmpp/modules/tune.py2
-rw-r--r--nbxmpp/modules/user_avatar.py2
-rw-r--r--nbxmpp/modules/vcard_avatar.py2
-rw-r--r--nbxmpp/old_dispatcher.py758
-rw-r--r--nbxmpp/protocol.py134
-rw-r--r--nbxmpp/resolver.py150
-rw-r--r--nbxmpp/simplexml.py27
-rw-r--r--nbxmpp/smacks.py114
-rw-r--r--nbxmpp/structs.py75
-rw-r--r--nbxmpp/tcp.py335
-rw-r--r--nbxmpp/util.py156
-rw-r--r--nbxmpp/websocket.py182
52 files changed, 4310 insertions, 896 deletions
diff --git a/nbxmpp/__init__.py b/nbxmpp/__init__.py
index f2cbf11..18e8461 100644
--- a/nbxmpp/__init__.py
+++ b/nbxmpp/__init__.py
@@ -1,18 +1,3 @@
-"""
-This is a fork of the xmpppy jabber python library. Most of the code is
-inherited but has been extended by implementation of non-blocking transports
-and new features like BOSH.
-
-Most of the xmpp classes are ancestors of PlugIn class to share a single set of methods in order to compile a featured and extensible XMPP client.
-
-Thanks and credits to the xmpppy developers. See: http://xmpppy.sourceforge.net/
-"""
-
from .protocol import *
-from . import simplexml, protocol, auth, transports, roster
-from . import dispatcher, features, idlequeue, bosh, tls, proxy_connectors
-from .client import NonBlockingClient
-from .plugin import PlugIn
-from .smacks import Smacks
__version__ = "0.9.93"
diff --git a/nbxmpp/addresses.py b/nbxmpp/addresses.py
new file mode 100644
index 0000000..7131778
--- /dev/null
+++ b/nbxmpp/addresses.py
@@ -0,0 +1,233 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+from collections import namedtuple
+
+from nbxmpp.util import Observable
+from nbxmpp.resolver import GioResolver
+from nbxmpp.const import ConnectionType
+from nbxmpp.const import ConnectionProtocol
+
+
+log = logging.getLogger('nbxmpp.addresses')
+
+
+class ServerAddress(namedtuple('ServerAddress', 'domain service host uri '
+ 'protocol type proxy')):
+
+ __slots__ = []
+
+ @property
+ def is_service(self):
+ return self.service is not None
+
+ @property
+ def is_host(self):
+ return self.host is not None
+
+ @property
+ def is_uri(self):
+ return self.uri is not None
+
+ def has_proxy(self):
+ return self.proxy is not None
+
+
+class ServerAddresses(Observable):
+ '''
+ Signals:
+
+ resolved
+
+ '''
+
+ def __init__(self, domain):
+ Observable.__init__(self, log)
+
+ self._domain = domain
+ self._custom_host = None
+ self._proxy = None
+ self._is_resolved = False
+
+ self._addresses = [
+ ServerAddress(domain=self._domain,
+ service='xmpps-client',
+ host=None,
+ uri=None,
+ protocol=ConnectionProtocol.TCP,
+ type=ConnectionType.DIRECT_TLS,
+ proxy=None),
+
+ ServerAddress(domain=self._domain,
+ service='xmpp-client',
+ host=None,
+ uri=None,
+ protocol=ConnectionProtocol.TCP,
+ type=ConnectionType.START_TLS,
+ proxy=None),
+
+ ServerAddress(domain=self._domain,
+ service='xmpp-client',
+ host=None,
+ uri=None,
+ protocol=ConnectionProtocol.TCP,
+ type=ConnectionType.PLAIN,
+ proxy=None)
+ ]
+
+ self._fallback_addresses = [
+ ServerAddress(domain=self._domain,
+ service=None,
+ host='%s:%s' % (self._domain, 5222),
+ uri=None,
+ protocol=ConnectionProtocol.TCP,
+ type=ConnectionType.START_TLS,
+ proxy=None),
+
+ ServerAddress(domain=self._domain,
+ service=None,
+ host='%s:%s' % (self._domain, 5222),
+ uri=None,
+ protocol=ConnectionProtocol.TCP,
+ type=ConnectionType.PLAIN,
+ proxy=None)
+ ]
+
+ @property
+ def domain(self):
+ return self._domain
+
+ @property
+ def is_resolved(self):
+ return self._is_resolved
+
+ def resolve(self):
+ if self._is_resolved:
+ self._on_request_resolved()
+ return
+
+ if self._proxy is not None:
+ # Let the proxy resolve the domain
+ self._on_request_resolved()
+ return
+
+ if self._custom_host is not None:
+ self._on_request_resolved()
+ return
+
+ GioResolver().resolve_alternatives(self._domain,
+ self._on_alternatives_result)
+
+ def cancel_resolve(self):
+ self.remove_subscriptions()
+
+ def set_custom_host(self, address):
+ # Set a custom host, overwrites all other addresses
+ self._custom_host = address
+ if address is None:
+ return
+
+ host, protocol, type_ = address
+
+ self._fallback_addresses = []
+ self._addresses = [
+ ServerAddress(domain=self._domain,
+ service=None,
+ host=host,
+ uri=None,
+ protocol=protocol,
+ type=type_,
+ proxy=None)]
+
+ def set_proxy(self, proxy):
+ self._proxy = proxy
+
+ def _on_alternatives_result(self, uri):
+ if uri is None:
+ self._on_request_resolved()
+ return
+
+ if uri.startswith('wss'):
+ type_ = ConnectionType.DIRECT_TLS
+ elif uri.startswith('ws'):
+ type_ = ConnectionType.PLAIN
+ else:
+ log.warning('Invalid websocket uri: %s', uri)
+ return
+
+ addr = ServerAddress(domain=self._domain,
+ service=None,
+ host=None,
+ uri=uri,
+ protocol=ConnectionProtocol.WEBSOCKET,
+ type=type_,
+ proxy=None)
+ self._addresses.append(addr)
+
+ self._on_request_resolved()
+
+ def _on_request_resolved(self):
+ self._is_resolved = True
+ self.notify('resolved')
+ self.remove_subscriptions()
+
+ def get_next_address(self,
+ allowed_types,
+ allowed_protocols):
+ '''
+ Selects next address
+ '''
+
+ for addr in self._filter_allowed(self._addresses,
+ allowed_types,
+ allowed_protocols):
+ yield self._assure_proxy(addr)
+
+ for addr in self._filter_allowed(self._fallback_addresses,
+ allowed_types,
+ allowed_protocols):
+ yield self._assure_proxy(addr)
+
+ raise NoMoreAddresses
+
+ def _assure_proxy(self, addr):
+ if self._proxy is None:
+ return addr
+
+ if addr.protocol == ConnectionProtocol.TCP:
+ return addr._replace(proxy=self._proxy)
+
+ return addr
+
+ def _filter_allowed(self, addresses, allowed_types, allowed_protocols):
+ if self._proxy is not None:
+ addresses = filter(lambda addr: addr.host is not None, addresses)
+
+ addresses = filter(lambda addr: addr.type in allowed_types,
+ addresses)
+ addresses = filter(lambda addr: addr.protocol in allowed_protocols,
+ addresses)
+ return addresses
+
+ def __str__(self):
+ addresses = self._addresses + self._fallback_addresses
+ return '\n'.join([str(addr) for addr in addresses])
+
+
+class NoMoreAddresses(Exception):
+ pass
diff --git a/nbxmpp/auth.py b/nbxmpp/auth.py
index b46daf8..e279c41 100644
--- a/nbxmpp/auth.py
+++ b/nbxmpp/auth.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
+# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
@@ -21,18 +21,15 @@ import binascii
import logging
import hashlib
from hashlib import pbkdf2_hmac
-from functools import partial
-from nbxmpp.plugin import PlugIn
from nbxmpp.protocol import NS_SASL
from nbxmpp.protocol import Node
-from nbxmpp.protocol import NodeProcessed
from nbxmpp.protocol import SASL_ERROR_CONDITIONS
from nbxmpp.protocol import SASL_AUTH_MECHS
-from nbxmpp.protocol import NS_DOMAIN_BASED_NAME
from nbxmpp.util import b64decode
from nbxmpp.util import b64encode
from nbxmpp.const import GSSAPIState
+from nbxmpp.const import StreamState
log = logging.getLogger('nbxmpp.auth')
@@ -44,174 +41,124 @@ except ImportError:
KERBEROS_AVAILABLE = False
-class SASL(PlugIn):
+class SASL:
"""
- Implements SASL authentication. Can be plugged into NonBlockingClient
- to start authentication
+ Implements SASL authentication.
"""
-
- _default_mechs = set(['SCRAM-SHA-256-PLUS',
- 'SCRAM-SHA-256',
- 'SCRAM-SHA-1-PLUS',
- 'SCRAM-SHA-1',
- 'PLAIN'])
-
- def __init__(self, username, auth_mechs, get_password, on_finished):
+ def __init__(self, client):
"""
- :param username: XMPP username
- :param auth_mechs: Set of valid authentication mechanisms.
- Possible entries are:
- 'ANONYMOUS', 'EXTERNAL', 'GSSAPI', 'SCRAM-SHA-1-PLUS',
- 'SCRAM-SHA-1', 'SCRAM-SHA-256', 'SCRAM-SHA-256-PLUS', 'PLAIN'
- :param on_finished: Callback after SASL is finished
- :param get_password: Callback that must return the password for the
- chosen mechanism
+ :param client: Client object
"""
- PlugIn.__init__(self)
- self.username = username
- self._on_finished = on_finished
- self._get_password = get_password
-
- self._prefered_mechs = auth_mechs
- self._enabled_mechs = self._prefered_mechs or self._default_mechs
- self._chosen_mechanism = None
- self._method = None
+ self._client = client
- self._channel_binding = None
- self._domain_based_name = None
-
- def _setup_mechs(self):
- if self._owner.connected in ('ssl', 'tls'):
- if self._owner.protocol_type == 'BOSH':
- # PLUS would break if the server uses any kind of reverse proxy
- self._enabled_mechs.discard('SCRAM-SHA-1-PLUS')
- self._enabled_mechs.discard('SCRAM-SHA-256-PLUS')
- else:
- self._channel_binding = self._owner.Connection.NonBlockingTLS.get_channel_binding()
- # TLS handshake is finished so channel binding data muss exist
- if self._channel_binding is None:
- raise ValueError('No channel binding data found')
-
- else:
- self._enabled_mechs.discard('SCRAM-SHA-1-PLUS')
- self._enabled_mechs.discard('SCRAM-SHA-256-PLUS')
- if self._prefered_mechs is None:
- # if the client didnt specify any auth mechs avoid
- # sending the password over a plain connection
- self._enabled_mechs.discard('PLAIN')
+ self._password = None
- if not KERBEROS_AVAILABLE:
- self._enabled_mechs.discard('GSSAPI')
+ self._allowed_mechs = None
+ self._enabled_mechs = None
+ self._method = None
+ self._error = None
- def plugin(self, _owner):
- self._setup_mechs()
- self._owner.RegisterHandler(
- 'challenge', self._on_challenge, xmlns=NS_SASL)
- self._owner.RegisterHandler(
- 'failure', self._on_failure, xmlns=NS_SASL)
- self._owner.RegisterHandler(
- 'success', self._on_success, xmlns=NS_SASL)
+ @property
+ def error(self):
+ return self._error
- # Execute the Handler manually, we already received the features
- self._on_features(None, self._owner.Dispatcher.Stream.features)
+ def set_password(self, password):
+ self._password = password
- def plugout(self):
- """
- Remove SASL handlers from owner's dispatcher. Used internally
- """
- self._owner.UnregisterHandler(
- 'challenge', self._on_challenge, xmlns=NS_SASL)
- self._owner.UnregisterHandler(
- 'failure', self._on_failure, xmlns=NS_SASL)
- self._owner.UnregisterHandler(
- 'success', self._on_success, xmlns=NS_SASL)
-
- def _on_features(self, _con, stanza):
- """
- Used to determine if server supports SASL auth. Used internally
- """
- if not stanza.getTag('mechanisms', namespace=NS_SASL):
+ def delegate(self, stanza):
+ if stanza.getNamespace() != NS_SASL:
return
+ if stanza.getName() == 'challenge':
+ self._on_challenge(stanza)
+ elif stanza.getName() == 'failure':
+ self._on_failure(stanza)
+ elif stanza.getName() == 'success':
+ self._on_success(stanza)
+
+ def start_auth(self, features):
+ self._allowed_mechs = self._client.mechs
+ self._enabled_mechs = self._allowed_mechs
+ self._method = None
+ self._error = None
- mechanisms = stanza.getTag('mechanisms', namespace=NS_SASL)
- mechanisms = mechanisms.getTags('mechanism')
+ # -PLUS variants need TLS channel binding data
+ # This is currently not supported via GLib
+ self._enabled_mechs.discard('SCRAM-SHA-1-PLUS')
+ self._enabled_mechs.discard('SCRAM-SHA-256-PLUS')
+ # channel_binding_data = None
- mechs = set(mech.getData() for mech in mechanisms)
- available_mechs = mechs & self._enabled_mechs
+ if not KERBEROS_AVAILABLE:
+ self._enabled_mechs.discard('GSSAPI')
+ available_mechs = features.get_mechs() & self._enabled_mechs
log.info('Available mechanisms: %s', available_mechs)
- hostname = stanza.getTag('hostname', namespace=NS_DOMAIN_BASED_NAME)
- if hostname is not None:
- self._domain_based_name = hostname.getData()
- log.info('Found domain based name: %s', self._domain_based_name)
+ domain_based_name = features.get_domain_based_name()
+ if domain_based_name is not None:
+ log.info('Found domain based name: %s', domain_based_name)
if not available_mechs:
log.error('No available auth mechanisms found')
self._abort_auth('invalid-mechanism')
return
+ chosen_mechanism = None
for mech in SASL_AUTH_MECHS:
if mech in available_mechs:
- self._chosen_mechanism = mech
+ chosen_mechanism = mech
break
- if self._chosen_mechanism is None:
+ if chosen_mechanism is None:
log.error('No available auth mechanisms found')
self._abort_auth('invalid-mechanism')
return
- log.info('Chosen auth mechanism: %s', self._chosen_mechanism)
- self._auth()
+ log.info('Chosen auth mechanism: %s', chosen_mechanism)
- def _auth(self):
- password_cb = partial(self._on_password, self.username)
+ if chosen_mechanism in ('SCRAM-SHA-256', 'SCRAM-SHA-1', 'PLAIN'):
+ if not self._password:
+ self._on_sasl_finished(False, 'no-password')
+ return
- if self._chosen_mechanism == 'SCRAM-SHA-256-PLUS':
- self._method = SCRAM_SHA_256_PLUS(self._owner.Connection,
- self._channel_binding)
- self._get_password(self._chosen_mechanism, password_cb)
+ # if chosen_mechanism == 'SCRAM-SHA-256-PLUS':
+ # self._method = SCRAM_SHA_256_PLUS(self._client,
+ # channel_binding_data)
+ # self._method.initiate(self._client.username, self._password)
- elif self._chosen_mechanism == 'SCRAM-SHA-256':
- self._method = SCRAM_SHA_256(self._owner.Connection, None)
- self._get_password(self._chosen_mechanism, password_cb)
+ # elif chosen_mechanism == 'SCRAM-SHA-1-PLUS':
+ # self._method = SCRAM_SHA_1_PLUS(self._client,
+ # channel_binding_data)
+ # self._method.initiate(self._client.username, self._password)
- elif self._chosen_mechanism == 'SCRAM-SHA-1-PLUS':
- self._method = SCRAM_SHA_1_PLUS(self._owner.Connection,
- self._channel_binding)
- self._get_password(self._chosen_mechanism, password_cb)
+ if chosen_mechanism == 'SCRAM-SHA-256':
+ self._method = SCRAM_SHA_256(self._client, None)
+ self._method.initiate(self._client.username, self._password)
- elif self._chosen_mechanism == 'SCRAM-SHA-1':
- self._method = SCRAM_SHA_1(self._owner.Connection, None)
- self._get_password(self._chosen_mechanism, password_cb)
+ elif chosen_mechanism == 'SCRAM-SHA-1':
+ self._method = SCRAM_SHA_1(self._client, None)
+ self._method.initiate(self._client.username, self._password)
- elif self._chosen_mechanism == 'PLAIN':
- self._method = PLAIN(self._owner.Connection)
- self._get_password(self._chosen_mechanism, password_cb)
+ elif chosen_mechanism == 'PLAIN':
+ self._method = PLAIN(self._client)
+ self._method.initiate(self._client.username, self._password)
- elif self._chosen_mechanism == 'ANONYMOUS':
- self._method = ANONYMOUS(self._owner.Connection)
+ elif chosen_mechanism == 'ANONYMOUS':
+ self._method = ANONYMOUS(self._client)
self._method.initiate()
- elif self._chosen_mechanism == 'EXTERNAL':
- self._method = EXTERNAL(self._owner.Connection)
- self._method.initiate(self.username, self._owner.Server)
+ elif chosen_mechanism == 'EXTERNAL':
+ self._method = EXTERNAL(self._client)
+ self._method.initiate(self._client.username, self._client.Server)
- elif self._chosen_mechanism == 'GSSAPI':
- self._method = GSSAPI(self._owner.Connection)
- self._method.initiate(self._domain_based_name or
- self._owner.xmpp_hostname)
+ elif chosen_mechanism == 'GSSAPI':
+ self._method = GSSAPI(self._client)
+ self._method.initiate(domain_based_name or
+ self._client.domain)
else:
log.error('Unknown auth mech')
- def _on_password(self, username, password):
- if password is None:
- log.warning('No password supplied')
- return
- self._method.initiate(username, password)
-
- def _on_challenge(self, _con, stanza):
+ def _on_challenge(self, stanza):
try:
self._method.response(stanza.getData())
except AttributeError:
@@ -220,9 +167,8 @@ class SASL(PlugIn):
except AuthFail as error:
log.error(error)
self._abort_auth()
- raise NodeProcessed
- def _on_success(self, _con, stanza):
+ def _on_success(self, stanza):
log.info('Successfully authenticated with remote server')
try:
self._method.success(stanza.getData())
@@ -231,14 +177,13 @@ class SASL(PlugIn):
except AuthFail as error:
log.error(error)
self._abort_auth()
- raise NodeProcessed
+ return
- self._on_finished(True, None, None)
- raise NodeProcessed
+ self._on_sasl_finished(True, None, None)
- def _on_failure(self, _con, stanza):
+ def _on_failure(self, stanza):
text = stanza.getTagData('text')
- reason = 'not-authorized'
+ reason = 'unknown-error'
childs = stanza.getChildren()
for child in childs:
name = child.getName()
@@ -250,63 +195,68 @@ class SASL(PlugIn):
log.info('Failed SASL authentification: %s %s', reason, text)
self._abort_auth(reason, text)
- raise NodeProcessed
def _abort_auth(self, reason='malformed-request', text=None):
node = Node('abort', attrs={'xmlns': NS_SASL})
- self._owner.send(node)
- self._owner.Connection.start_disconnect()
- self._on_finished(False, reason, text)
+ self._client.send_nonza(node)
+ self._on_sasl_finished(False, reason, text)
+
+ def _on_sasl_finished(self, successful, reason, text=None):
+ if not successful:
+ self._error = (reason, text)
+ self._client.set_state(StreamState.AUTH_FAILED)
+ else:
+ self._client.set_state(StreamState.AUTH_SUCCESSFUL)
class PLAIN:
_mechanism = 'PLAIN'
- def __init__(self, con):
- self._con = con
+ def __init__(self, client):
+ self._client = client
def initiate(self, username, password):
payload = b64encode('\x00%s\x00%s' % (username, password))
node = Node('auth',
attrs={'xmlns': NS_SASL, 'mechanism': 'PLAIN'},
payload=[payload])
- self._con.send(node)
+ self._client.send_nonza(node)
class EXTERNAL:
_mechanism = 'EXTERNAL'
- def __init__(self, con):
- self._con = con
+ def __init__(self, client):
+ self._client = client
def initiate(self, username, server):
payload = b64encode('%s@%s' % (username, server))
node = Node('auth',
attrs={'xmlns': NS_SASL, 'mechanism': 'EXTERNAL'},
payload=[payload])
- self._con.send(node)
+ self._client.send_nonza(node)
class ANONYMOUS:
_mechanism = 'ANONYMOUS'
- def __init__(self, con):
- self._con = con
+ def __init__(self, client):
+ self._client = client
def initiate(self):
node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'})
- self._con.send(node)
+ self._client.send_nonza(node)
class GSSAPI:
_mechanism = 'GSSAPI'
- def __init__(self, con):
- self._con = con
+ def __init__(self, client):
+ self._client = client
self._gss_vc = None
self._state = GSSAPIState.STEP
@@ -317,7 +267,7 @@ class GSSAPI:
node = Node('auth',
attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'},
payload=(response or ''))
- self._con.send(node)
+ self._client.send_nonza(node)
def response(self, server_message, *args, **kwargs):
server_message = b64decode(server_message, bytes)
@@ -339,7 +289,7 @@ class GSSAPI:
node = Node('response',
attrs={'xmlns': NS_SASL},
payload=response)
- self._con.send(node)
+ self._client.send_nonza(node)
class SCRAM:
@@ -348,8 +298,8 @@ class SCRAM:
_channel_binding = ''
_hash_method = ''
- def __init__(self, con, channel_binding):
- self._con = con
+ def __init__(self, client, channel_binding):
+ self._client = client
self._channel_binding_data = channel_binding
self._client_nonce = '%x' % int(binascii.hexlify(os.urandom(24)), 16)
self._client_first_message_bare = None
@@ -377,11 +327,12 @@ class SCRAM:
self._client_nonce)
client_first_message = '%s%s' % (self._channel_binding,
self._client_first_message_bare)
+
payload = b64encode(client_first_message)
node = Node('auth',
attrs={'xmlns': NS_SASL, 'mechanism': self._mechanism},
payload=[payload])
- self._con.send(node)
+ self._client.send_nonza(node)
def response(self, server_first_message):
server_first_message = b64decode(server_first_message)
@@ -429,7 +380,7 @@ class SCRAM:
node = Node('response',
attrs={'xmlns': NS_SASL},
payload=[payload])
- self._con.send(node)
+ self._client.send_nonza(node)
def success(self, server_last_message):
server_last_message = b64decode(server_last_message)
@@ -454,7 +405,7 @@ class SCRAM:
class SCRAM_SHA_1(SCRAM):
_mechanism = 'SCRAM-SHA-1'
- _channel_binding = 'y,,'
+ _channel_binding = 'n,,'
_hash_method = 'sha1'
@@ -467,7 +418,7 @@ class SCRAM_SHA_1_PLUS(SCRAM_SHA_1):
class SCRAM_SHA_256(SCRAM):
_mechanism = 'SCRAM-SHA-256'
- _channel_binding = 'y,,'
+ _channel_binding = 'n,,'
_hash_method = 'sha256'
diff --git a/nbxmpp/client.py b/nbxmpp/client.py
new file mode 100644
index 0000000..9583aad
--- /dev/null
+++ b/nbxmpp/client.py
@@ -0,0 +1,757 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from gi.repository import GLib
+
+from nbxmpp.protocol import NS_TLS
+from nbxmpp.protocol import NS_PING
+from nbxmpp.protocol import Features
+from nbxmpp.protocol import StanzaMalformed
+from nbxmpp.protocol import SessionRequest
+from nbxmpp.protocol import BindRequest
+from nbxmpp.protocol import TLSRequest
+from nbxmpp.protocol import isResultNode
+from nbxmpp.protocol import JID
+from nbxmpp.protocol import Iq
+from nbxmpp.protocol import Protocol
+from nbxmpp.protocol import WebsocketCloseHeader
+from nbxmpp.addresses import ServerAddresses
+from nbxmpp.addresses import NoMoreAddresses
+from nbxmpp.tcp import TCPConnection
+from nbxmpp.websocket import WebsocketConnection
+from nbxmpp.smacks import Smacks
+from nbxmpp.auth import SASL
+from nbxmpp.const import StreamState
+from nbxmpp.const import StreamError
+from nbxmpp.const import ConnectionType
+from nbxmpp.const import ConnectionProtocol
+from nbxmpp.const import Mode
+from nbxmpp.dispatcher import StanzaDispatcher
+from nbxmpp.util import get_stream_header
+from nbxmpp.util import get_stanza_id
+from nbxmpp.util import Observable
+from nbxmpp.util import validate_stream_header
+from nbxmpp.util import is_error_result
+
+log = logging.getLogger('nbxmpp.stream')
+
+# TODO: check if signals make sense
+
+
+class Client(Observable):
+ def __init__(self):
+ '''
+ Signals:
+ resume-failed
+ resume-successful
+ login-successful
+ disconnected
+ connected
+ connection-failed
+ stanza-sent
+ stanza-received
+ '''
+ Observable.__init__(self, log)
+ self._jid = None
+ self._lang = 'en'
+ self._domain = None
+ self._username = None
+ self._resource = None
+
+ self._custom_host = None
+
+ self._addresses = None
+ self._current_address = None
+ self._address_generator = None
+
+ self._client_cert = None
+ self._client_cert_pass = None
+ self._proxy = None
+
+ self._allowed_con_types = None
+ self._allowed_protocols = None
+ self._allowed_mechs = None
+
+ self._stream_id = None
+ self._stream_secure = False
+ self._stream_authenticated = False
+ self._stream_features = None
+ self._session_required = False
+ self._connect_successful = False
+ self._stream_close_initiated = False
+ self._error = None, None, None
+
+ self._ignored_tls_errors = []
+ self._ignore_tls_errors = False
+ self._accepted_certificates = []
+ self._peer_certificate = None
+ self._peer_certificate_errors = None
+
+ self._con = None
+ self._mode = Mode.CLIENT
+
+ self._ping_source_id = None
+
+ self._dispatcher = StanzaDispatcher(self)
+ self._dispatcher.subscribe('before-dispatch', self._on_before_dispatch)
+ self._dispatcher.subscribe('parsing-error', self._on_parsing_error)
+ self._dispatcher.subscribe('stream-end', self._on_stream_end)
+
+ self._smacks = Smacks(self)
+ self._sasl = SASL(self)
+
+ self._state = StreamState.DISCONNECTED
+
+ @property
+ def features(self):
+ return self._stream_features
+
+ @property
+ def sm_supported(self):
+ return self._smacks.sm_supported
+
+ @property
+ def lang(self):
+ return self._lang
+
+ @property
+ def username(self):
+ return self._username
+
+ @property
+ def domain(self):
+ return self._domain
+
+ @property
+ def resource(self):
+ return self._resource
+
+ def set_username(self, username):
+ self._username = username
+
+ def set_domain(self, domain):
+ self._domain = domain
+
+ def set_resource(self, resource):
+ self._resource = resource
+
+ def set_mode(self, mode):
+ self._mode = mode
+
+ def set_custom_host(self, host, protocol, type_):
+ if self._domain is None:
+ raise ValueError('Call set_domain() first before set_custom_host()')
+ self._custom_host = (host, protocol, type_)
+
+ def set_accepted_certificates(self, certificates):
+ self._accepted_certificates = certificates
+
+ @property
+ def ignored_tls_errors(self):
+ return self._ignored_tls_errors
+
+ def set_ignored_tls_errors(self, errors):
+ self._ignored_tls_errors = errors
+
+ @property
+ def ignore_tls_errors(self):
+ return self._ignore_tls_errors
+
+ def set_ignore_tls_errors(self, ignore):
+ self._ignore_tls_errors = ignore
+
+ def set_password(self, password):
+ self._sasl.set_password(password)
+
+ @property
+ def peer_certificate(self):
+ return self._peer_certificate, self._peer_certificate_errors
+
+ @property
+ def current_connection_type(self):
+ return self._current_address.type
+
+ @property
+ def is_websocket(self):
+ return self._current_address.protocol == ConnectionProtocol.WEBSOCKET
+
+ @property
+ def stream_id(self):
+ return self._stream_id
+
+ @property
+ def is_stream_secure(self):
+ direct_tls = self.current_connection_type == ConnectionType.DIRECT_TLS
+ return self._stream_secure or direct_tls
+
+ @property
+ def is_stream_authenticated(self):
+ return self._stream_authenticated
+
+ @property
+ def state(self):
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ self._state = value
+ log.info('Set state: %s', value)
+
+ def set_state(self, state):
+ self.state = state
+ self._xmpp_state_machine()
+
+ @property
+ def connection_types(self):
+ return list(self._allowed_con_types or [ConnectionType.DIRECT_TLS,
+ ConnectionType.START_TLS])
+
+ def set_connection_types(self, con_types):
+ self._allowed_con_types = con_types
+
+ @property
+ def mechs(self):
+ return set(self._allowed_mechs or set(['SCRAM-SHA-256',
+ 'SCRAM-SHA-1',
+ # 'SCRAM-SHA-256-PLUS',
+ # 'SCRAM-SHA-1-PLUS',
+ 'PLAIN']))
+
+ def set_mechs(self, mechs):
+ self._allowed_mechs = mechs
+
+ @property
+ def protocols(self):
+ return list(self._allowed_protocols or [ConnectionProtocol.TCP,
+ ConnectionProtocol.WEBSOCKET])
+
+ def set_protocols(self, protocols):
+ self._allowed_protocols = protocols
+
+ @property
+ def client_cert(self):
+ return self._client_cert, self._client_cert_pass
+
+ def set_client_cert(self, client_cert, client_cert_pass):
+ self._client_cert = client_cert
+ self._client_cert_pass = client_cert_pass
+
+ def set_proxy(self, proxy):
+ self._proxy = proxy
+ self._dispatcher.get_module('Muclumbus').set_proxy(proxy)
+
+ def get_bound_jid(self):
+ return self._jid
+
+ def _set_bound_jid(self, jid):
+ self._jid = JID(jid)
+
+ @property
+ def has_error(self):
+ return self._error[0] is not None
+
+ def get_error(self):
+ return self._error
+
+ def _reset_error(self):
+ self._error = None, None, None
+
+ def _set_error(self, domain, error, text=None):
+ log.info('Set error: %s, %s, %s', domain, error, text)
+ self._error = domain, error, text
+
+ def _connect(self):
+ if self._state not in (StreamState.DISCONNECTED, StreamState.RESOLVED):
+ log.error('Stream can\'t connect, stream state: %s', self._state)
+ return
+
+ self.state = StreamState.CONNECTING
+ self._reset_error()
+
+ self._con = self._get_connection(self._current_address,
+ self._accepted_certificates,
+ self._ignore_tls_errors,
+ self._ignored_tls_errors,
+ self.client_cert)
+
+ self._con.subscribe('connected', self._on_connected)
+ self._con.subscribe('connection-failed', self._on_connection_failed)
+ self._con.subscribe('disconnected', self._on_disconnected)
+ self._con.subscribe('data-sent', self._on_data_sent)
+ self._con.subscribe('data-received', self._on_data_received)
+ self._con.subscribe('bad-certificate', self._on_bad_certificate)
+ self._con.subscribe('certificate-set', self._on_certificate_set)
+ self._con.connect()
+
+ def _get_connection(self, *args):
+ if self.is_websocket:
+ return WebsocketConnection(*args)
+ return TCPConnection(*args)
+
+ def connect(self):
+ if self._state != StreamState.DISCONNECTED:
+ log.error('Stream can\'t reconnect, stream state: %s', self._state)
+ return
+
+ if self._connect_successful:
+ log.info('Reconnect')
+ self._connect()
+ return
+
+ log.info('Connect')
+ self._reset_error()
+ self.state = StreamState.RESOLVE
+
+ self._addresses = ServerAddresses(self._domain)
+ self._addresses.set_custom_host(self._custom_host)
+ self._addresses.set_proxy(self._proxy)
+ self._addresses.subscribe('resolved', self._on_addresses_resolved)
+ self._addresses.resolve()
+
+ def _on_addresses_resolved(self, _addresses, _signal_name):
+ log.info('Domain resolved')
+ log.info(self._addresses)
+ self.state = StreamState.RESOLVED
+ self._address_generator = self._addresses.get_next_address(
+ self.connection_types,
+ self.protocols)
+
+ self._try_next_ip()
+
+ def _try_next_ip(self, *args):
+ try:
+ self._current_address = next(self._address_generator)
+ except NoMoreAddresses:
+ self._current_address = None
+ self.state = StreamState.DISCONNECTED
+ log.error('Unable to connect to %s', self._addresses.domain)
+ self._set_error(StreamError.CONNECTION_FAILED,
+ 'connection-failed',
+ 'Unable to connect to %s' % self._addresses.domain)
+ self.notify('connection-failed')
+ return
+
+ log.info('Current address: %s', self._current_address)
+ self._connect()
+
+ def disconnect(self, immediate=False):
+ if self._state == StreamState.RESOLVE:
+ self._addresses.cancel_resolve()
+ self.state = StreamState.DISCONNECTED
+ return
+
+ if self._state == StreamState.CONNECTING:
+ self._disconnect()
+ return
+
+ if self._state in (StreamState.DISCONNECTED,
+ StreamState.DISCONNECTING):
+ log.warning('Stream can\'t disconnect, stream state: %s',
+ self._state)
+ return
+
+ self._disconnect(immediate=immediate)
+
+ def _disconnect(self, immediate=True):
+ self.state = StreamState.DISCONNECTING
+ self._remove_ping_timer()
+ if not immediate:
+ self._stream_close_initiated = True
+ self._smacks.close_session()
+ self._end_stream()
+ self._con.shutdown_output()
+ else:
+ self._con.disconnect()
+
+ def send(self, stanza, *args, **kwargs):
+ # Alias for backwards compat
+ return self.send_stanza(stanza)
+
+ def _on_connected(self, _connection, _signal_name):
+ self._connect_successful = True
+ self.set_state(StreamState.CONNECTED)
+
+ def _on_disconnected(self, _connection, _signal_name):
+ self.state = StreamState.DISCONNECTED
+ self._reset_stream()
+ self.notify('disconnected')
+
+ def _on_connection_failed(self, _connection, _signal_name):
+ self.state = StreamState.DISCONNECTED
+ self._reset_stream()
+ if not self._connect_successful:
+ self._try_next_ip()
+ else:
+ self._set_error(StreamError.CONNECTION_FAILED,
+ 'connection-failed',
+ 'Unable to connect to last successful address: %s' % str(self._current_address))
+ self.notify('connection-failed')
+
+ def _disconnect_with_error(self, error_domain, error, text=None):
+ self._set_error(error_domain, error, text)
+ self.disconnect()
+
+ def _on_parsing_error(self, _dispatcher, _signal_name, error):
+ self._disconnect_with_error(StreamError.PARSING, 'parsing-error', error)
+
+ def _on_stream_end(self, _dispatcher, _signal_name, error):
+ if not self.has_error:
+ self._set_error(StreamError.STREAM, 'stream-end', error)
+
+ self._con.shutdown_input()
+ if not self._stream_close_initiated:
+ self.state = StreamState.DISCONNECTING
+ self._smacks.close_session()
+ self._end_stream()
+ self._con.shutdown_output()
+
+ def _reset_stream(self):
+ self._stream_id = None
+ self._stream_secure = False
+ self._stream_authenticated = False
+ self._stream_features = None
+ self._session_required = False
+ self._con = None
+
+ def _end_stream(self):
+ if self.is_websocket:
+ nonza = WebsocketCloseHeader()
+ else:
+ nonza = '</stream:stream>'
+ self.send_nonza(nonza)
+
+ def get_module(self, name):
+ return self._dispatcher.get_module(name)
+
+ def _on_bad_certificate(self, connection, _signal_name):
+ self._peer_certificate, self._peer_certificate_errors = connection.peer_certificate
+ self._set_error(StreamError.BAD_CERTIFICATE, 'bad certificate')
+
+ def _on_certificate_set(self, connection, _signal_name):
+ self._peer_certificate, self._peer_certificate_errors = connection.peer_certificate
+
+ def accept_certificate(self):
+ log.info('Certificate accepted')
+ self._accepted_certificates.append(self._peer_certificate)
+ self._connect()
+
+ def _on_data_sent(self, _connection, _signal_name, data):
+ self.notify('stanza-sent', data)
+
+ def _on_before_dispatch(self, _dispatcher, _signal_name, data):
+ self.notify('stanza-received', data)
+
+ def _on_data_received(self, _connection, _signal_name, data):
+ self._dispatcher.process_data(data)
+ self._reset_ping_timer()
+
+ def _reset_ping_timer(self):
+ if self.is_websocket:
+ return
+
+ if not self._mode.is_client:
+ return
+
+ if self.state != StreamState.ACTIVE:
+ return
+
+ if self._ping_source_id is not None:
+ log.info('Remove ping timer')
+ GLib.source_remove(self._ping_source_id)
+ self._ping_source_id = None
+
+ log.info('Start ping timer')
+ self._ping_source_id = GLib.timeout_add_seconds(180, self._ping)
+
+ def _remove_ping_timer(self):
+ if self._ping_source_id is None:
+ return
+ log.info('Remove ping timer')
+ GLib.source_remove(self._ping_source_id)
+ self._ping_source_id = None
+
+ def send_stanza(self, stanza, now=False, callback=None,
+ timeout=None, user_data=None):
+ if user_data is not None and not isinstance(user_data, dict):
+ raise ValueError('arg user_data must be of dict type')
+
+ if not isinstance(stanza, Protocol):
+ raise ValueError('Nonzas not allowed, use send_nonza()')
+
+ id_ = stanza.getID()
+ if id_ is None:
+ id_ = get_stanza_id()
+ stanza.setID(id_)
+
+ if callback is not None:
+ self._dispatcher.add_callback_for_id(
+ id_, callback, timeout, user_data)
+ self._con.send(stanza, now)
+ self._smacks.save_in_queue(stanza)
+ return id_
+
+ def SendAndCallForResponse(self, stanza, callback, user_data=None):
+ self.send_stanza(stanza, callback=callback, user_data=user_data)
+
+ def send_nonza(self, nonza, now=False):
+ self._con.send(nonza, now)
+
+ def _xmpp_state_machine(self, stanza=None):
+ log.info('Execute state machine')
+ if stanza is not None:
+ if stanza.getName() == 'error':
+ log.info('Stream error')
+ # TODO:
+ # self._disconnect_with_error(StreamError.SASL,
+ # stanza.get_condition())
+ return
+
+ if self.state == StreamState.CONNECTED:
+ self._dispatcher.set_dispatch_callback(self._xmpp_state_machine)
+ if (self.current_connection_type == ConnectionType.DIRECT_TLS and
+ not self.is_websocket):
+ self._con.start_tls_negotiation()
+ self._stream_secure = True
+ self._start_stream()
+ return
+
+ self._start_stream()
+
+ elif self.state == StreamState.WAIT_FOR_STREAM_START:
+ try:
+ self._stream_id = validate_stream_header(stanza,
+ self._domain,
+ self.is_websocket)
+ except StanzaMalformed as error:
+ log.error(error)
+ self._disconnect_with_error(StreamError.STREAM,
+ 'stanza-malformed',
+ 'Invalid stream header')
+ return
+
+ self.state = StreamState.WAIT_FOR_FEATURES
+
+ elif self.state == StreamState.WAIT_FOR_FEATURES:
+ if stanza.getName() != 'features':
+ log.error('Invalid response: %s', stanza)
+ self._disconnect_with_error(
+ StreamError.STREAM,
+ 'stanza-malformed',
+ 'Invalid response, expected features')
+ return
+ self._on_stream_features(Features(stanza))
+
+ elif self.state == StreamState.WAIT_FOR_TLS_PROCEED:
+ if stanza.getNamespace() != NS_TLS:
+ self._disconnect_with_error(
+ StreamError.TLS,
+ 'stanza-malformed',
+ 'Invalid namespace for TLS response')
+ return
+
+ if stanza.getName() == 'failure':
+ self._disconnect_with_error(StreamError.TLS,
+ 'negotiation-failed')
+ return
+
+ if stanza.getName() == 'proceed':
+ self._con.start_tls_negotiation()
+ self._stream_secure = True
+ self._start_stream()
+ return
+
+ log.error('Invalid response')
+ self._disconnect_with_error(StreamError.TLS,
+ 'stanza-malformed',
+ 'Invalid TLS response')
+ return
+
+ elif self.state == StreamState.PROCEED_WITH_AUTH:
+ self._sasl.delegate(stanza)
+
+ elif self.state == StreamState.AUTH_SUCCESSFUL:
+ self._stream_authenticated = True
+ if self._mode.is_login_test:
+ self.notify('login-successful')
+ self.disconnect()
+ return
+
+ self._start_stream()
+
+ elif self.state == StreamState.AUTH_FAILED:
+ self._disconnect_with_error(StreamError.SASL,
+ *self._sasl.error)
+
+ elif self.state == StreamState.WAIT_FOR_BIND:
+ self._on_bind(stanza)
+
+ elif self.state == StreamState.BIND_FAILED:
+ self._disconnect_with_error(StreamError.BIND, 'bind-failed')
+
+ elif self.state == StreamState.BIND_SUCCESSFUL:
+ self._smacks.send_enable()
+ self._dispatcher.set_dispatch_callback(None)
+ self.state = StreamState.ACTIVE
+ self.notify('connected')
+
+ elif self.state == StreamState.WAIT_FOR_SESSION:
+ self._on_session(stanza)
+
+ elif self.state == StreamState.SESSION_FAILED:
+ self._disconnect_with_error(StreamError.SESSION, 'session-failed')
+
+ elif self.state == StreamState.WAIT_FOR_RESUMED:
+ self._smacks.delegate(stanza)
+
+ elif self.state == StreamState.RESUME_FAILED:
+ self.notify('resume-failed')
+ self._start_bind()
+
+ elif self.state == StreamState.RESUME_SUCCESSFUL:
+ self._dispatcher.set_dispatch_callback(None)
+ self.state = StreamState.ACTIVE
+ self.notify('resume-successful')
+
+ def _on_stream_features(self, features):
+ if self.is_stream_authenticated:
+ self._stream_features = features
+ self._smacks.sm_supported = features.has_sm()
+ self._session_required = features.session_required()
+ if self._smacks.resume_supported:
+ self._smacks.resume_request()
+ self.state = StreamState.WAIT_FOR_RESUMED
+ else:
+ self._start_bind()
+
+ elif self.is_stream_secure:
+ if self._mode.is_register:
+ if features.has_register:
+ self.state = StreamState.ACTIVE
+ self._dispatcher.set_dispatch_callback(None)
+ self.notify('connected')
+ else:
+ self._disconnect_with_error(StreamError.REGISTER,
+ 'register-not-supported')
+ return
+
+ self._start_auth(features)
+
+ else:
+ tls_supported, required = features.has_starttls()
+ if self._current_address.type == ConnectionType.PLAIN:
+ if tls_supported and required:
+ log.error('Server requires TLS')
+ self._disconnect_with_error(StreamError.TLS, 'tls-required')
+ return
+ self._start_auth(features)
+ return
+
+ if not tls_supported:
+ log.error('Server does not support TLS')
+ self._disconnect_with_error(StreamError.TLS,
+ 'tls-not-supported')
+ return
+ self._start_tls()
+
+ def _start_stream(self):
+ log.info('Start stream')
+ self._stream_id = None
+ self._dispatcher.reset_parser()
+ header = get_stream_header(self._domain, self._lang, self.is_websocket)
+ self.send_nonza(header)
+ self.state = StreamState.WAIT_FOR_STREAM_START
+
+ def _start_tls(self):
+ self.send_nonza(TLSRequest())
+ self.state = StreamState.WAIT_FOR_TLS_PROCEED
+
+ def _start_auth(self, features):
+ if not features.has_sasl():
+ log.error('Server does not support SASL')
+ self._disconnect_with_error(StreamError.SASL,
+ 'sasl-not-supported')
+ return
+ self.state = StreamState.PROCEED_WITH_AUTH
+ self._sasl.start_auth(features)
+
+ def _start_bind(self):
+ log.info('Send bind')
+ bind_request = BindRequest(self.resource)
+ self.send_stanza(bind_request)
+ self.state = StreamState.WAIT_FOR_BIND
+
+ def _on_bind(self, stanza):
+ if not isResultNode(stanza):
+ log.error('Binding failed: %s.', stanza.getTag('error'))
+ if stanza.getError() == 'conflict' and self.resource is not None:
+ log.info('Try to request server generated resource')
+ self._start_bind()
+ return
+ self.set_state(StreamState.BIND_FAILED)
+ return
+
+ jid = stanza.getTag('bind').getTagData('jid')
+ log.info('Successfully bound %s', jid)
+ self._set_bound_jid(jid)
+
+ if not self._session_required:
+ # Server don't want us to initialize a session
+ log.info('No session required')
+ self.set_state(StreamState.BIND_SUCCESSFUL)
+ else:
+ session_request = SessionRequest()
+ self.send_stanza(session_request)
+ self.state = StreamState.WAIT_FOR_SESSION
+
+ def _on_session(self, stanza):
+ if isResultNode(stanza):
+ log.info('Successfully started session')
+ self.set_state(StreamState.BIND_SUCCESSFUL)
+ else:
+ log.error('Session open failed')
+ self.set_state(StreamState.SESSION_FAILED)
+
+ def _ping(self):
+ iq = Iq('get', to=self._jid.getBare())
+ iq.addChild(name='ping', namespace=NS_PING)
+ self.send_stanza(iq, timeout=10, callback=self._on_pong)
+ log.info('Ping')
+
+ def _on_pong(self, _client, result):
+ self._ping_source_id = None
+ if is_error_result(result):
+ if result.condition == 'timeout':
+ log.info('Ping timeout')
+ self._disconnect(immediate=True)
+ return
+ log.info('Pong')
+
+ def register_handler(self, *args, **kwargs):
+ self._dispatcher.register_handler(*args, **kwargs)
+
+ def unregister_handler(self, *args, **kwargs):
+ self._dispatcher.unregister_handler(*args, **kwargs)
+
+ def destroy(self):
+ self._remove_ping_timer()
+ self._smacks = None
+ self._sasl = None
+ self._dispatcher.cleanup()
+ self._dispatcher = None
+ self.remove_subscriptions()
diff --git a/nbxmpp/connection.py b/nbxmpp/connection.py
new file mode 100644
index 0000000..70a79d0
--- /dev/null
+++ b/nbxmpp/connection.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from gi.repository import Gio
+
+from nbxmpp.const import TCPState
+from nbxmpp.util import Observable
+
+log = logging.getLogger('nbxmpp.connection')
+
+
+class Connection(Observable):
+ '''
+ Base Connection Class
+
+ Signals:
+
+ data-sent
+ data-received
+ bad-certificate
+ certificate-set
+ connection-failed
+ disconnected
+ '''
+ def __init__(self,
+ address,
+ accepted_certificates,
+ ignore_tls_errors,
+ ignored_tls_errors,
+ client_cert):
+ Observable.__init__(self, log)
+
+ self._client_cert = client_cert
+ self._address = address
+ self._state = None
+
+ self._state = TCPState.DISCONNECTED
+
+ self._peer_certificate = None
+ self._peer_certificate_errors = None
+ self._accepted_certificates = accepted_certificates
+ self._ignore_tls_errors = ignore_tls_errors
+ self._ignored_tls_errors = ignored_tls_errors
+
+ @property
+ def peer_certificate(self):
+ return (self._peer_certificate, self._peer_certificate_errors)
+
+ @property
+ def connection_type(self):
+ return self._address.type
+
+ @property
+ def state(self):
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ log.info('Set Connection State: %s', value)
+ self._state = value
+
+ def _accept_certificate(self):
+ if not self._peer_certificate_errors:
+ return True
+
+ log.info('Found TLS certificate errors: %s',
+ self._peer_certificate_errors)
+
+ if self._ignore_tls_errors:
+ log.warning('Ignore all errors')
+ return True
+
+ if self._ignored_tls_errors:
+ log.warning('Ignore TLS certificate errors: %s',
+ self._ignored_tls_errors)
+ self._peer_certificate_errors -= self._ignored_tls_errors
+
+ if Gio.TlsCertificateFlags.UNKNOWN_CA in self._peer_certificate_errors:
+ for accepted_certificate in self._accepted_certificates:
+ if certificate.is_same(accepted_certificate):
+ self._peer_certificate_errors.discard(
+ Gio.TlsCertificateFlags.UNKNOWN_CA)
+ break
+
+ if not self._peer_certificate_errors:
+ return True
+ return False
+
+ def disconnect(self):
+ raise NotImplementedError
+
+ def connect(self):
+ raise NotImplementedError
+
+ def send(self, stanza, now=False):
+ raise NotImplementedError
+
+ @staticmethod
+ def _log_stanza(data, received=True):
+ direction = 'RECEIVED' if received else 'SENT'
+ message = ('::::: DATA %s ::::'
+ '\n_____________\n'
+ '%s'
+ '\n_____________')
+ log.info(message, direction, data)
+
+ def start_tls_negotiation(self):
+ raise NotImplementedError
+
+ def destroy(self):
+ self.remove_subscriptions()
+ self._peer_certificate = None
+ self._client_cert = None
+ self._address = None
diff --git a/nbxmpp/const.py b/nbxmpp/const.py
index 9877a35..ef19585 100644
--- a/nbxmpp/const.py
+++ b/nbxmpp/const.py
@@ -17,31 +17,9 @@
from enum import Enum
from enum import IntEnum
-from enum import unique
from functools import total_ordering
-
-@unique
-class Realm(Enum):
- CONNECTING = 'Connecting'
-
- def __str__(self):
- return self.value
-
-
-@unique
-class Event(Enum):
- AUTH_SUCCESSFUL = 'Auth successful'
- AUTH_FAILED = 'Auth failed'
- BIND_FAILED = 'Bind failed'
- SESSION_FAILED = 'Session failed'
- RESUME_SUCCESSFUL = 'Resume successful'
- RESUME_FAILED = 'Resume failed'
- CONNECTION_ACTIVE = 'Connection active'
-
- def __str__(self):
- return self.value
-
+from gi.repository import Gio
class GSSAPIState(IntEnum):
STEP = 0
@@ -341,6 +319,94 @@ class AdHocNoteType(Enum):
ERROR = 'error'
+class ConnectionType(Enum):
+ DIRECT_TLS = 'ssl'
+ START_TLS = 'tls'
+ PLAIN = 'plain'
+
+ @property
+ def is_direct_tls(self):
+ return self == ConnectionType.DIRECT_TLS
+
+ @property
+ def is_start_tls(self):
+ return self == ConnectionType.START_TLS
+
+ @property
+ def is_plain(self):
+ return self == ConnectionType.PLAIN
+
+
+class ConnectionProtocol(IntEnum):
+ TCP = 0
+ WEBSOCKET = 1
+
+
+class StreamState(Enum):
+ RESOLVE = 'resolve'
+ RESOLVED = 'resolved'
+ CONNECTING = 'connecting'
+ CONNECTED = 'connected'
+ DISCONNECTED = 'disconnected'
+ DISCONNECTING = 'disconnecting'
+ STREAM_START = 'stream start'
+ WAIT_FOR_STREAM_START = 'wait for stream start'
+ WAIT_FOR_FEATURES = 'wait for features'
+ WAIT_FOR_TLS_PROCEED = 'wait for tls proceed'
+ TLS_START_SUCCESSFUL = 'tls start successful'
+ PROCEED_WITH_AUTH = 'proceed with auth'
+ AUTH_SUCCESSFUL = 'auth successful'
+ AUTH_FAILED = 'auth failed'
+ SESSION_FAILED = 'session failed'
+ WAIT_FOR_RESUMED = 'wait for resumed'
+ RESUME_FAILED = 'resume failed'
+ RESUME_SUCCESSFUL = 'resume successful'
+ PROCEED_WITH_BIND = 'proceed with bind'
+ BIND_SUCCESSFUL = 'bind successful'
+ BIND_FAILED = 'bind failed'
+ WAIT_FOR_BIND = 'wait for bind'
+ WAIT_FOR_SESSION = 'wait for session'
+ ACTIVE = 'active'
+
+
+class StreamError(Enum):
+ PARSING = 0
+ CONNECTION_FAILED = 1
+ SESSION = 2
+ BIND = 3
+ TLS = 4
+ BAD_CERTIFICATE = 5
+ STREAM = 6
+ SASL = 7
+ REGISTER = 8
+ END = 9
+
+
+class TCPState(Enum):
+ DISCONNECTED = 'disconnected'
+ DISCONNECTING = 'disconnecting'
+ CONNECTING = 'connecting'
+ CONNECTED = 'connected'
+
+
+class Mode(IntEnum):
+ CLIENT = 0
+ REGISTER = 1
+ LOGIN_TEST = 2
+
+ @property
+ def is_client(self):
+ return self == Mode.CLIENT
+
+ @property
+ def is_register(self):
+ return self == Mode.REGISTER
+
+ @property
+ def is_login_test(self):
+ return self == Mode.LOGIN_TEST
+
+
MOODS = [
'afraid',
'amazed',
@@ -571,3 +637,15 @@ REGISTER_FIELDS = [
'url',
'date',
]
+
+# pylint: disable=line-too-long
+GIO_TLS_ERRORS = {
+ Gio.TlsCertificateFlags.UNKNOWN_CA: 'The signing certificate authority is not known',
+ Gio.TlsCertificateFlags.REVOKED: 'The certificate has been revoked',
+ Gio.TlsCertificateFlags.BAD_IDENTITY: 'The certificate does not match the expected identity of the site',
+ Gio.TlsCertificateFlags.INSECURE: 'The certificate’s algorithm is insecure',
+ Gio.TlsCertificateFlags.NOT_ACTIVATED: 'The certificate’s activation time is in the future',
+ Gio.TlsCertificateFlags.GENERIC_ERROR: 'Unknown validation error',
+ Gio.TlsCertificateFlags.EXPIRED: 'The certificate has expired',
+}
+# pylint: enable=line-too-long
diff --git a/nbxmpp/dispatcher.py b/nbxmpp/dispatcher.py
index 60f68d5..c91be32 100644
--- a/nbxmpp/dispatcher.py
+++ b/nbxmpp/dispatcher.py
@@ -1,36 +1,32 @@
-## dispatcher.py
-##
-## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-
-"""
-Main xmpp decision making logic. Provides library with methods to assign
-different handlers to different XMPP stanzas and namespaces
-"""
-
-import sys
-import locale
-import re
-import uuid
+# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
import logging
-import inspect
+import re
+import time
from xml.parsers.expat import ExpatError
+from gi.repository import GLib
+
from nbxmpp.simplexml import NodeBuilder
-from nbxmpp.plugin import PlugIn
+from nbxmpp.simplexml import Node
from nbxmpp.protocol import NS_STREAMS
-from nbxmpp.protocol import NS_HTTP_BIND
+from nbxmpp.protocol import NS_CLIENT
+from nbxmpp.protocol import NS_XMPP_STREAMS
from nbxmpp.protocol import NodeProcessed
from nbxmpp.protocol import InvalidFrom
from nbxmpp.protocol import InvalidJid
@@ -39,8 +35,8 @@ from nbxmpp.protocol import Iq
from nbxmpp.protocol import Presence
from nbxmpp.protocol import Message
from nbxmpp.protocol import Protocol
-from nbxmpp.protocol import Node
from nbxmpp.protocol import Error
+from nbxmpp.protocol import StreamErrorNode
from nbxmpp.protocol import ERR_FEATURE_NOT_IMPLEMENTED
from nbxmpp.modules.eme import EME
from nbxmpp.modules.http_auth import HTTPAuth
@@ -82,479 +78,288 @@ from nbxmpp.modules.register import Register
from nbxmpp.modules.http_upload import HTTPUpload
from nbxmpp.modules.misc import unwrap_carbon
from nbxmpp.modules.misc import unwrap_mam
+from nbxmpp.structs import StanzaTimeoutError
from nbxmpp.util import get_properties_struct
+from nbxmpp.util import get_invalid_xml_regex
+from nbxmpp.util import is_websocket_close
+from nbxmpp.util import is_websocket_stream_error
+from nbxmpp.util import Observable
log = logging.getLogger('nbxmpp.dispatcher')
-#: default timeout to wait for response for our id
-DEFAULT_TIMEOUT_SECONDS = 25
-XML_DECLARATION = '<?xml version=\'1.0\'?>'
-
-# FIXME: ugly
-class Dispatcher:
+class StanzaDispatcher(Observable):
"""
- Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
- was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
- is that reference used to access dispatcher instance is in Client attribute
- named by __class__.__name__ of the dispatcher instance .. long story short:
+ Dispatches stanzas to handlers
- I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp
+ Signals:
+ before-dispatch
+ parsing-error
+ stream-end
- If having two kinds of dispatcher will go well, I will rewrite the dispatcher
- references in other scripts
"""
- def PlugIn(self, client_obj, after_SASL=False, old_features=None):
- if client_obj.protocol_type == 'XMPP':
- XMPPDispatcher().PlugIn(client_obj)
- elif client_obj.protocol_type == 'BOSH':
- BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features)
- else:
- assert False # should never be reached
+ def __init__(self, client):
+ Observable.__init__(self, log)
+ self._client = client
+ self._modules = {}
+ self._parser = None
+ self._websocket_stream_error = None
- @classmethod
- def get_instance(cls, *args, **kwargs):
- """
- Factory Method for object creation
+ self._handlers = {}
- Use this instead of directly initializing the class in order to make
- unit testing much easier.
- """
- return cls(*args, **kwargs)
+ self._id_callbacks = {}
+ self._dispatch_callback = None
+ self._timeout_id = None
+ self._stanza_types = {
+ 'iq': Iq,
+ 'message': Message,
+ 'presence': Presence,
+ 'error': StreamErrorNode,
+ }
-class XMPPDispatcher(PlugIn):
- """
- Handles XMPP stream and is the first who takes control over a fresh stanza
+ self.invalid_chars_re = get_invalid_xml_regex()
- Is plugged into NonBlockingClient but can be replugged to restart handled
- stream headers (used by SASL f.e.).
- """
+ self._register_namespace('unknown')
+ self._register_namespace(NS_STREAMS)
+ self._register_namespace(NS_CLIENT)
+ self._register_protocol('iq', Iq)
+ self._register_protocol('presence', Presence)
+ self._register_protocol('message', Message)
- def __init__(self):
- PlugIn.__init__(self)
- self.handlers = {}
- self._modules = {}
- self._expected = {}
- self._defaultHandler = None
- self._pendingExceptions = []
- self._eventHandler = None
- self._cycleHandlers = []
- self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler,
- self.RegisterEventHandler, self.UnregisterCycleHandler,
- self.RegisterCycleHandler, self.RegisterHandlerOnce,
- self.UnregisterHandler, self.RegisterProtocol,
- self.SendAndCallForResponse,
- self.getAnID, self.Event, self.send, self.get_module]
-
- # \ufddo -> \ufdef range
- c = '\ufdd0'
- r = c
- while c < '\ufdef':
- c = chr(ord(c) + 1)
- r += '|' + c
-
- # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff
- c = '\ufffe'
- r += '|' + c
- r += '|' + chr(ord(c) + 1)
- while c < '\U0010fffe':
- c = chr(ord(c) + 0x10000)
- r += '|' + c
- r += '|' + chr(ord(c) + 1)
-
- self.invalid_chars_re = re.compile(r)
-
- def getAnID(self):
- return str(uuid.uuid4())
-
- def dumpHandlers(self):
- """
- Return set of user-registered callbacks in it's internal format. Used
- within the library to carry user handlers set over Dispatcher replugins
- """
- return self.handlers
+ self._register_modules()
- def restoreHandlers(self, handlers):
- """
- Restore user-registered callbacks structure from dump previously obtained
- via dumpHandlers. Used within the library to carry user handlers set over
- Dispatcher replugins.
- """
- self.handlers = handlers
+ def set_dispatch_callback(self, callback):
+ log.info('Set dispatch callback: %s', callback)
+ self._dispatch_callback = callback
def get_module(self, name):
return self._modules[name]
def _register_modules(self):
- self._modules['BasePresence'] = BasePresence(self._owner)
- self._modules['BaseMessage'] = BaseMessage(self._owner)
- self._modules['BaseIq'] = BaseIq(self._owner)
- self._modules['EME'] = EME(self._owner)
- self._modules['HTTPAuth'] = HTTPAuth(self._owner)
- self._modules['Nickname'] = Nickname(self._owner)
- self._modules['MUC'] = MUC(self._owner)
- self._modules['Delay'] = Delay(self._owner)
- self._modules['Captcha'] = Captcha(self._owner)
- self._modules['Idle'] = Idle(self._owner)
- self._modules['PGPLegacy'] = PGPLegacy(self._owner)
- self._modules['VCardAvatar'] = VCardAvatar(self._owner)
- self._modules['EntityCaps'] = EntityCaps(self._owner)
- self._modules['Blocking'] = Blocking(self._owner)
- self._modules['PubSub'] = PubSub(self._owner)
- self._modules['Mood'] = Mood(self._owner)
- self._modules['Activity'] = Activity(self._owner)
- self._modules['Tune'] = Tune(self._owner)
- self._modules['Location'] = Location(self._owner)
- self._modules['UserAvatar'] = UserAvatar(self._owner)
- self._modules['Bookmarks'] = Bookmarks(self._owner)
- self._modules['OpenPGP'] = OpenPGP(self._owner)
- self._modules['OMEMO'] = OMEMO(self._owner)
- self._modules['Annotations'] = Annotations(self._owner)
- self._modules['Muclumbus'] = Muclumbus(self._owner)
- self._modules['SoftwareVersion'] = SoftwareVersion(self._owner)
- self._modules['AdHoc'] = AdHoc(self._owner)
- self._modules['IBB'] = IBB(self._owner)
- self._modules['Discovery'] = Discovery(self._owner)
- self._modules['ChatMarkers'] = ChatMarkers(self._owner)
- self._modules['Receipts'] = Receipts(self._owner)
- self._modules['OOB'] = OOB(self._owner)
- self._modules['Correction'] = Correction(self._owner)
- self._modules['Attention'] = Attention(self._owner)
- self._modules['SecurityLabels'] = SecurityLabels(self._owner)
- self._modules['Chatstates'] = Chatstates(self._owner)
- self._modules['Register'] = Register(self._owner)
- self._modules['HTTPUpload'] = HTTPUpload(self._owner)
+ self._modules['BasePresence'] = BasePresence(self._client)
+ self._modules['BaseMessage'] = BaseMessage(self._client)
+ self._modules['BaseIq'] = BaseIq(self._client)
+ self._modules['EME'] = EME(self._client)
+ self._modules['HTTPAuth'] = HTTPAuth(self._client)
+ self._modules['Nickname'] = Nickname(self._client)
+ self._modules['MUC'] = MUC(self._client)
+ self._modules['Delay'] = Delay(self._client)
+ self._modules['Captcha'] = Captcha(self._client)
+ self._modules['Idle'] = Idle(self._client)
+ self._modules['PGPLegacy'] = PGPLegacy(self._client)
+ self._modules['VCardAvatar'] = VCardAvatar(self._client)
+ self._modules['EntityCaps'] = EntityCaps(self._client)
+ self._modules['Blocking'] = Blocking(self._client)
+ self._modules['PubSub'] = PubSub(self._client)
+ self._modules['Mood'] = Mood(self._client)
+ self._modules['Activity'] = Activity(self._client)
+ self._modules['Tune'] = Tune(self._client)
+ self._modules['Location'] = Location(self._client)
+ self._modules['UserAvatar'] = UserAvatar(self._client)
+ self._modules['Bookmarks'] = Bookmarks(self._client)
+ self._modules['OpenPGP'] = OpenPGP(self._client)
+ self._modules['OMEMO'] = OMEMO(self._client)
+ self._modules['Annotations'] = Annotations(self._client)
+ self._modules['Muclumbus'] = Muclumbus(self._client)
+ self._modules['SoftwareVersion'] = SoftwareVersion(self._client)
+ self._modules['AdHoc'] = AdHoc(self._client)
+ self._modules['IBB'] = IBB(self._client)
+ self._modules['Discovery'] = Discovery(self._client)
+ self._modules['ChatMarkers'] = ChatMarkers(self._client)
+ self._modules['Receipts'] = Receipts(self._client)
+ self._modules['OOB'] = OOB(self._client)
+ self._modules['Correction'] = Correction(self._client)
+ self._modules['Attention'] = Attention(self._client)
+ self._modules['SecurityLabels'] = SecurityLabels(self._client)
+ self._modules['Chatstates'] = Chatstates(self._client)
+ self._modules['Register'] = Register(self._client)
+ self._modules['HTTPUpload'] = HTTPUpload(self._client)
for instance in self._modules.values():
for handler in instance.handlers:
- self.RegisterHandler(*handler)
+ self.register_handler(*handler)
- def _init(self):
- """
- Register default namespaces/protocols/handlers. Used internally
- """
- # FIXME: inject dependencies, do not rely that they are defined by our
- # owner
- self.RegisterNamespace('unknown')
- self.RegisterNamespace(NS_STREAMS)
- self.RegisterNamespace(self._owner.defaultNamespace)
- self.RegisterProtocol('iq', Iq)
- self.RegisterProtocol('presence', Presence)
- self.RegisterProtocol('message', Message)
- self.RegisterDefaultHandler(self.returnStanzaHandler)
- self.RegisterEventHandler(self._owner._caller._event_dispatcher)
- self._register_modules()
+ def reset_parser(self):
+ self._remove_timeout_source()
+ if self._parser is not None:
+ self._parser.dispatch = None
+ self._parser.destroy()
+ self._parser = None
- def plugin(self, owner):
- """
- Plug the Dispatcher instance into Client class instance and send initial
- stream header. Used internally
- """
- self._init()
- self._owner.lastErrNode = None
- self._owner.lastErr = None
- self._owner.lastErrCode = None
- if hasattr(self._owner, 'StreamInit'):
- self._owner.StreamInit()
- else:
- self.StreamInit()
-
- def plugout(self):
- """
- Prepare instance to be destructed
- """
- self._modules = {}
- self.Stream.dispatch = None
- self.Stream.features = None
- self.Stream.destroy()
- self._owner = None
- self.Stream = None
+ self._id_callbacks.clear()
- def StreamInit(self):
- """
- Send an initial stream header
- """
- self._owner.Connection.sendqueue = []
- self.Stream = NodeBuilder()
- self.Stream.dispatch = self.dispatch
- self.Stream._dispatch_depth = 2
- self.Stream.stream_header_received = self._check_stream_start
- self.Stream.features = None
- self._metastream = Node('stream:stream')
- self._metastream.setNamespace(self._owner.Namespace)
- self._metastream.setAttr('version', '1.0')
- self._metastream.setAttr('xmlns:stream', NS_STREAMS)
- self._metastream.setAttr('to', self._owner.Server)
- self._metastream.setAttr('xml:lang', self._owner.lang)
- self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2]))
-
- def _check_stream_start(self, ns, tag, attrs):
- if ns != NS_STREAMS or tag!='stream':
- raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
- % (tag, ns))
+ self._parser = NodeBuilder(dispatch_depth=2,
+ finished=False)
+ self._parser.dispatch = self.dispatch
def replace_non_character(self, data):
return re.sub(self.invalid_chars_re, '\ufffd', data)
- def ProcessNonBlocking(self, data):
- """
- Check incoming stream for data waiting
+ def process_data(self, data):
+ # Parse incoming data
- :param data: data received from transports/IO sockets
- :return:
- 1) length of processed data if some data were processed;
- 2) '0' string if no data were processed but link is alive;
- 3) 0 (zero) if underlying connection is closed.
- """
- # FIXME:
- # When an error occurs we disconnect the transport directly. Client's
- # disconnect method will never be called.
- # Is this intended?
- # also look at transports start_disconnect()
data = self.replace_non_character(data)
- for handler in self._cycleHandlers:
- handler(self)
- if len(self._pendingExceptions) > 0:
- _pendingException = self._pendingExceptions.pop()
- sys.excepthook(*_pendingException)
+
+ if self._client.is_websocket:
+ stanza = Node(node=data)
+ if is_websocket_stream_error(stanza):
+ for tag in stanza.getChildren():
+ name = tag.getName()
+ if name != 'text' and tag.getNamespace() == NS_XMPP_STREAMS:
+ self._websocket_stream_error = name
+
+ elif is_websocket_close(stanza):
+ log.info('Stream <close> received')
+ self.notify('stream-end', self._websocket_stream_error)
+ return
+
+ self.dispatch(stanza)
return
+
try:
- self.Stream.Parse(data)
- # end stream:stream tag received
- if self.Stream and self.Stream.has_received_endtag():
- self._owner.disconnect(self.Stream.streamError)
- return 0
- except ExpatError as error:
- log.error('Invalid XML received from server. Forcing disconnect.')
- log.error(error)
- self._owner.Connection.disconnect()
- return 0
- except ValueError as e:
- log.debug('ValueError: %s' % str(e))
- self._owner.Connection.pollend()
- return 0
- if len(self._pendingExceptions) > 0:
- _pendingException = self._pendingExceptions.pop()
- sys.excepthook(*_pendingException)
+ self._parser.Parse(data)
+ except (ExpatError, ValueError) as error:
+ log.error('XML parsing error: %s', error)
+ self.notify('parsing-error', error)
return
- if len(data) == 0:
- return '0'
- return len(data)
- def RegisterNamespace(self, xmlns, order='info'):
- """
- Create internal structures for newly registered namespace
-
- You can register handlers for this namespace afterwards. By default
- one namespace is already registered
- (jabber:client or jabber:component:accept depending on context.
- """
- log.debug('Registering namespace "%s"' % xmlns)
- self.handlers[xmlns] = {}
- self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
- self.RegisterProtocol('default', Protocol, xmlns=xmlns)
+ # end stream:stream tag received
+ if self._parser.has_received_endtag():
+ log.info('End of stream: %s', self._parser.streamError)
+ self.notify('stream-end', self._parser.streamError)
+ return
- def RegisterProtocol(self, tag_name, proto, xmlns=None, order='info'):
+ def _register_namespace(self, xmlns):
"""
- Used to declare some top-level stanza name to dispatcher
-
- Needed to start registering handlers for such stanzas. Iq, message and
- presence protocols are registered by default.
+ Setup handler structure for namespace
"""
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- log.debug('Registering protocol "%s" as %s(%s)', tag_name, proto, xmlns)
- self.handlers[xmlns][tag_name] = {'type': proto, 'default': []}
+ log.debug('Register namespace "%s"', xmlns)
+ self._handlers[xmlns] = {}
+ self._register_protocol('error', Protocol, xmlns=xmlns)
+ self._register_protocol('unknown', Protocol, xmlns=xmlns)
+ self._register_protocol('default', Protocol, xmlns=xmlns)
- def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', system=0):
+ def _register_protocol(self, tag_name, protocol, xmlns=None):
"""
- Register handler for processing all stanzas for specified namespace
+ Register protocol for top level tag names
"""
- self.RegisterHandler('default', handler, typ, ns, xmlns, system)
+ if xmlns is None:
+ xmlns = NS_CLIENT
+ log.debug('Register protocol "%s (%s)" as %s',
+ tag_name, xmlns, protocol)
+ self._handlers[xmlns][tag_name] = {'type': protocol, 'default': []}
- def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None,
- system=False, priority=50):
+ def register_handler(self, name, handler, typ='', ns='',
+ xmlns=None, priority=50):
"""
- Register user callback as stanzas handler of declared type
-
- Callback arguments:
- dispatcher instance (for replying), incoming return of previous handlers.
- The callback must raise xmpp.NodeProcessed just before return if it wants
- to prevent other callbacks to be called with the same stanza as argument
- _and_, more importantly library from returning stanza to sender with error set.
-
- :param name: name of stanza. F.e. "iq".
- :param handler: user callback.
- :param typ: value of stanza's "type" attribute. If not specified any
- value will match
- :param ns: namespace of child that stanza must contain.
- :param xmlns: xml namespace
- :param system: call handler even if NodeProcessed Exception were raised
- already.
+ Register handler
+
+ :param name: name of top level tag, example: iq, message, presence
+ :param handler: callback
+ :param typ: value of stanza's "type" attribute.
+ If not specified any value will match
+ :param ns: Namespace of child that stanza must contain
+ :param xmlns: XML namespace, only needed if not jabber:client
:param priority: The priority of the handler, higher get called later
"""
+
if not xmlns:
- xmlns = self._owner.defaultNamespace
+ xmlns = NS_CLIENT
if not typ and not ns:
typ = 'default'
- log.debug(
- 'Registering handler %s for "%s" type->%s ns->%s(%s) priority->%s',
- handler, name, typ, ns, xmlns, priority)
+ log.debug('Register handler %s for "%s" type->%s ns->%s(%s) priority->%s',
+ handler, name, typ, ns, xmlns, priority)
- if xmlns not in self.handlers:
- self.RegisterNamespace(xmlns, 'warn')
- if name not in self.handlers[xmlns]:
- self.RegisterProtocol(name, Protocol, xmlns, 'warn')
+ if xmlns not in self._handlers:
+ self._register_namespace(xmlns)
+ if name not in self._handlers[xmlns]:
+ self._register_protocol(name, Protocol, xmlns)
specific = typ + ns
- if specific not in self.handlers[xmlns][name]:
- self.handlers[xmlns][name][specific] = []
+ if specific not in self._handlers[xmlns][name]:
+ self._handlers[xmlns][name][specific] = []
- self.handlers[xmlns][name][specific].append(
+ self._handlers[xmlns][name][specific].append(
{'func': handler,
- 'system': system,
'priority': priority,
'specific': specific})
- def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None,
- system=0):
+ def unregister_handler(self, name, handler, typ='', ns='', xmlns=None):
"""
- Unregister handler after first call (not implemented yet)
+ Unregister handler
"""
- # FIXME Drop or implement
- if not xmlns:
- xmlns = self._owner.defaultNamespace
- self.RegisterHandler(name, handler, typ, ns, xmlns, system)
- def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
- """
- Unregister handler. "typ" and "ns" must be specified exactly the same as
- with registering.
- """
if not xmlns:
- xmlns = self._owner.defaultNamespace
+ xmlns = NS_CLIENT
+
if not typ and not ns:
typ = 'default'
- if xmlns not in self.handlers:
- return
- if name not in self.handlers[xmlns]:
- return
specific = typ + ns
- if specific not in self.handlers[xmlns][name]:
+ try:
+ self._handlers[xmlns][name][specific]
+ except KeyError:
return
- for handler_dict in self.handlers[xmlns][name][specific]:
- if handler_dict['func'] == handler:
- try:
- self.handlers[xmlns][name][specific].remove(handler_dict)
- log.debug(
- 'Unregister handler %s for "%s" type->%s ns->%s(%s)',
- handler, name, typ, ns, xmlns)
- except ValueError:
- log.warning(
- 'Unregister failed: %s for "%s" type->%s ns->%s(%s)',
- handler, name, typ, ns, xmlns)
- pass
-
- def RegisterDefaultHandler(self, handler):
- """
- Specify the handler that will be used if no NodeProcessed exception were
- raised. This is returnStanzaHandler by default.
- """
- self._defaultHandler = handler
- def RegisterEventHandler(self, handler):
- """
- Register handler that will process events. F.e. "FILERECEIVED" event. See
- common/connection: _event_dispatcher()
- """
- self._eventHandler = handler
+ for handler_dict in self._handlers[xmlns][name][specific]:
+ if handler_dict['func'] != handler:
+ return
- def returnStanzaHandler(self, conn, stanza):
+ try:
+ self._handlers[xmlns][name][specific].remove(handler_dict)
+ except ValueError:
+ log.warning('Unregister failed: %s for "%s" type->%s ns->%s(%s)',
+ handler, name, typ, ns, xmlns)
+ else:
+ log.debug('Unregister handler %s for "%s" type->%s ns->%s(%s)',
+ handler, name, typ, ns, xmlns)
+
+ def _default_handler(self, stanza):
"""
Return stanza back to the sender with <feature-not-implemented/> error
- set
"""
if stanza.getType() in ('get', 'set'):
- conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
-
- def RegisterCycleHandler(self, handler):
- """
- Register handler that will be called on every Dispatcher.Process() call
- """
- if handler not in self._cycleHandlers:
- self._cycleHandlers.append(handler)
-
- def UnregisterCycleHandler(self, handler):
- """
- Unregister handler that will be called on every Dispatcher.Process() call
- """
- if handler in self._cycleHandlers:
- self._cycleHandlers.remove(handler)
-
- def Event(self, realm, event, data=None):
- """
- Raise some event
-
- :param realm: scope of event. Usually a namespace.
- :param event: the event itself. F.e. "SUCCESSFUL SEND".
- :param data: data that comes along with event. Depends on event.
- """
- if self._eventHandler:
- self._eventHandler(realm, event, data)
- else:
- log.warning('Received unhandled event: %s' % event)
+ self._client.send_stanza(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
def dispatch(self, stanza):
- """
- Main procedure that performs XMPP stanza recognition and calling
- apppropriate handlers for it. Called by simplexml
- """
-
- self.Event('', 'STANZA RECEIVED', stanza)
-
- self.Stream._mini_dom = None
+ self.notify('before-dispatch', stanza)
+
+ if self._dispatch_callback is not None:
+ name = stanza.getName()
+ protocol_class = self._stanza_types.get(name)
+ if protocol_class is not None:
+ stanza = protocol_class(node=stanza)
+ self._dispatch_callback(stanza)
+ return
# Count stanza
- self._owner.Smacks.count_incoming(stanza.getName())
+ self._client._smacks.count_incoming(stanza.getName())
name = stanza.getName()
- if name == 'features':
- self._owner.got_features = True
- self.Stream.features = stanza
- elif name == 'error':
- if stanza.getTag('see-other-host'):
- self._owner.got_see_other_host = stanza
-
xmlns = stanza.getNamespace()
- if xmlns not in self.handlers:
+ if xmlns not in self._handlers:
log.warning('Unknown namespace: %s', xmlns)
xmlns = 'unknown'
- # features stanza has been handled before
- if name not in self.handlers[xmlns]:
- if name not in ('features', 'stream'):
- log.warning('Unknown stanza: %s', stanza)
- else:
- log.debug('Got %s / %s stanza', xmlns, name)
+
+ if name not in self._handlers[xmlns]:
+ log.warning('Unknown stanza: %s', stanza)
name = 'unknown'
- else:
- log.debug('Got %s / %s stanza', xmlns, name)
# Convert simplexml to Protocol object
try:
- stanza = self.handlers[xmlns][name]['type'](node=stanza)
+ stanza = self._handlers[xmlns][name]['type'](node=stanza)
except InvalidJid:
log.warning('Invalid JID, ignoring stanza')
log.warning(stanza)
return
- own_jid = self._owner.get_bound_jid()
+ own_jid = self._client.get_bound_jid()
properties = get_properties_struct(name)
if name == 'iq':
@@ -605,154 +410,91 @@ class XMPPDispatcher(PlugIn):
stanza.props = stanza.getProperties()
log.debug('type: %s, properties: %s', typ, stanza.props)
+ # Process callbacks
_id = stanza.getID()
- processed = False
- if _id in self._expected:
- cb, args = self._expected[_id]
- log.debug('Expected stanza arrived. Callback %s(%s) found',
- cb, args)
+ func, _timeout, user_data = self._id_callbacks.pop(
+ _id, (None, None, {}))
+ if user_data is None:
+ user_data = {}
+
+ if func is not None:
try:
- if args is None:
- cb(self, stanza)
- else:
- cb(self, stanza, **args)
- except NodeProcessed:
- pass
+ func(self._client, stanza, **user_data)
+ except Exception:
+ log.exception('Error while handling stanza')
return
# Gather specifics depending on stanza properties
specifics = ['default']
- if typ and typ in self.handlers[xmlns][name]:
+ if typ and typ in self._handlers[xmlns][name]:
specifics.append(typ)
for prop in stanza.props:
- if prop in self.handlers[xmlns][name]:
+ if prop in self._handlers[xmlns][name]:
specifics.append(prop)
- if typ and typ + prop in self.handlers[xmlns][name]:
+ if typ and typ + prop in self._handlers[xmlns][name]:
specifics.append(typ + prop)
# Create the handler chain
chain = []
- chain += self.handlers[xmlns]['default']['default']
+ chain += self._handlers[xmlns]['default']['default']
for specific in specifics:
- chain += self.handlers[xmlns][name][specific]
+ chain += self._handlers[xmlns][name][specific]
# Sort chain with priority
chain.sort(key=lambda x: x['priority'])
for handler in chain:
- if not processed or handler['system']:
- try:
- log.info('Call handler: %s', handler['func'].__qualname__)
- # Backwards compatibility until all handlers support
- # properties
- signature = inspect.signature(handler['func'])
- if len(signature.parameters) > 2:
- handler['func'](self, stanza, properties)
- else:
- handler['func'](self, stanza)
- except NodeProcessed:
- processed = True
- except Exception:
- self._pendingExceptions.insert(0, sys.exc_info())
- return
+ log.info('Call handler: %s', handler['func'].__qualname__)
+ try:
+ handler['func'](self._client, stanza, properties)
+ except NodeProcessed:
+ return
+ except Exception:
+ log.exception('Handler exception:')
+ return
# Stanza was not processed call default handler
- if not processed and self._defaultHandler:
- self._defaultHandler(self, stanza)
-
- def SendAndCallForResponse(self, stanza, func=None, args=None):
- """
- Put stanza on the wire and call back when recipient replies. Additional
- callback arguments can be specified in args
- """
- _waitid = self.send(stanza)
- self._expected[_waitid] = (func, args)
- return _waitid
-
- def send(self, stanza, now=False):
- """
- Wrap transports send method when plugged into NonBlockingClient. Makes
- sure stanzas get ID and from tag.
- """
- ID = None
- if type(stanza) != str:
- if isinstance(stanza, Protocol):
- ID = stanza.getID()
- if ID is None:
- stanza.setID(self.getAnID())
- ID = stanza.getID()
- if self._owner._registered_name and not stanza.getAttr('from'):
- stanza.setAttr('from', self._owner._registered_name)
-
- self._owner.Connection.send(stanza, now)
-
- # If no ID then it is a whitespace
- if hasattr(self._owner, 'Smacks') and ID:
- self._owner.Smacks.save_in_queue(stanza)
-
- return ID
-
-
-class BOSHDispatcher(XMPPDispatcher):
-
- def PlugIn(self, owner, after_SASL=False, old_features=None):
- self.old_features = old_features
- self.after_SASL = after_SASL
- XMPPDispatcher.PlugIn(self, owner)
-
- def StreamInit(self):
- """
- Send an initial stream header
- """
- self.Stream = NodeBuilder()
- self.Stream.dispatch = self.dispatch
- self.Stream._dispatch_depth = 2
- self.Stream.stream_header_received = self._check_stream_start
- self.Stream.features = self.old_features
-
- self._metastream = Node('stream:stream')
- self._metastream.setNamespace(self._owner.Namespace)
- self._metastream.setAttr('version', '1.0')
- self._metastream.setAttr('xmlns:stream', NS_STREAMS)
- self._metastream.setAttr('to', self._owner.Server)
- self._metastream.setAttr('xml:lang', self._owner.lang)
-
- self.restart = True
- self._owner.Connection.send_init(after_SASL=self.after_SASL)
-
- def StreamTerminate(self):
- """
- Send a stream terminator
- """
- self._owner.Connection.send_terminator()
-
- def ProcessNonBlocking(self, data=None):
- if self.restart:
- fromstream = self._metastream
- fromstream.setAttr('from', fromstream.getAttr('to'))
- fromstream.delAttr('to')
- data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data)
- self.restart = False
- return XMPPDispatcher.ProcessNonBlocking(self, data)
-
- def dispatch(self, stanza):
- if stanza.getName() == 'body' and stanza.getNamespace() == NS_HTTP_BIND:
-
- stanza_attrs = stanza.getAttrs()
- if 'authid' in stanza_attrs:
- # should be only in init response
- # auth module expects id of stream in document attributes
- self.Stream._document_attrs['id'] = stanza_attrs['authid']
- self._owner.Connection.handle_body_attrs(stanza_attrs)
-
- children = stanza.getChildren()
- if children:
- for child in children:
- # if child doesn't have any ns specified, simplexml (or expat)
- # thinks it's of parent's (BOSH body) namespace, so we have to
- # rewrite it to jabber:client
- if child.getNamespace() == NS_HTTP_BIND:
- child.setNamespace(self._owner.defaultNamespace)
- XMPPDispatcher.dispatch(self, child)
- else:
- XMPPDispatcher.dispatch(self, stanza)
+ self._default_handler(stanza)
+
+ def add_callback_for_id(self, id_, func, timeout, user_data):
+ if timeout is not None and self._timeout_id is None:
+ log.info('Add timeout source')
+ self._timeout_id = GLib.timeout_add_seconds(
+ 1, self._timeout_check)
+ timeout = time.monotonic() + timeout
+ self._id_callbacks[id_] = (func, timeout, user_data)
+
+ def _timeout_check(self):
+ log.info('Run timeout check')
+ if not self._id_callbacks:
+ log.info('Remove timeout source, no callbacks scheduled')
+ self._timeout_id = None
+ return False
+
+ for id_ in list(self._id_callbacks.keys()):
+ func, timeout, user_data = self._id_callbacks.get(id_)
+ if timeout is None:
+ continue
+
+ if user_data is None:
+ user_data = {}
+
+ if timeout < time.monotonic():
+ self._id_callbacks.pop(id_)
+ func(self._client, StanzaTimeoutError(id_), **user_data)
+ return True
+
+ def _remove_timeout_source(self):
+ if self._timeout_id is not None:
+ GLib.source_remove(self._timeout_id)
+ self._timeout_id = None
+
+ def cleanup(self):
+ self._client = None
+ self._modules = {}
+ self._parser = None
+ self._id_callbacks.clear()
+ self._dispatch_callback = None
+ self._handlers.clear()
+ self._remove_timeout_source()
+ self.remove_subscriptions()
diff --git a/nbxmpp/examples/client.py b/nbxmpp/examples/client.py
new file mode 100644
index 0000000..82f6f5c
--- /dev/null
+++ b/nbxmpp/examples/client.py
@@ -0,0 +1,281 @@
+#!/usr/bin/python3
+
+import os
+import logging
+import json
+from pathlib import Path
+
+import gi
+gi.require_version('Gtk', '3.0')
+gi.require_version('GLib', '2.0')
+from gi.repository import Gtk
+from gi.repository import GLib
+
+import nbxmpp
+from nbxmpp.protocol import JID
+from nbxmpp.client import Client
+from nbxmpp.structs import ProxyData
+from nbxmpp.addresses import ServerAddress
+from nbxmpp.const import ConnectionType
+from nbxmpp.const import ConnectionProtocol
+from nbxmpp.const import StreamError
+
+consoleloghandler = logging.StreamHandler()
+log = logging.getLogger('nbxmpp')
+log.setLevel('INFO')
+log.addHandler(consoleloghandler)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-7s %(name)-18s %(message)s',
+ datefmt='%H:%M:%S')
+consoleloghandler.setFormatter(formatter)
+
+
+class Builder:
+ def __init__(self, filename):
+ file_path = Path(__file__).resolve()
+ ui_file_path = file_path.parent / filename
+ self._builder = Gtk.Builder()
+ self._builder.add_from_file(str(ui_file_path))
+
+ def __getattr__(self, name):
+ try:
+ return getattr(self._builder, name)
+ except AttributeError:
+ return self._builder.get_object(name)
+
+
+class StanzaRow(Gtk.ListBoxRow):
+ def __init__(self, stanza, incoming):
+ Gtk.ListBoxRow.__init__(self)
+ color = 'red' if incoming else 'blue'
+ if isinstance(stanza, bytes):
+ stanza = str(stanza)
+ if not isinstance(stanza, str):
+ stanza = stanza.__str__(fancy=True)
+ stanza = GLib.markup_escape_text(stanza)
+ label = Gtk.Label()
+ label.set_markup('<span foreground="%s">%s</span>' % (color, stanza))
+ label.set_xalign(0)
+ label.set_halign(Gtk.Align.START)
+ self.add(label)
+ self.show_all()
+
+
+class TestClient(Gtk.Window):
+ def __init__(self):
+ Gtk.Window.__init__(self, title='Test Client')
+ self.set_default_size(500, 500)
+
+ self._builder = Builder('client.ui')
+ self._builder.connect_signals(self)
+
+ self.add(self._builder.grid)
+
+ self._client = None
+ self._scroll_timeout = None
+ self._create_paths()
+ self._load_config()
+
+ def _create_client(self):
+ self._client = Client()
+ self._client.set_domain(self.address.getDomain())
+ self._client.set_username(self.address.getNode())
+ self._client.set_resource('test')
+
+ proxy_ip = self._builder.proxy_ip.get_text()
+ if proxy_ip:
+ proxy_port = int(self._builder.proxy_port.get_text())
+ proxy_host = '%s:%s' % (proxy_ip, proxy_port)
+ proxy = ProxyData(self._builder.proxy_type.get_active_text().lower(),
+ proxy_host,
+ self._builder.proxy_username.get_text() or None,
+ self._builder.proxy_password.get_text() or None)
+ self._client.set_proxy(proxy)
+
+ self._client.set_connection_types(self._get_connection_types())
+ self._client.set_protocols(self._get_connection_protocols())
+
+ self._client.set_password(self.password)
+
+ self._client.subscribe('resume-failed', self._on_signal)
+ self._client.subscribe('resume-successful', self._on_signal)
+ self._client.subscribe('disconnected', self._on_signal)
+ self._client.subscribe('connection-lost', self._on_signal)
+ self._client.subscribe('connection-failed', self._on_signal)
+ self._client.subscribe('connected', self._on_connected)
+
+ self._client.subscribe('stanza-sent', self._on_stanza_sent)
+ self._client.subscribe('stanza-received', self._on_stanza_received)
+
+ self._client.register_handler('message', self._on_message)
+
+ @property
+ def password(self):
+ return self._builder.password.get_text()
+
+ @property
+ def address(self):
+ return JID(self._builder.address.get_text())
+
+ @property
+ def xml_box(self):
+ return self._builder.xml_box
+
+ def scroll_to_end(self):
+ adj_v = self._builder.scrolledwin.get_vadjustment()
+ if adj_v is None:
+ # This can happen when the Widget is already destroyed when called
+ # from GLib.idle_add
+ self._scroll_timeout = None
+ return
+ max_scroll_pos = adj_v.get_upper() - adj_v.get_page_size()
+ adj_v.set_value(max_scroll_pos)
+
+ adj_h = self._builder.scrolledwin.get_hadjustment()
+ adj_h.set_value(0)
+ self._scroll_timeout = None
+
+ def _on_signal(self, _client, signal_name, *args, **kwargs):
+ log.info('%s, Error: %s', signal_name, self._client.get_error())
+ if signal_name == 'disconnected':
+ if self._client.get_error() is None:
+ return
+ domain, error, text = self._client.get_error()
+ if domain == StreamError.BAD_CERTIFICATE:
+ self._client.set_ignore_tls_errors(True)
+ self._client.connect()
+
+ def _on_connected(self, _client, _signal_name):
+ self.send_presence()
+
+ def _on_message(self, _stream, stanza, _properties):
+ log.info('Message received')
+ log.info(stanza.getBody())
+
+ def _on_stanza_sent(self, _stream, _signal_name, data):
+ self.xml_box.add(StanzaRow(data, False))
+ self._add_scroll_timeout()
+
+ def _on_stanza_received(self, _stream, _signal_name, data):
+ self.xml_box.add(StanzaRow(data, True))
+ self._add_scroll_timeout()
+
+ def _add_scroll_timeout(self):
+ if self._scroll_timeout is not None:
+ return
+ self._scroll_timeout = GLib.timeout_add(50, self.scroll_to_end)
+
+ def _connect_clicked(self, *args):
+ if self._client is None:
+ self._create_client()
+
+ self._client.connect()
+
+ def _disconnect_clicked(self, *args):
+ if self._client is not None:
+ self._client.disconnect()
+
+ def _clear_clicked(self, *args):
+ self.xml_box.foreach(self._remove)
+
+ def _on_reconnect_clicked(self, *args):
+ if self._client is not None:
+ self._client.reconnect()
+
+ def _get_connection_types(self):
+ types = []
+ if self._builder.directtls.get_active():
+ types.append(ConnectionType.DIRECT_TLS)
+ if self._builder.starttls.get_active():
+ types.append(ConnectionType.START_TLS)
+ if self._builder.plain.get_active():
+ types.append(ConnectionType.PLAIN)
+ return types
+
+ def _get_connection_protocols(self):
+ protocols = []
+ if self._builder.tcp.get_active():
+ protocols.append(ConnectionProtocol.TCP)
+ if self._builder.websocket.get_active():
+ protocols.append(ConnectionProtocol.WEBSOCKET)
+ return protocols
+
+ def _on_save_clicked(self, *args):
+ data = {}
+ data['jid'] = self._builder.address.get_text()
+ data['password'] = self._builder.password.get_text()
+ data['proxy_type'] = self._builder.proxy_type.get_active_text()
+ data['proxy_ip'] = self._builder.proxy_ip.get_text()
+ data['proxy_port'] = self._builder.proxy_port.get_text()
+ data['proxy_username'] = self._builder.proxy_username.get_text()
+ data['proxy_password'] = self._builder.proxy_password.get_text()
+
+ data['directtls'] = self._builder.directtls.get_active()
+ data['starttls'] = self._builder.starttls.get_active()
+ data['plain'] = self._builder.plain.get_active()
+ data['tcp'] = self._builder.tcp.get_active()
+ data['websocket'] = self._builder.websocket.get_active()
+
+ path = self._get_config_dir() / 'config'
+ with path.open('w') as fp:
+ json.dump(data, fp)
+
+ def _load_config(self):
+ path = self._get_config_dir() / 'config'
+ if not path.exists():
+ return
+
+ with path.open('r') as fp:
+ data = json.load(fp)
+
+ self._builder.address.set_text(data.get('jid', ''))
+ self._builder.password.set_text(data.get('password', ''))
+ self._builder.proxy_type.set_active_id(data.get('proxy_type', 'HTTP'))
+ self._builder.proxy_ip.set_text(data.get('proxy_ip', ''))
+ self._builder.proxy_port.set_text(data.get('proxy_port', ''))
+ self._builder.proxy_username.set_text(data.get('proxy_username', ''))
+ self._builder.proxy_password.set_text(data.get('proxy_password', ''))
+
+ self._builder.directtls.set_active(data.get('directtls', False))
+ self._builder.starttls.set_active(data.get('starttls', False))
+ self._builder.plain.set_active(data.get('plain', False))
+ self._builder.tcp.set_active(data.get('tcp', False))
+ self._builder.websocket.set_active(data.get('websocket', False))
+
+ @staticmethod
+ def _get_config_dir():
+ if os.name == 'nt':
+ return Path(os.path.join(os.environ['appdata'], 'nbxmpp'))
+
+ expand = os.path.expanduser
+ base = os.getenv('XDG_CONFIG_HOME')
+ if base is None or base[0] != '/':
+ base = expand('~/.config')
+ return Path(os.path.join(base, 'nbxmpp'))
+
+ def _create_paths(self):
+ path_ = self._get_config_dir()
+ if not path_.exists():
+ for parent_path in reversed(path_.parents):
+ # Create all parent folders
+ # don't use mkdir(parent=True), as it ignores `mode`
+ # when creating the parents
+ if not parent_path.exists():
+ print('creating %s directory' % parent_path)
+ parent_path.mkdir(mode=0o700)
+ print('creating %s directory' % path_)
+ path_.mkdir(mode=0o700)
+
+ def _remove(self, item):
+ self.xml_box.remove(item)
+ item.destroy()
+
+ def send_presence(self):
+ presence = nbxmpp.Presence()
+ self._client.send_stanza(presence)
+
+
+win = TestClient()
+win.connect("delete-event", Gtk.main_quit)
+win.show_all()
+Gtk.main()
diff --git a/nbxmpp/examples/client.ui b/nbxmpp/examples/client.ui
new file mode 100644
index 0000000..6744feb
--- /dev/null
+++ b/nbxmpp/examples/client.ui
@@ -0,0 +1,469 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Generated with glade 3.22.1 -->
+<interface>
+ <requires lib="gtk+" version="3.20"/>
+ <object class="GtkGrid" id="grid">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="margin_left">24</property>
+ <property name="margin_right">24</property>
+ <property name="margin_top">24</property>
+ <property name="margin_bottom">24</property>
+ <property name="row_spacing">12</property>
+ <child>
+ <object class="GtkButtonBox">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="spacing">12</property>
+ <property name="layout_style">start</property>
+ <child>
+ <object class="GtkButton">
+ <property name="label" translatable="yes">Connect</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">True</property>
+ <signal name="clicked" handler="_connect_clicked" swapped="no"/>
+ <style>
+ <class name="suggested-action"/>
+ </style>
+ </object>
+ <packing>
+ <property name="expand">True</property>
+ <property name="fill">True</property>
+ <property name="position">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkButton">
+ <property name="label" translatable="yes">Disconnect</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">True</property>
+ <signal name="clicked" handler="_disconnect_clicked" swapped="no"/>
+ <style>
+ <class name="destructive-action"/>
+ </style>
+ </object>
+ <packing>
+ <property name="expand">True</property>
+ <property name="fill">True</property>
+ <property name="position">1</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkButton">
+ <property name="label" translatable="yes">Reconnect</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">True</property>
+ <signal name="clicked" handler="_on_reconnect_clicked" swapped="no"/>
+ </object>
+ <packing>
+ <property name="expand">True</property>
+ <property name="fill">True</property>
+ <property name="position">2</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkButton" id="clear">
+ <property name="label" translatable="yes">Clear</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">True</property>
+ <signal name="clicked" handler="_clear_clicked" swapped="no"/>
+ </object>
+ <packing>
+ <property name="expand">True</property>
+ <property name="fill">True</property>
+ <property name="position">3</property>
+ </packing>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">2</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkGrid">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="hexpand">True</property>
+ <property name="row_spacing">6</property>
+ <property name="column_spacing">12</property>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">XMPP-Address</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Password</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">1</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="address">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="valign">center</property>
+ <property name="hexpand">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="password">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="valign">center</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">1</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Proxy Host or IP</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">3</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Proxy Port</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">4</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Proxy Password</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">6</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Proxy Username</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">5</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="proxy_ip">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">3</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="proxy_username">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">5</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="proxy_port">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">4</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkEntry" id="proxy_password">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">6</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Save</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">7</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkButton">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">True</property>
+ <property name="tooltip_text" translatable="yes">Save</property>
+ <property name="halign">start</property>
+ <signal name="clicked" handler="_on_save_clicked" swapped="no"/>
+ <child>
+ <object class="GtkImage">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="icon_name">document-save-symbolic</property>
+ </object>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">7</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkFrame">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label_xalign">0</property>
+ <property name="shadow_type">none</property>
+ <child>
+ <object class="GtkAlignment">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="top_padding">12</property>
+ <property name="bottom_padding">12</property>
+ <property name="left_padding">12</property>
+ <child>
+ <object class="GtkGrid">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <child>
+ <object class="GtkCheckButton" id="directtls">
+ <property name="label" translatable="yes">DIRECT TLS</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">False</property>
+ <property name="halign">start</property>
+ <property name="draw_indicator">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkCheckButton" id="starttls">
+ <property name="label" translatable="yes">START TLS</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">False</property>
+ <property name="halign">start</property>
+ <property name="draw_indicator">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">1</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkCheckButton" id="plain">
+ <property name="label" translatable="yes">PLAIN</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">False</property>
+ <property name="halign">start</property>
+ <property name="draw_indicator">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">2</property>
+ </packing>
+ </child>
+ </object>
+ </child>
+ </object>
+ </child>
+ <child type="label">
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Connection Type</property>
+ </object>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">2</property>
+ <property name="top_attach">0</property>
+ <property name="height">3</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkFrame">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label_xalign">0</property>
+ <property name="shadow_type">none</property>
+ <child>
+ <object class="GtkAlignment">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="top_padding">12</property>
+ <property name="bottom_padding">12</property>
+ <property name="left_padding">12</property>
+ <child>
+ <object class="GtkGrid">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <child>
+ <object class="GtkCheckButton" id="tcp">
+ <property name="label" translatable="yes">TCP</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">False</property>
+ <property name="halign">start</property>
+ <property name="draw_indicator">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkCheckButton" id="websocket">
+ <property name="label" translatable="yes">WEBSOCKET</property>
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="receives_default">False</property>
+ <property name="halign">start</property>
+ <property name="draw_indicator">True</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">1</property>
+ </packing>
+ </child>
+ </object>
+ </child>
+ </object>
+ </child>
+ <child type="label">
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="label" translatable="yes">Connection Protocol</property>
+ </object>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">2</property>
+ <property name="top_attach">3</property>
+ <property name="height">3</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkLabel">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="halign">end</property>
+ <property name="label" translatable="yes">Proxy Type</property>
+ <property name="xalign">1</property>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">2</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkComboBoxText" id="proxy_type">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="halign">start</property>
+ <property name="active">0</property>
+ <items>
+ <item id="SOCKS5" translatable="yes">SOCKS5</item>
+ </items>
+ </object>
+ <packing>
+ <property name="left_attach">1</property>
+ <property name="top_attach">2</property>
+ </packing>
+ </child>
+ <child>
+ <placeholder/>
+ </child>
+ <child>
+ <placeholder/>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">0</property>
+ </packing>
+ </child>
+ <child>
+ <object class="GtkScrolledWindow" id="scrolledwin">
+ <property name="visible">True</property>
+ <property name="can_focus">True</property>
+ <property name="hexpand">True</property>
+ <property name="vexpand">True</property>
+ <property name="shadow_type">in</property>
+ <child>
+ <object class="GtkViewport">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <child>
+ <object class="GtkListBox" id="xml_box">
+ <property name="visible">True</property>
+ <property name="can_focus">False</property>
+ <property name="vexpand">True</property>
+ </object>
+ </child>
+ </object>
+ </child>
+ </object>
+ <packing>
+ <property name="left_attach">0</property>
+ <property name="top_attach">1</property>
+ </packing>
+ </child>
+ </object>
+</interface>
diff --git a/nbxmpp/exceptions.py b/nbxmpp/exceptions.py
new file mode 100644
index 0000000..473bf0e
--- /dev/null
+++ b/nbxmpp/exceptions.py
@@ -0,0 +1,24 @@
+# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+
+class EndOfConnection(Exception):
+ pass
+
+
+class NonFatalSSLError(Exception):
+ pass
diff --git a/nbxmpp/idlequeue.py b/nbxmpp/idlequeue.py
index d20ac46..0ab6f18 100644
--- a/nbxmpp/idlequeue.py
+++ b/nbxmpp/idlequeue.py
@@ -285,7 +285,7 @@ class IdleQueue:
"""
Remove the read timeout
"""
- log.info('read timeout removed for fd %s' % fd)
+ log.debug('read timeout removed for fd %s' % fd)
if fd in self.read_timeouts:
if timeout:
if timeout in self.read_timeouts[fd]:
@@ -302,7 +302,7 @@ class IdleQueue:
A filedescriptor fd can have several timeouts.
"""
- log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
+ log_txt = 'read timeout set for fd %s on %i seconds' % (fd, seconds)
if func:
log_txt += ' with function ' + str(func)
log.info(log_txt)
diff --git a/nbxmpp/modules/activity.py b/nbxmpp/modules/activity.py
index 7f48ba1..9e1f0b3 100644
--- a/nbxmpp/modules/activity.py
+++ b/nbxmpp/modules/activity.py
@@ -38,7 +38,7 @@ class Activity:
priority=16),
]
- def _process_pubsub_activity(self, _con, stanza, properties):
+ def _process_pubsub_activity(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/attention.py b/nbxmpp/modules/attention.py
index a0d2269..109bfde 100644
--- a/nbxmpp/modules/attention.py
+++ b/nbxmpp/modules/attention.py
@@ -34,7 +34,7 @@ class Attention:
priority=15),
]
- def _process_message_attention(self, _con, stanza, properties):
+ def _process_message_attention(self, _client, stanza, properties):
attention = stanza.getTag('attention', namespace=NS_ATTENTION)
if attention is None:
return
diff --git a/nbxmpp/modules/bookmarks.py b/nbxmpp/modules/bookmarks.py
index 85ca237..5b6d8c3 100644
--- a/nbxmpp/modules/bookmarks.py
+++ b/nbxmpp/modules/bookmarks.py
@@ -78,7 +78,7 @@ class Bookmarks:
self._node_configuration_in_progress = False
self._node_configuration_not_possible = False
- def _process_pubsub_bookmarks(self, _con, stanza, properties):
+ def _process_pubsub_bookmarks(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
@@ -108,7 +108,7 @@ class Bookmarks:
properties.pubsub_event = pubsub_event
- def _process_pubsub_bookmarks2(self, _con, _stanza, properties):
+ def _process_pubsub_bookmarks2(self, _client, _stanza, properties):
if not properties.is_pubsub_event:
return
@@ -415,6 +415,6 @@ class Bookmarks:
return Iq('set', NS_PRIVATE, payload=storage_node)
@staticmethod
- def _on_private_store_result(_con, stanza):
+ def _on_private_store_result(_client, stanza):
if not isResultNode(stanza):
return raise_error(log.info, stanza)
diff --git a/nbxmpp/modules/captcha.py b/nbxmpp/modules/captcha.py
index 52bfc77..8e4f90a 100644
--- a/nbxmpp/modules/captcha.py
+++ b/nbxmpp/modules/captcha.py
@@ -38,7 +38,7 @@ class Captcha:
]
@staticmethod
- def _process_captcha(_con, stanza, properties):
+ def _process_captcha(_client, stanza, properties):
captcha = stanza.getTag('captcha', namespace=NS_CAPTCHA)
if captcha is None:
return
diff --git a/nbxmpp/modules/chat_markers.py b/nbxmpp/modules/chat_markers.py
index 7d33dd6..eec8fa6 100644
--- a/nbxmpp/modules/chat_markers.py
+++ b/nbxmpp/modules/chat_markers.py
@@ -35,7 +35,7 @@ class ChatMarkers:
]
@staticmethod
- def _process_message_marker(_con, stanza, properties):
+ def _process_message_marker(_client, stanza, properties):
type_ = stanza.getTag('received', namespace=NS_CHATMARKERS)
if type_ is None:
type_ = stanza.getTag('displayed', namespace=NS_CHATMARKERS)
diff --git a/nbxmpp/modules/chatstates.py b/nbxmpp/modules/chatstates.py
index 8304310..8f9869f 100644
--- a/nbxmpp/modules/chatstates.py
+++ b/nbxmpp/modules/chatstates.py
@@ -35,7 +35,7 @@ class Chatstates:
priority=15),
]
- def _process_message_chatstate(self, _con, stanza, properties):
+ def _process_message_chatstate(self, _client, stanza, properties):
chatstate = parse_chatstate(stanza)
if chatstate is None:
return
diff --git a/nbxmpp/modules/correction.py b/nbxmpp/modules/correction.py
index d56bd29..68d6376 100644
--- a/nbxmpp/modules/correction.py
+++ b/nbxmpp/modules/correction.py
@@ -34,7 +34,7 @@ class Correction:
priority=15),
]
- def _process_message_correction(self, _con, stanza, properties):
+ def _process_message_correction(self, _client, stanza, properties):
replace = stanza.getTag('replace', namespace=NS_CORRECT)
if replace is None:
return
diff --git a/nbxmpp/modules/delay.py b/nbxmpp/modules/delay.py
index cf440d6..859ad1a 100644
--- a/nbxmpp/modules/delay.py
+++ b/nbxmpp/modules/delay.py
@@ -38,7 +38,7 @@ class Delay:
priority=15)
]
- def _process_message_delay(self, _con, stanza, properties):
+ def _process_message_delay(self, _client, stanza, properties):
if properties.is_muc_subject:
# MUC Subjects can have a delay timestamp
# to indicate when the user has set the subject,
@@ -65,7 +65,7 @@ class Delay:
properties.user_timestamp = parse_delay(stanza, not_from=jids)
@staticmethod
- def _process_presence_delay(_con, stanza, properties):
+ def _process_presence_delay(_client, stanza, properties):
properties.user_timestamp = parse_delay(stanza)
diff --git a/nbxmpp/modules/eme.py b/nbxmpp/modules/eme.py
index c585a2a..84b1d3e 100644
--- a/nbxmpp/modules/eme.py
+++ b/nbxmpp/modules/eme.py
@@ -35,7 +35,7 @@ class EME:
]
@staticmethod
- def _process_eme(_con, stanza, properties):
+ def _process_eme(_client, stanza, properties):
encryption = stanza.getTag('encryption', namespace=NS_EME)
if encryption is None:
return
diff --git a/nbxmpp/modules/entity_caps.py b/nbxmpp/modules/entity_caps.py
index 4eee6ab..dd04ee2 100644
--- a/nbxmpp/modules/entity_caps.py
+++ b/nbxmpp/modules/entity_caps.py
@@ -35,7 +35,7 @@ class EntityCaps:
]
@staticmethod
- def _process_entity_caps(_con, stanza, properties):
+ def _process_entity_caps(_client, stanza, properties):
caps = stanza.getTag('c', namespace=NS_CAPS)
if caps is None:
properties.entity_caps = EntityCapsData()
diff --git a/nbxmpp/modules/http_auth.py b/nbxmpp/modules/http_auth.py
index fb78c46..5488f77 100644
--- a/nbxmpp/modules/http_auth.py
+++ b/nbxmpp/modules/http_auth.py
@@ -40,7 +40,7 @@ class HTTPAuth:
]
@staticmethod
- def _process_http_auth(_con, stanza, properties):
+ def _process_http_auth(_client, stanza, properties):
confirm = stanza.getTag('confirm', namespace=NS_HTTP_AUTH)
if confirm is None:
return
diff --git a/nbxmpp/modules/ibb.py b/nbxmpp/modules/ibb.py
index e70721c..402f5eb 100644
--- a/nbxmpp/modules/ibb.py
+++ b/nbxmpp/modules/ibb.py
@@ -46,7 +46,7 @@ class IBB:
priority=20),
]
- def _process_ibb(self, _con, stanza, properties):
+ def _process_ibb(self, _client, stanza, properties):
if properties.type.is_set:
open_ = stanza.getTag('open', namespace=NS_IBB)
if open_ is not None:
@@ -70,23 +70,24 @@ class IBB:
except Exception as error:
log.warning(error)
log.warning(stanza)
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
if block_size > 65535:
log.warning('Invalid block-size')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
sid = attrs.get('sid')
if not sid:
log.warning('Invalid sid')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
type_ = attrs.get('stanza')
if type_ == 'message':
- self._client.send(ErrorStanza(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
+ self._client.send_stanza(ErrorStanza(stanza,
+ ERR_FEATURE_NOT_IMPLEMENTED))
raise NodeProcessed
return IBBData(type='open', block_size=block_size, sid=sid)
@@ -95,7 +96,7 @@ class IBB:
sid = close.getAttrs().get('sid')
if sid is None:
log.warning('Invalid sid')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
return IBBData(type='close', sid=sid)
@@ -105,21 +106,21 @@ class IBB:
sid = attrs.get('sid')
if sid is None:
log.warning('Invalid sid')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
try:
seq = int(attrs.get('seq'))
except Exception:
log.exception('Invalid seq')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
try:
decoded_data = b64decode(data.getData(), return_type=bytes)
except Exception:
log.exception('Failed to decode IBB data')
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
return IBBData(type='data', sid=sid, seq=seq, data=decoded_data)
@@ -130,7 +131,7 @@ class IBB:
reply.getChildren().clear()
else:
reply = ErrorStanza(stanza, error)
- self._client.send(reply)
+ self._client.send_stanza(reply)
@call_on_response('_default_response')
def send_open(self, jid, sid, block_size):
diff --git a/nbxmpp/modules/idle.py b/nbxmpp/modules/idle.py
index 5f5d6db..81870f2 100644
--- a/nbxmpp/modules/idle.py
+++ b/nbxmpp/modules/idle.py
@@ -35,7 +35,7 @@ class Idle:
]
@staticmethod
- def _process_idle(_con, stanza, properties):
+ def _process_idle(_client, stanza, properties):
idle_tag = stanza.getTag('idle', namespace=NS_IDLE)
if idle_tag is None:
return
diff --git a/nbxmpp/modules/iq.py b/nbxmpp/modules/iq.py
index e699c75..35b92ea 100644
--- a/nbxmpp/modules/iq.py
+++ b/nbxmpp/modules/iq.py
@@ -36,13 +36,13 @@ class BaseIq:
priority=10),
]
- def _process_iq_base(self, _con, stanza, properties):
+ def _process_iq_base(self, _client, stanza, properties):
try:
properties.type = IqType(stanza.getType())
except ValueError:
log.warning('Message with invalid type: %s', stanza.getType())
log.warning(stanza)
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
properties.jid = stanza.getFrom()
diff --git a/nbxmpp/modules/location.py b/nbxmpp/modules/location.py
index 5fe2252..88ac35d 100644
--- a/nbxmpp/modules/location.py
+++ b/nbxmpp/modules/location.py
@@ -37,7 +37,7 @@ class Location:
priority=16),
]
- def _process_pubsub_location(self, _con, _stanza, properties):
+ def _process_pubsub_location(self, _client, _stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/message.py b/nbxmpp/modules/message.py
index 8a708eb..23c4022 100644
--- a/nbxmpp/modules/message.py
+++ b/nbxmpp/modules/message.py
@@ -41,7 +41,7 @@ class BaseMessage:
priority=10),
]
- def _process_message_base(self, _con, stanza, properties):
+ def _process_message_base(self, _client, stanza, properties):
properties.type = self._parse_type(stanza)
# Determine remote JID
@@ -72,7 +72,7 @@ class BaseMessage:
properties.error = error_factory(stanza)
@staticmethod
- def _process_message_after_base(_con, stanza, properties):
+ def _process_message_after_base(_client, stanza, properties):
# This handler runs after decryption handlers had the chance
# to decrypt the body
properties.body = stanza.getBody()
diff --git a/nbxmpp/modules/mood.py b/nbxmpp/modules/mood.py
index 00cd755..7f02025 100644
--- a/nbxmpp/modules/mood.py
+++ b/nbxmpp/modules/mood.py
@@ -38,7 +38,7 @@ class Mood:
priority=16),
]
- def _process_pubsub_mood(self, _con, stanza, properties):
+ def _process_pubsub_mood(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/muc.py b/nbxmpp/modules/muc.py
index 6df191d..28c96e1 100644
--- a/nbxmpp/modules/muc.py
+++ b/nbxmpp/modules/muc.py
@@ -94,7 +94,7 @@ class MUC:
]
@staticmethod
- def _process_muc_presence(_con, stanza, properties):
+ def _process_muc_presence(_client, stanza, properties):
muc = stanza.getTag('x', namespace=NS_MUC)
if muc is None:
return
@@ -103,7 +103,7 @@ class MUC:
properties.muc_jid.setBare()
properties.muc_nickname = properties.jid.getResource()
- def _process_muc_user_presence(self, _con, stanza, properties):
+ def _process_muc_user_presence(self, _client, stanza, properties):
muc_user = stanza.getTag('x', namespace=NS_MUC_USER)
if muc_user is None:
return
@@ -162,7 +162,7 @@ class MUC:
properties.muc_user = self._parse_muc_user(muc_user)
- def _process_groupchat_message(self, _con, stanza, properties):
+ def _process_groupchat_message(self, _client, stanza, properties):
properties.from_muc = True
properties.muc_jid = properties.jid.copy()
properties.muc_jid.setBare()
@@ -179,7 +179,7 @@ class MUC:
properties.muc_ofrom = JID(address.getAttr('jid'))
@staticmethod
- def _process_message(_con, stanza, properties):
+ def _process_message(_client, stanza, properties):
muc_user = stanza.getTag('x', namespace=NS_MUC_USER)
if muc_user is None:
return
@@ -230,7 +230,7 @@ class MUC:
properties.muc_status_codes = codes
@staticmethod
- def _process_direct_invite(_con, stanza, properties):
+ def _process_direct_invite(_client, stanza, properties):
direct = stanza.getTag('x', namespace=NS_CONFERENCE)
if direct is None:
return
@@ -252,7 +252,7 @@ class MUC:
properties.muc_invite = InviteData(**data)
@staticmethod
- def _process_mediated_invite(_con, stanza, properties):
+ def _process_mediated_invite(_client, stanza, properties):
muc_user = stanza.getTag('x', namespace=NS_MUC_USER)
if muc_user is None:
return
@@ -292,7 +292,7 @@ class MUC:
return
@staticmethod
- def _process_voice_request(_con, stanza, properties):
+ def _process_voice_request(_client, stanza, properties):
data_form = stanza.getTag('x', namespace=NS_DATA)
if data_form is None:
return
@@ -324,7 +324,7 @@ class MUC:
form = voice_request.form
form.type_ = 'submit'
form['muc#request_allow'].value = True
- self._client.send(Message(to=muc_jid, payload=form))
+ self._client.send_stanza(Message(to=muc_jid, payload=form))
@call_on_response('_affiliation_received')
def get_affiliation(self, jid, affiliation):
@@ -451,7 +451,7 @@ class MUC:
def set_subject(self, room_jid, subject):
message = Message(room_jid, typ='groupchat', subject=subject)
log.info('Set subject for %s', room_jid)
- self._client.send(message)
+ self._client.send_stanza(message)
def decline(self, room, to, reason=None):
message = Message(to=room)
@@ -459,7 +459,7 @@ class MUC:
decline = muc_user.addChild('decline', attrs={'to': to})
if reason:
decline.setTagData('reason', reason)
- self._client.send(message)
+ self._client.send_stanza(message)
def request_voice(self, room):
message = Message(to=room)
@@ -470,7 +470,7 @@ class MUC:
value='participant',
typ='text-single'))
message.addChild(node=xdata)
- self._client.send(message)
+ self._client.send_stanza(message)
def invite(self, room, to, password, reason=None, continue_=False,
type_=InviteType.MEDIATED):
@@ -480,7 +480,7 @@ class MUC:
else:
invite = self._build_mediated_invite(
room, to, reason, password, continue_)
- self._client.send(invite)
+ self._client.send_stanza(invite)
@staticmethod
def _build_direct_invite(room, to, reason, password, continue_):
@@ -520,7 +520,7 @@ class MUC:
message = Message(typ='error', to=room_jid)
message.setID(message_id)
message.setError(ERR_NOT_ACCEPTABLE)
- self._client.send(message)
+ self._client.send_stanza(message)
@callback
def _default_response(self, stanza):
diff --git a/nbxmpp/modules/muclumbus.py b/nbxmpp/modules/muclumbus.py
index e68bfcf..e6a71eb 100644
--- a/nbxmpp/modules/muclumbus.py
+++ b/nbxmpp/modules/muclumbus.py
@@ -18,6 +18,10 @@
import logging
import json
+import gi
+gi.require_version('Soup', '2.4')
+from gi.repository import Soup
+
from nbxmpp.protocol import NS_MUCLUMBUS
from nbxmpp.protocol import NS_DATA
from nbxmpp.protocol import NS_RSM
@@ -32,14 +36,6 @@ from nbxmpp.util import call_on_response
from nbxmpp.util import callback
from nbxmpp.util import raise_error
-try:
- import gi
- gi.require_version('Soup', '2.4')
- from gi.repository import Soup
- SOUP_AVAILABLE = True
-except (ValueError, ImportError):
- SOUP_AVAILABLE = False
-
log = logging.getLogger('nbxmpp.m.muclumbus')
# API Documentation
@@ -50,9 +46,14 @@ class Muclumbus:
self._client = client
self.handlers = []
- self._soup_session = None
- if SOUP_AVAILABLE:
- self._soup_session = Soup.Session()
+ self._proxy_resolver = None
+ self._soup_session = Soup.Session()
+
+ def set_proxy(self, proxy):
+ if proxy is None:
+ return
+ self._proxy_resolver = proxy.get_resolver()
+ self._soup_session.props.proxy_resolver = self._proxy_resolver
@call_on_response('_parameters_received')
def request_parameters(self, jid):
@@ -90,9 +91,6 @@ class Muclumbus:
def set_http_search(self, uri, keywords, after=None,
callback=None, user_data=None):
- if not SOUP_AVAILABLE:
- raise ImportError('Module Soup not found')
-
search = {'keywords': keywords}
if after is not None:
search['after'] = after
diff --git a/nbxmpp/modules/nickname.py b/nbxmpp/modules/nickname.py
index 0262a64..1da02cb 100644
--- a/nbxmpp/modules/nickname.py
+++ b/nbxmpp/modules/nickname.py
@@ -44,7 +44,7 @@ class Nickname:
priority=40),
]
- def _process_nickname(self, _con, stanza, properties):
+ def _process_nickname(self, _client, stanza, properties):
if stanza.getName() == 'message':
properties.nickname = self._parse_nickname(stanza)
@@ -57,7 +57,7 @@ class Nickname:
return
properties.nickname = self._parse_nickname(stanza)
- def _process_pubsub_nickname(self, _con, _stanza, properties):
+ def _process_pubsub_nickname(self, _client, _stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/omemo.py b/nbxmpp/modules/omemo.py
index 859e37e..ba24d42 100644
--- a/nbxmpp/modules/omemo.py
+++ b/nbxmpp/modules/omemo.py
@@ -55,7 +55,7 @@ class OMEMO:
priority=7),
]
- def _process_omemo_message(self, _con, stanza, properties):
+ def _process_omemo_message(self, _client, stanza, properties):
try:
properties.omemo = self._parse_omemo_message(stanza)
log.info('Received message')
@@ -126,7 +126,7 @@ class OMEMO:
return OMEMOMessage(sid=sid, iv=iv, keys=keys, payload=payload)
- def _process_omemo_devicelist(self, _con, stanza, properties):
+ def _process_omemo_devicelist(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/oob.py b/nbxmpp/modules/oob.py
index c0de07f..0b1f210 100644
--- a/nbxmpp/modules/oob.py
+++ b/nbxmpp/modules/oob.py
@@ -34,7 +34,7 @@ class OOB:
priority=15),
]
- def _process_message_oob(self, _con, stanza, properties):
+ def _process_message_oob(self, _client, stanza, properties):
oob = stanza.getTag('x', namespace=NS_X_OOB)
if oob is None:
return
diff --git a/nbxmpp/modules/openpgp.py b/nbxmpp/modules/openpgp.py
index 8079098..0b2978d 100644
--- a/nbxmpp/modules/openpgp.py
+++ b/nbxmpp/modules/openpgp.py
@@ -59,7 +59,7 @@ class OpenPGP:
priority=7),
]
- def _process_openpgp_message(self, _con, stanza, properties):
+ def _process_openpgp_message(self, _client, stanza, properties):
openpgp = stanza.getTag('openpgp', namespace=NS_OPENPGP)
if openpgp is None:
log.warning('No openpgp node found')
@@ -80,7 +80,7 @@ class OpenPGP:
log.warning(stanza)
return
- def _process_pubsub_openpgp(self, _con, stanza, properties):
+ def _process_pubsub_openpgp(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/pgplegacy.py b/nbxmpp/modules/pgplegacy.py
index 256b89f..0f6f458 100644
--- a/nbxmpp/modules/pgplegacy.py
+++ b/nbxmpp/modules/pgplegacy.py
@@ -39,7 +39,7 @@ class PGPLegacy:
]
@staticmethod
- def _process_signed(_con, stanza, properties):
+ def _process_signed(_client, stanza, properties):
signed = stanza.getTag('x', namespace=NS_SIGNED)
if signed is None:
return
@@ -47,7 +47,7 @@ class PGPLegacy:
properties.signed = signed.getData()
@staticmethod
- def _process_pgplegacy_message(_con, stanza, properties):
+ def _process_pgplegacy_message(_client, stanza, properties):
pgplegacy = stanza.getTag('x', namespace=NS_ENCRYPTED)
if pgplegacy is None:
log.warning('No x node found')
diff --git a/nbxmpp/modules/presence.py b/nbxmpp/modules/presence.py
index f9e3bc2..bebb2fa 100644
--- a/nbxmpp/modules/presence.py
+++ b/nbxmpp/modules/presence.py
@@ -37,7 +37,7 @@ class BasePresence:
priority=10),
]
- def _process_presence_base(self, _con, stanza, properties):
+ def _process_presence_base(self, _client, stanza, properties):
properties.type = self._parse_type(stanza)
properties.priority = self._parse_priority(stanza)
properties.show = self._parse_show(stanza)
@@ -79,7 +79,7 @@ class BasePresence:
except ValueError:
log.warning('Presence with invalid type received')
log.warning(stanza)
- self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST))
+ self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
raise NodeProcessed
@staticmethod
diff --git a/nbxmpp/modules/pubsub.py b/nbxmpp/modules/pubsub.py
index 2f8d1a2..3967568 100644
--- a/nbxmpp/modules/pubsub.py
+++ b/nbxmpp/modules/pubsub.py
@@ -50,7 +50,7 @@ class PubSub:
priority=15),
]
- def _process_pubsub_base(self, _con, stanza, properties):
+ def _process_pubsub_base(self, _client, stanza, properties):
properties.pubsub = True
event = stanza.getTag('event', namespace=NS_PUBSUB_EVENT)
diff --git a/nbxmpp/modules/receipts.py b/nbxmpp/modules/receipts.py
index 329e88a..2056efe 100644
--- a/nbxmpp/modules/receipts.py
+++ b/nbxmpp/modules/receipts.py
@@ -38,7 +38,7 @@ class Receipts:
priority=15),
]
- def _process_message_receipt(self, _con, stanza, properties):
+ def _process_message_receipt(self, _client, stanza, properties):
request = stanza.getTag('request', namespace=NS_RECEIPTS)
if request is not None:
properties.receipt = ReceiptData(request.getName())
diff --git a/nbxmpp/modules/security_labels.py b/nbxmpp/modules/security_labels.py
index 04e119d..ac29d11 100644
--- a/nbxmpp/modules/security_labels.py
+++ b/nbxmpp/modules/security_labels.py
@@ -35,7 +35,7 @@ class SecurityLabels:
priority=15),
]
- def _process_message_security_label(self, _con, stanza, properties):
+ def _process_message_security_label(self, _client, stanza, properties):
security = stanza.getTag('securitylabel', namespace=NS_SECLABEL)
if security is None:
return
diff --git a/nbxmpp/modules/software_version.py b/nbxmpp/modules/software_version.py
index c23188c..23d06b5 100644
--- a/nbxmpp/modules/software_version.py
+++ b/nbxmpp/modules/software_version.py
@@ -101,5 +101,5 @@ class SoftwareVersion:
log.info('Send software version: %s %s %s',
self._name, self._version, self._os)
- self._client.send(iq)
+ self._client.send_stanza(iq)
raise NodeProcessed
diff --git a/nbxmpp/modules/tune.py b/nbxmpp/modules/tune.py
index 6ce4b0f..15bc610 100644
--- a/nbxmpp/modules/tune.py
+++ b/nbxmpp/modules/tune.py
@@ -37,7 +37,7 @@ class Tune:
priority=16),
]
- def _process_pubsub_tune(self, _con, _stanza, properties):
+ def _process_pubsub_tune(self, _client, _stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/user_avatar.py b/nbxmpp/modules/user_avatar.py
index fb79c96..b7a33ee 100644
--- a/nbxmpp/modules/user_avatar.py
+++ b/nbxmpp/modules/user_avatar.py
@@ -45,7 +45,7 @@ class UserAvatar:
priority=16),
]
- def _process_pubsub_avatar(self, _con, stanza, properties):
+ def _process_pubsub_avatar(self, _client, stanza, properties):
if not properties.is_pubsub_event:
return
diff --git a/nbxmpp/modules/vcard_avatar.py b/nbxmpp/modules/vcard_avatar.py
index c47d5ed..6e929be 100644
--- a/nbxmpp/modules/vcard_avatar.py
+++ b/nbxmpp/modules/vcard_avatar.py
@@ -36,7 +36,7 @@ class VCardAvatar:
]
@staticmethod
- def _process_avatar(_con, stanza, properties):
+ def _process_avatar(_client, stanza, properties):
if properties.type != PresenceType.AVAILABLE:
return
diff --git a/nbxmpp/old_dispatcher.py b/nbxmpp/old_dispatcher.py
new file mode 100644
index 0000000..60f68d5
--- /dev/null
+++ b/nbxmpp/old_dispatcher.py
@@ -0,0 +1,758 @@
+## dispatcher.py
+##
+## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
+## modified by Dimitur Kirov <dkirov@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2, or (at your option)
+## any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+
+
+"""
+Main xmpp decision making logic. Provides library with methods to assign
+different handlers to different XMPP stanzas and namespaces
+"""
+
+import sys
+import locale
+import re
+import uuid
+import logging
+import inspect
+from xml.parsers.expat import ExpatError
+
+from nbxmpp.simplexml import NodeBuilder
+from nbxmpp.plugin import PlugIn
+from nbxmpp.protocol import NS_STREAMS
+from nbxmpp.protocol import NS_HTTP_BIND
+from nbxmpp.protocol import NodeProcessed
+from nbxmpp.protocol import InvalidFrom
+from nbxmpp.protocol import InvalidJid
+from nbxmpp.protocol import InvalidStanza
+from nbxmpp.protocol import Iq
+from nbxmpp.protocol import Presence
+from nbxmpp.protocol import Message
+from nbxmpp.protocol import Protocol
+from nbxmpp.protocol import Node
+from nbxmpp.protocol import Error
+from nbxmpp.protocol import ERR_FEATURE_NOT_IMPLEMENTED
+from nbxmpp.modules.eme import EME
+from nbxmpp.modules.http_auth import HTTPAuth
+from nbxmpp.modules.presence import BasePresence
+from nbxmpp.modules.message import BaseMessage
+from nbxmpp.modules.iq import BaseIq
+from nbxmpp.modules.nickname import Nickname
+from nbxmpp.modules.delay import Delay
+from nbxmpp.modules.muc import MUC
+from nbxmpp.modules.idle import Idle
+from nbxmpp.modules.pgplegacy import PGPLegacy
+from nbxmpp.modules.vcard_avatar import VCardAvatar
+from nbxmpp.modules.captcha import Captcha
+from nbxmpp.modules.entity_caps import EntityCaps
+from nbxmpp.modules.blocking import Blocking
+from nbxmpp.modules.pubsub import PubSub
+from nbxmpp.modules.activity import Activity
+from nbxmpp.modules.tune import Tune
+from nbxmpp.modules.mood import Mood
+from nbxmpp.modules.location import Location
+from nbxmpp.modules.user_avatar import UserAvatar
+from nbxmpp.modules.bookmarks import Bookmarks
+from nbxmpp.modules.openpgp import OpenPGP
+from nbxmpp.modules.omemo import OMEMO
+from nbxmpp.modules.annotations import Annotations
+from nbxmpp.modules.muclumbus import Muclumbus
+from nbxmpp.modules.software_version import SoftwareVersion
+from nbxmpp.modules.adhoc import AdHoc
+from nbxmpp.modules.ibb import IBB
+from nbxmpp.modules.discovery import Discovery
+from nbxmpp.modules.chat_markers import ChatMarkers
+from nbxmpp.modules.receipts import Receipts
+from nbxmpp.modules.oob import OOB
+from nbxmpp.modules.correction import Correction
+from nbxmpp.modules.attention import Attention
+from nbxmpp.modules.security_labels import SecurityLabels
+from nbxmpp.modules.chatstates import Chatstates
+from nbxmpp.modules.register import Register
+from nbxmpp.modules.http_upload import HTTPUpload
+from nbxmpp.modules.misc import unwrap_carbon
+from nbxmpp.modules.misc import unwrap_mam
+from nbxmpp.util import get_properties_struct
+
+
+log = logging.getLogger('nbxmpp.dispatcher')
+
+#: default timeout to wait for response for our id
+DEFAULT_TIMEOUT_SECONDS = 25
+
+XML_DECLARATION = '<?xml version=\'1.0\'?>'
+
+# FIXME: ugly
+class Dispatcher:
+ """
+ Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
+ was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
+ is that reference used to access dispatcher instance is in Client attribute
+ named by __class__.__name__ of the dispatcher instance .. long story short:
+
+ I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp
+
+ If having two kinds of dispatcher will go well, I will rewrite the dispatcher
+ references in other scripts
+ """
+
+ def PlugIn(self, client_obj, after_SASL=False, old_features=None):
+ if client_obj.protocol_type == 'XMPP':
+ XMPPDispatcher().PlugIn(client_obj)
+ elif client_obj.protocol_type == 'BOSH':
+ BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features)
+ else:
+ assert False # should never be reached
+
+ @classmethod
+ def get_instance(cls, *args, **kwargs):
+ """
+ Factory Method for object creation
+
+ Use this instead of directly initializing the class in order to make
+ unit testing much easier.
+ """
+ return cls(*args, **kwargs)
+
+
+class XMPPDispatcher(PlugIn):
+ """
+ Handles XMPP stream and is the first who takes control over a fresh stanza
+
+ Is plugged into NonBlockingClient but can be replugged to restart handled
+ stream headers (used by SASL f.e.).
+ """
+
+ def __init__(self):
+ PlugIn.__init__(self)
+ self.handlers = {}
+ self._modules = {}
+ self._expected = {}
+ self._defaultHandler = None
+ self._pendingExceptions = []
+ self._eventHandler = None
+ self._cycleHandlers = []
+ self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler,
+ self.RegisterEventHandler, self.UnregisterCycleHandler,
+ self.RegisterCycleHandler, self.RegisterHandlerOnce,
+ self.UnregisterHandler, self.RegisterProtocol,
+ self.SendAndCallForResponse,
+ self.getAnID, self.Event, self.send, self.get_module]
+
+ # \ufddo -> \ufdef range
+ c = '\ufdd0'
+ r = c
+ while c < '\ufdef':
+ c = chr(ord(c) + 1)
+ r += '|' + c
+
+ # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff
+ c = '\ufffe'
+ r += '|' + c
+ r += '|' + chr(ord(c) + 1)
+ while c < '\U0010fffe':
+ c = chr(ord(c) + 0x10000)
+ r += '|' + c
+ r += '|' + chr(ord(c) + 1)
+
+ self.invalid_chars_re = re.compile(r)
+
+ def getAnID(self):
+ return str(uuid.uuid4())
+
+ def dumpHandlers(self):
+ """
+ Return set of user-registered callbacks in it's internal format. Used
+ within the library to carry user handlers set over Dispatcher replugins
+ """
+ return self.handlers
+
+ def restoreHandlers(self, handlers):
+ """
+ Restore user-registered callbacks structure from dump previously obtained
+ via dumpHandlers. Used within the library to carry user handlers set over
+ Dispatcher replugins.
+ """
+ self.handlers = handlers
+
+ def get_module(self, name):
+ return self._modules[name]
+
+ def _register_modules(self):
+ self._modules['BasePresence'] = BasePresence(self._owner)
+ self._modules['BaseMessage'] = BaseMessage(self._owner)
+ self._modules['BaseIq'] = BaseIq(self._owner)
+ self._modules['EME'] = EME(self._owner)
+ self._modules['HTTPAuth'] = HTTPAuth(self._owner)
+ self._modules['Nickname'] = Nickname(self._owner)
+ self._modules['MUC'] = MUC(self._owner)
+ self._modules['Delay'] = Delay(self._owner)
+ self._modules['Captcha'] = Captcha(self._owner)
+ self._modules['Idle'] = Idle(self._owner)
+ self._modules['PGPLegacy'] = PGPLegacy(self._owner)
+ self._modules['VCardAvatar'] = VCardAvatar(self._owner)
+ self._modules['EntityCaps'] = EntityCaps(self._owner)
+ self._modules['Blocking'] = Blocking(self._owner)
+ self._modules['PubSub'] = PubSub(self._owner)
+ self._modules['Mood'] = Mood(self._owner)
+ self._modules['Activity'] = Activity(self._owner)
+ self._modules['Tune'] = Tune(self._owner)
+ self._modules['Location'] = Location(self._owner)
+ self._modules['UserAvatar'] = UserAvatar(self._owner)
+ self._modules['Bookmarks'] = Bookmarks(self._owner)
+ self._modules['OpenPGP'] = OpenPGP(self._owner)
+ self._modules['OMEMO'] = OMEMO(self._owner)
+ self._modules['Annotations'] = Annotations(self._owner)
+ self._modules['Muclumbus'] = Muclumbus(self._owner)
+ self._modules['SoftwareVersion'] = SoftwareVersion(self._owner)
+ self._modules['AdHoc'] = AdHoc(self._owner)
+ self._modules['IBB'] = IBB(self._owner)
+ self._modules['Discovery'] = Discovery(self._owner)
+ self._modules['ChatMarkers'] = ChatMarkers(self._owner)
+ self._modules['Receipts'] = Receipts(self._owner)
+ self._modules['OOB'] = OOB(self._owner)
+ self._modules['Correction'] = Correction(self._owner)
+ self._modules['Attention'] = Attention(self._owner)
+ self._modules['SecurityLabels'] = SecurityLabels(self._owner)
+ self._modules['Chatstates'] = Chatstates(self._owner)
+ self._modules['Register'] = Register(self._owner)
+ self._modules['HTTPUpload'] = HTTPUpload(self._owner)
+
+ for instance in self._modules.values():
+ for handler in instance.handlers:
+ self.RegisterHandler(*handler)
+
+ def _init(self):
+ """
+ Register default namespaces/protocols/handlers. Used internally
+ """
+ # FIXME: inject dependencies, do not rely that they are defined by our
+ # owner
+ self.RegisterNamespace('unknown')
+ self.RegisterNamespace(NS_STREAMS)
+ self.RegisterNamespace(self._owner.defaultNamespace)
+ self.RegisterProtocol('iq', Iq)
+ self.RegisterProtocol('presence', Presence)
+ self.RegisterProtocol('message', Message)
+ self.RegisterDefaultHandler(self.returnStanzaHandler)
+ self.RegisterEventHandler(self._owner._caller._event_dispatcher)
+ self._register_modules()
+
+ def plugin(self, owner):
+ """
+ Plug the Dispatcher instance into Client class instance and send initial
+ stream header. Used internally
+ """
+ self._init()
+ self._owner.lastErrNode = None
+ self._owner.lastErr = None
+ self._owner.lastErrCode = None
+ if hasattr(self._owner, 'StreamInit'):
+ self._owner.StreamInit()
+ else:
+ self.StreamInit()
+
+ def plugout(self):
+ """
+ Prepare instance to be destructed
+ """
+ self._modules = {}
+ self.Stream.dispatch = None
+ self.Stream.features = None
+ self.Stream.destroy()
+ self._owner = None
+ self.Stream = None
+
+ def StreamInit(self):
+ """
+ Send an initial stream header
+ """
+ self._owner.Connection.sendqueue = []
+ self.Stream = NodeBuilder()
+ self.Stream.dispatch = self.dispatch
+ self.Stream._dispatch_depth = 2
+ self.Stream.stream_header_received = self._check_stream_start
+ self.Stream.features = None
+ self._metastream = Node('stream:stream')
+ self._metastream.setNamespace(self._owner.Namespace)
+ self._metastream.setAttr('version', '1.0')
+ self._metastream.setAttr('xmlns:stream', NS_STREAMS)
+ self._metastream.setAttr('to', self._owner.Server)
+ self._metastream.setAttr('xml:lang', self._owner.lang)
+ self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2]))
+
+ def _check_stream_start(self, ns, tag, attrs):
+ if ns != NS_STREAMS or tag!='stream':
+ raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
+ % (tag, ns))
+
+ def replace_non_character(self, data):
+ return re.sub(self.invalid_chars_re, '\ufffd', data)
+
+ def ProcessNonBlocking(self, data):
+ """
+ Check incoming stream for data waiting
+
+ :param data: data received from transports/IO sockets
+ :return:
+ 1) length of processed data if some data were processed;
+ 2) '0' string if no data were processed but link is alive;
+ 3) 0 (zero) if underlying connection is closed.
+ """
+ # FIXME:
+ # When an error occurs we disconnect the transport directly. Client's
+ # disconnect method will never be called.
+ # Is this intended?
+ # also look at transports start_disconnect()
+ data = self.replace_non_character(data)
+ for handler in self._cycleHandlers:
+ handler(self)
+ if len(self._pendingExceptions) > 0:
+ _pendingException = self._pendingExceptions.pop()
+ sys.excepthook(*_pendingException)
+ return
+ try:
+ self.Stream.Parse(data)
+ # end stream:stream tag received
+ if self.Stream and self.Stream.has_received_endtag():
+ self._owner.disconnect(self.Stream.streamError)
+ return 0
+ except ExpatError as error:
+ log.error('Invalid XML received from server. Forcing disconnect.')
+ log.error(error)
+ self._owner.Connection.disconnect()
+ return 0
+ except ValueError as e:
+ log.debug('ValueError: %s' % str(e))
+ self._owner.Connection.pollend()
+ return 0
+ if len(self._pendingExceptions) > 0:
+ _pendingException = self._pendingExceptions.pop()
+ sys.excepthook(*_pendingException)
+ return
+ if len(data) == 0:
+ return '0'
+ return len(data)
+
+ def RegisterNamespace(self, xmlns, order='info'):
+ """
+ Create internal structures for newly registered namespace
+
+ You can register handlers for this namespace afterwards. By default
+ one namespace is already registered
+ (jabber:client or jabber:component:accept depending on context.
+ """
+ log.debug('Registering namespace "%s"' % xmlns)
+ self.handlers[xmlns] = {}
+ self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
+ self.RegisterProtocol('default', Protocol, xmlns=xmlns)
+
+ def RegisterProtocol(self, tag_name, proto, xmlns=None, order='info'):
+ """
+ Used to declare some top-level stanza name to dispatcher
+
+ Needed to start registering handlers for such stanzas. Iq, message and
+ presence protocols are registered by default.
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ log.debug('Registering protocol "%s" as %s(%s)', tag_name, proto, xmlns)
+ self.handlers[xmlns][tag_name] = {'type': proto, 'default': []}
+
+ def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', system=0):
+ """
+ Register handler for processing all stanzas for specified namespace
+ """
+ self.RegisterHandler('default', handler, typ, ns, xmlns, system)
+
+ def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None,
+ system=False, priority=50):
+ """
+ Register user callback as stanzas handler of declared type
+
+ Callback arguments:
+ dispatcher instance (for replying), incoming return of previous handlers.
+ The callback must raise xmpp.NodeProcessed just before return if it wants
+ to prevent other callbacks to be called with the same stanza as argument
+ _and_, more importantly library from returning stanza to sender with error set.
+
+ :param name: name of stanza. F.e. "iq".
+ :param handler: user callback.
+ :param typ: value of stanza's "type" attribute. If not specified any
+ value will match
+ :param ns: namespace of child that stanza must contain.
+ :param xmlns: xml namespace
+ :param system: call handler even if NodeProcessed Exception were raised
+ already.
+ :param priority: The priority of the handler, higher get called later
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+
+ if not typ and not ns:
+ typ = 'default'
+
+ log.debug(
+ 'Registering handler %s for "%s" type->%s ns->%s(%s) priority->%s',
+ handler, name, typ, ns, xmlns, priority)
+
+ if xmlns not in self.handlers:
+ self.RegisterNamespace(xmlns, 'warn')
+ if name not in self.handlers[xmlns]:
+ self.RegisterProtocol(name, Protocol, xmlns, 'warn')
+
+ specific = typ + ns
+ if specific not in self.handlers[xmlns][name]:
+ self.handlers[xmlns][name][specific] = []
+
+ self.handlers[xmlns][name][specific].append(
+ {'func': handler,
+ 'system': system,
+ 'priority': priority,
+ 'specific': specific})
+
+ def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None,
+ system=0):
+ """
+ Unregister handler after first call (not implemented yet)
+ """
+ # FIXME Drop or implement
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ self.RegisterHandler(name, handler, typ, ns, xmlns, system)
+
+ def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
+ """
+ Unregister handler. "typ" and "ns" must be specified exactly the same as
+ with registering.
+ """
+ if not xmlns:
+ xmlns = self._owner.defaultNamespace
+ if not typ and not ns:
+ typ = 'default'
+ if xmlns not in self.handlers:
+ return
+ if name not in self.handlers[xmlns]:
+ return
+
+ specific = typ + ns
+ if specific not in self.handlers[xmlns][name]:
+ return
+ for handler_dict in self.handlers[xmlns][name][specific]:
+ if handler_dict['func'] == handler:
+ try:
+ self.handlers[xmlns][name][specific].remove(handler_dict)
+ log.debug(
+ 'Unregister handler %s for "%s" type->%s ns->%s(%s)',
+ handler, name, typ, ns, xmlns)
+ except ValueError:
+ log.warning(
+ 'Unregister failed: %s for "%s" type->%s ns->%s(%s)',
+ handler, name, typ, ns, xmlns)
+ pass
+
+ def RegisterDefaultHandler(self, handler):
+ """
+ Specify the handler that will be used if no NodeProcessed exception were
+ raised. This is returnStanzaHandler by default.
+ """
+ self._defaultHandler = handler
+
+ def RegisterEventHandler(self, handler):
+ """
+ Register handler that will process events. F.e. "FILERECEIVED" event. See
+ common/connection: _event_dispatcher()
+ """
+ self._eventHandler = handler
+
+ def returnStanzaHandler(self, conn, stanza):
+ """
+ Return stanza back to the sender with <feature-not-implemented/> error
+ set
+ """
+ if stanza.getType() in ('get', 'set'):
+ conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
+
+ def RegisterCycleHandler(self, handler):
+ """
+ Register handler that will be called on every Dispatcher.Process() call
+ """
+ if handler not in self._cycleHandlers:
+ self._cycleHandlers.append(handler)
+
+ def UnregisterCycleHandler(self, handler):
+ """
+ Unregister handler that will be called on every Dispatcher.Process() call
+ """
+ if handler in self._cycleHandlers:
+ self._cycleHandlers.remove(handler)
+
+ def Event(self, realm, event, data=None):
+ """
+ Raise some event
+
+ :param realm: scope of event. Usually a namespace.
+ :param event: the event itself. F.e. "SUCCESSFUL SEND".
+ :param data: data that comes along with event. Depends on event.
+ """
+ if self._eventHandler:
+ self._eventHandler(realm, event, data)
+ else:
+ log.warning('Received unhandled event: %s' % event)
+
+ def dispatch(self, stanza):
+ """
+ Main procedure that performs XMPP stanza recognition and calling
+ apppropriate handlers for it. Called by simplexml
+ """
+
+ self.Event('', 'STANZA RECEIVED', stanza)
+
+ self.Stream._mini_dom = None
+
+ # Count stanza
+ self._owner.Smacks.count_incoming(stanza.getName())
+
+ name = stanza.getName()
+ if name == 'features':
+ self._owner.got_features = True
+ self.Stream.features = stanza
+ elif name == 'error':
+ if stanza.getTag('see-other-host'):
+ self._owner.got_see_other_host = stanza
+
+ xmlns = stanza.getNamespace()
+
+ if xmlns not in self.handlers:
+ log.warning('Unknown namespace: %s', xmlns)
+ xmlns = 'unknown'
+ # features stanza has been handled before
+ if name not in self.handlers[xmlns]:
+ if name not in ('features', 'stream'):
+ log.warning('Unknown stanza: %s', stanza)
+ else:
+ log.debug('Got %s / %s stanza', xmlns, name)
+ name = 'unknown'
+ else:
+ log.debug('Got %s / %s stanza', xmlns, name)
+
+ # Convert simplexml to Protocol object
+ try:
+ stanza = self.handlers[xmlns][name]['type'](node=stanza)
+ except InvalidJid:
+ log.warning('Invalid JID, ignoring stanza')
+ log.warning(stanza)
+ return
+
+ own_jid = self._owner.get_bound_jid()
+ properties = get_properties_struct(name)
+
+ if name == 'iq':
+ if stanza.getFrom() is None and own_jid is not None:
+ stanza.setFrom(own_jid.getBare())
+
+ if name == 'message':
+ # https://tools.ietf.org/html/rfc6120#section-8.1.1.1
+ # If the stanza does not include a 'to' address then the client MUST
+ # treat it as if the 'to' address were included with a value of the
+ # client's full JID.
+
+ to = stanza.getTo()
+ if to is None:
+ stanza.setTo(own_jid)
+ elif not to.bareMatch(own_jid):
+ log.warning('Message addressed to someone else: %s', stanza)
+ return
+
+ if stanza.getFrom() is None:
+ stanza.setFrom(own_jid.getBare())
+
+ # Unwrap carbon
+ try:
+ stanza, properties.carbon = unwrap_carbon(stanza, own_jid)
+ except (InvalidFrom, InvalidJid) as exc:
+ log.warning(exc)
+ log.warning(stanza)
+ return
+ except NodeProcessed as exc:
+ log.info(exc)
+ return
+
+ # Unwrap mam
+ try:
+ stanza, properties.mam = unwrap_mam(stanza, own_jid)
+ except (InvalidStanza, InvalidJid) as exc:
+ log.warning(exc)
+ log.warning(stanza)
+ return
+
+ typ = stanza.getType()
+ if name == 'message' and not typ:
+ typ = 'normal'
+ elif not typ:
+ typ = ''
+
+ stanza.props = stanza.getProperties()
+ log.debug('type: %s, properties: %s', typ, stanza.props)
+
+ _id = stanza.getID()
+ processed = False
+ if _id in self._expected:
+ cb, args = self._expected[_id]
+ log.debug('Expected stanza arrived. Callback %s(%s) found',
+ cb, args)
+ try:
+ if args is None:
+ cb(self, stanza)
+ else:
+ cb(self, stanza, **args)
+ except NodeProcessed:
+ pass
+ return
+
+ # Gather specifics depending on stanza properties
+ specifics = ['default']
+ if typ and typ in self.handlers[xmlns][name]:
+ specifics.append(typ)
+ for prop in stanza.props:
+ if prop in self.handlers[xmlns][name]:
+ specifics.append(prop)
+ if typ and typ + prop in self.handlers[xmlns][name]:
+ specifics.append(typ + prop)
+
+ # Create the handler chain
+ chain = []
+ chain += self.handlers[xmlns]['default']['default']
+ for specific in specifics:
+ chain += self.handlers[xmlns][name][specific]
+
+ # Sort chain with priority
+ chain.sort(key=lambda x: x['priority'])
+
+ for handler in chain:
+ if not processed or handler['system']:
+ try:
+ log.info('Call handler: %s', handler['func'].__qualname__)
+ # Backwards compatibility until all handlers support
+ # properties
+ signature = inspect.signature(handler['func'])
+ if len(signature.parameters) > 2:
+ handler['func'](self, stanza, properties)
+ else:
+ handler['func'](self, stanza)
+ except NodeProcessed:
+ processed = True
+ except Exception:
+ self._pendingExceptions.insert(0, sys.exc_info())
+ return
+
+ # Stanza was not processed call default handler
+ if not processed and self._defaultHandler:
+ self._defaultHandler(self, stanza)
+
+ def SendAndCallForResponse(self, stanza, func=None, args=None):
+ """
+ Put stanza on the wire and call back when recipient replies. Additional
+ callback arguments can be specified in args
+ """
+ _waitid = self.send(stanza)
+ self._expected[_waitid] = (func, args)
+ return _waitid
+
+ def send(self, stanza, now=False):
+ """
+ Wrap transports send method when plugged into NonBlockingClient. Makes
+ sure stanzas get ID and from tag.
+ """
+ ID = None
+ if type(stanza) != str:
+ if isinstance(stanza, Protocol):
+ ID = stanza.getID()
+ if ID is None:
+ stanza.setID(self.getAnID())
+ ID = stanza.getID()
+ if self._owner._registered_name and not stanza.getAttr('from'):
+ stanza.setAttr('from', self._owner._registered_name)
+
+ self._owner.Connection.send(stanza, now)
+
+ # If no ID then it is a whitespace
+ if hasattr(self._owner, 'Smacks') and ID:
+ self._owner.Smacks.save_in_queue(stanza)
+
+ return ID
+
+
+class BOSHDispatcher(XMPPDispatcher):
+
+ def PlugIn(self, owner, after_SASL=False, old_features=None):
+ self.old_features = old_features
+ self.after_SASL = after_SASL
+ XMPPDispatcher.PlugIn(self, owner)
+
+ def StreamInit(self):
+ """
+ Send an initial stream header
+ """
+ self.Stream = NodeBuilder()
+ self.Stream.dispatch = self.dispatch
+ self.Stream._dispatch_depth = 2
+ self.Stream.stream_header_received = self._check_stream_start
+ self.Stream.features = self.old_features
+
+ self._metastream = Node('stream:stream')
+ self._metastream.setNamespace(self._owner.Namespace)
+ self._metastream.setAttr('version', '1.0')
+ self._metastream.setAttr('xmlns:stream', NS_STREAMS)
+ self._metastream.setAttr('to', self._owner.Server)
+ self._metastream.setAttr('xml:lang', self._owner.lang)
+
+ self.restart = True
+ self._owner.Connection.send_init(after_SASL=self.after_SASL)
+
+ def StreamTerminate(self):
+ """
+ Send a stream terminator
+ """
+ self._owner.Connection.send_terminator()
+
+ def ProcessNonBlocking(self, data=None):
+ if self.restart:
+ fromstream = self._metastream
+ fromstream.setAttr('from', fromstream.getAttr('to'))
+ fromstream.delAttr('to')
+ data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data)
+ self.restart = False
+ return XMPPDispatcher.ProcessNonBlocking(self, data)
+
+ def dispatch(self, stanza):
+ if stanza.getName() == 'body' and stanza.getNamespace() == NS_HTTP_BIND:
+
+ stanza_attrs = stanza.getAttrs()
+ if 'authid' in stanza_attrs:
+ # should be only in init response
+ # auth module expects id of stream in document attributes
+ self.Stream._document_attrs['id'] = stanza_attrs['authid']
+ self._owner.Connection.handle_body_attrs(stanza_attrs)
+
+ children = stanza.getChildren()
+ if children:
+ for child in children:
+ # if child doesn't have any ns specified, simplexml (or expat)
+ # thinks it's of parent's (BOSH body) namespace, so we have to
+ # rewrite it to jabber:client
+ if child.getNamespace() == NS_HTTP_BIND:
+ child.setNamespace(self._owner.defaultNamespace)
+ XMPPDispatcher.dispatch(self, child)
+ else:
+ XMPPDispatcher.dispatch(self, stanza)
diff --git a/nbxmpp/protocol.py b/nbxmpp/protocol.py
index 206d65e..af0222b 100644
--- a/nbxmpp/protocol.py
+++ b/nbxmpp/protocol.py
@@ -139,6 +139,7 @@ NS_PUBSUB_PUBLISH_OPTIONS = NS_PUBSUB + '#publish-options' # XEP-0060
NS_PUBSUB_OWNER = 'http://jabber.org/protocol/pubsub#owner' # XEP-0060
NS_PUBSUB_CONFIG = 'http://jabber.org/protocol/pubsub#node_config' # XEP-0060
NS_REGISTER = 'jabber:iq:register'
+NS_REGISTER_FEATURE = 'http://jabber.org/features/iq-register'
NS_ROSTER = 'jabber:iq:roster'
NS_ROSTERNOTES = 'storage:rosternotes'
NS_ROSTERX = 'http://jabber.org/protocol/rosterx' # XEP-0144
@@ -204,6 +205,7 @@ NS_BOOKMARK_CONVERSION = 'urn:xmpp:bookmarks-conversion:0'
NS_DOMAIN_BASED_NAME = 'urn:xmpp:domain-based-name:1'
NS_HINTS = 'urn:xmpp:hints'
NS_MUCLUMBUS = 'https://xmlns.zombofant.net/muclumbus/search/1.0'
+NS_FRAMING = 'urn:ietf:params:xml:ns:xmpp-framing'
#xmpp_stream_error_conditions = '''
#bad-format -- -- -- The entity has sent XML that cannot be processed.
@@ -951,14 +953,39 @@ class JID:
return str(self)
-class BOSHBody(Node):
- """
- <body> tag that wraps usual XMPP stanzas in XMPP over BOSH
- """
+class StreamErrorNode(Node):
+ def __init__(self, node):
+ Node.__init__(self, node=node)
+
+ self._text = {}
+
+ text_elements = self.getTags('text', namespace=NS_XMPP_STREAMS)
+ for element in text_elements:
+ lang = element.getXmlLang()
+ text = element.getData()
+ self._text[lang] = text
- def __init__(self, attrs=None, payload=None, node=None):
- Node.__init__(self, tag='body', attrs=attrs, payload=payload, node=node)
- self.setNamespace(NS_HTTP_BIND)
+ def get_condition(self):
+ for tag in self.getChildren():
+ if tag.getName() != 'text' and tag.getNamespace() == NS_XMPP_STREAMS:
+ return tag.getName()
+
+ def get_text(self, pref_lang=None):
+ if pref_lang is not None:
+ text = self._text.get(pref_lang)
+ if text is not None:
+ return text
+
+ if self._text:
+ text = self._text.get('en')
+ if text is not None:
+ return text
+
+ text = self._text.get(None)
+ if text is not None:
+ return text
+ return self._text.popitem()[1]
+ return ''
class Protocol(Node):
@@ -1734,6 +1761,99 @@ class Hashes2(Node):
self.setData(hash_)
+class BindRequest(Iq):
+ def __init__(self, resource):
+ if resource is not None:
+ resource = Node('resource', payload=resource)
+ Iq.__init__(self, typ='set')
+ self.addChild(node=Node('bind', {'xmlns': NS_BIND}, payload=resource))
+
+
+class TLSRequest(Node):
+ def __init__(self):
+ Node.__init__(self, tag='starttls', attrs={'xmlns': NS_TLS})
+
+
+class SessionRequest(Iq):
+ def __init__(self):
+ Iq.__init__(self, typ='set')
+ self.addChild(node=Node('session', attrs={'xmlns': NS_SESSION}))
+
+
+class StreamHeader(Node):
+ def __init__(self, domain, lang=None):
+ if lang is None:
+ lang = 'en'
+ Node.__init__(self,
+ tag='stream:stream',
+ attrs={'xmlns': NS_CLIENT,
+ 'version': '1.0',
+ 'xmlns:stream': NS_STREAMS,
+ 'to': domain,
+ 'xml:lang': lang})
+
+
+class WebsocketOpenHeader(Node):
+ def __init__(self, domain, lang=None):
+ if lang is None:
+ lang = 'en'
+ Node.__init__(self,
+ tag='open',
+ attrs={'xmlns': NS_FRAMING,
+ 'version': '1.0',
+ 'to': domain,
+ 'xml:lang': lang})
+
+class WebsocketCloseHeader(Node):
+ def __init__(self):
+ Node.__init__(self, tag='close', attrs={'xmlns': NS_FRAMING})
+
+
+class Features(Node):
+ def __init__(self, node):
+ Node.__init__(self, node=node)
+
+ def has_starttls(self):
+ tls = self.getTag('starttls', namespace=NS_TLS)
+ if tls is not None:
+ required = tls.getTag('required') is not None
+ return True, required
+ return False, False
+
+ def has_sasl(self):
+ return self.getTag('mechanisms', namespace=NS_SASL) is not None
+
+ def get_mechs(self):
+ mechanisms = self.getTag('mechanisms', namespace=NS_SASL)
+ mechanisms = mechanisms.getTags('mechanism')
+ return set(mech.getData() for mech in mechanisms)
+
+ def get_domain_based_name(self):
+ hostname = self.getTag('hostname', namespace=NS_DOMAIN_BASED_NAME)
+ if hostname is not None:
+ return hostname.getData()
+
+ def has_bind(self):
+ return self.getTag('bind', namespace=NS_BIND) is not None
+
+ def session_required(self):
+ session = self.getTag('session', namespace=NS_SESSION)
+ if session is not None:
+ optional = session.getTag('optional') is not None
+ return not optional
+ return False
+
+ def has_sm(self):
+ return self.getTag('sm', namespace=NS_STREAM_MGMT) is not None
+
+ def has_roster_version(self):
+ return self.getTag('ver', namespace=NS_ROSTER_VER) is not None
+
+ def has_register(self):
+ return self.getTag(
+ 'register', namespace=NS_REGISTER_FEATURE) is not None
+
+
class ErrorNode(Node):
"""
XMPP-style error element
diff --git a/nbxmpp/resolver.py b/nbxmpp/resolver.py
new file mode 100644
index 0000000..0f43578
--- /dev/null
+++ b/nbxmpp/resolver.py
@@ -0,0 +1,150 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from gi.repository import Gio
+from gi.repository import GLib
+
+
+log = logging.getLogger('nbxmpp.resolver')
+
+
+class DNSResolveRequest:
+ def __init__(self, cache, domain, callback):
+ self._domain = domain
+ self._result = self._lookup_cache(cache)
+ self._callback = callback
+
+ @property
+ def result(self):
+ return self._result
+
+ @result.setter
+ def result(self, value):
+ self._result = value
+
+ @property
+ def is_cached(self):
+ return self.result is not None
+
+ def _lookup_cache(self, cache):
+ cached_request = cache.get(self)
+ if cached_request is None:
+ return None
+ return cached_request.result
+
+ def finalize(self):
+ GLib.idle_add(self._callback, self.result)
+ self._callback = None
+
+ def __hash__(self):
+ raise NotImplementedError
+
+ def __eq__(self, other):
+ return hash(other) == hash(self)
+
+
+class AlternativeMethods(DNSResolveRequest):
+ def __init__(self, *args, **kwargs):
+ DNSResolveRequest.__init__(self, *args, **kwargs)
+
+ @property
+ def hostname(self):
+ return '_xmppconnect.%s' % self._domain
+
+ def __hash__(self):
+ return hash(self.hostname)
+
+
+class Singleton(type):
+ _instances = {}
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ cls._instances[cls] = super(Singleton, cls).__call__(*args,
+ **kwargs)
+ return cls._instances[cls]
+
+
+class GioResolver(metaclass=Singleton):
+ def __init__(self):
+ self._cache = {}
+
+ def _cache_request(self, request):
+ self._cache[request] = request
+
+ def resolve_alternatives(self, domain, callback):
+ request = AlternativeMethods(self._cache, domain, callback)
+ if request.is_cached:
+ request.finalize()
+ return
+
+ Gio.Resolver.get_default().lookup_records_async(
+ request.hostname,
+ Gio.ResolverRecordType.TXT,
+ None,
+ self._on_alternatives_result,
+ request)
+
+ def _on_alternatives_result(self, resolver, result, request):
+ try:
+ results = resolver.lookup_records_finish(result)
+ except GLib.Error as error:
+ log.warning(error.message)
+ log.warning(error.code)
+ request.finalize()
+ return
+
+ try:
+ websocket_uri = self._parse_alternative_methods(results)
+ except Exception:
+ log.exception('Failed to parse alternative '
+ 'connection methods: %s', results)
+ request.finalize()
+ return
+
+ request.result = websocket_uri
+ self._cache_request(request)
+ request.finalize()
+
+ @staticmethod
+ def _parse_alternative_methods(variant_results):
+ result_list = [res[0][0] for res in variant_results]
+ for result in result_list:
+ if result.startswith('_xmpp-client-websocket'):
+ return result.split('=')[1]
+
+
+if __name__ == '__main__':
+ import sys
+
+ try:
+ domain_ = sys.argv[1]
+ except Exception:
+ print('Provide domain name as argument')
+ sys.exit()
+
+ # Execute:
+ # > python3 -m nbxmpp.resolver domain
+
+ def on_result(result):
+ print('Result: ', result)
+ mainloop.quit()
+
+ GioResolver().resolve_alternatives(domain_, on_result)
+ mainloop = GLib.MainLoop()
+ mainloop.run()
diff --git a/nbxmpp/simplexml.py b/nbxmpp/simplexml.py
index a560cf4..dd83340 100644
--- a/nbxmpp/simplexml.py
+++ b/nbxmpp/simplexml.py
@@ -531,7 +531,8 @@ class NodeBuilder:
XML handler
"""
- def __init__(self, data=None, initial_node=None):
+ def __init__(self, data=None, initial_node=None,
+ dispatch_depth=1, finished=True):
"""
Take two optional parameters: "data" and "initial_node"
@@ -540,7 +541,6 @@ class NodeBuilder:
about it as of "node upgrade". "data" (if provided) feeded to parser
immidiatedly after instance init.
"""
- log.debug("Preparing to handle incoming XML stream.")
self._parser = xml.parsers.expat.ParserCreate()
self._parser.UseForeignDTD(False)
self._parser.StartElementHandler = self.starttag
@@ -559,16 +559,17 @@ class NodeBuilder:
self.__depth = 0
self.__last_depth = 0
self.__max_depth = 0
- self._dispatch_depth = 1
+ self._dispatch_depth = dispatch_depth
self._document_attrs = None
self._document_nsp = None
- self._mini_dom=initial_node
+ self._mini_dom = initial_node
self.last_is_data = 1
- self._ptr=None
+ self._ptr = None
self.data_buffer = None
self.streamError = ''
+ self._is_stream = not finished
if data:
- self._parser.Parse(data, 1)
+ self._parser.Parse(data, finished)
def check_data_buffer(self):
if self.data_buffer:
@@ -618,7 +619,7 @@ class NodeBuilder:
header = Node(tag=tag, attrs=attrs,
nsp=self._document_nsp, node_built=True)
self.dispatch(header)
- self.stream_header_received(ns, name, attrs)
+ self._check_stream_start(ns, name)
except ValueError as e:
self._document_attrs = None
raise ValueError(str(e))
@@ -626,7 +627,15 @@ class NodeBuilder:
self._ptr.parent.data.append('')
self.last_is_data = 0
- def endtag(self, tag ):
+ def _check_stream_start(self, ns, tag):
+ if self._is_stream:
+ if ns != 'http://etherx.jabber.org/streams' or tag != 'stream':
+ raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
+ % (tag, ns))
+ else:
+ self.stream_header_received()
+
+ def endtag(self, tag):
"""
XML Parser callback. Used internally
"""
@@ -681,7 +690,7 @@ class NodeBuilder:
"""
pass
- def stream_header_received(self, ns, tag, attrs):
+ def stream_header_received(self):
"""
Method called when stream just opened
"""
diff --git a/nbxmpp/smacks.py b/nbxmpp/smacks.py
index bb08622..f1c373a 100644
--- a/nbxmpp/smacks.py
+++ b/nbxmpp/smacks.py
@@ -21,16 +21,13 @@ import logging
from nbxmpp.protocol import NS_STREAM_MGMT
from nbxmpp.protocol import NS_DELAY2
from nbxmpp.simplexml import Node
-from nbxmpp.transports import DISCONNECTING
-from nbxmpp.plugin import PlugIn
-from nbxmpp.const import Realm
-from nbxmpp.const import Event
+from nbxmpp.const import StreamState
log = logging.getLogger('nbxmpp.smacks')
-class Smacks(PlugIn):
+class Smacks:
"""
This is Smacks is the Stream Management class. It takes care of requesting
and sending acks. Also, it keeps track of the unhandled outgoing stanzas.
@@ -39,8 +36,8 @@ class Smacks(PlugIn):
number of handled stanzas
"""
- def __init__(self):
- PlugIn.__init__(self)
+ def __init__(self, client):
+ self._client = client
self._out_h = 0 # Outgoing stanzas handled
self._in_h = 0 # Incoming stanzas handled
self._acked_h = 0 # Last acked stanza
@@ -51,59 +48,54 @@ class Smacks(PlugIn):
# Max number of stanzas in queue before making a request
self.max_queue = 0
+ self._sm_supported = False
self.enabled = False # If SM is enabled
self._enable_sent = False # If we sent 'enable'
self.resumed = False # If the session was resumed
self.resume_in_progress = False
self.resume_supported = False # Does the session support resume
- self._resume_jid = None # The JID from the previous session
self._session_id = None
self._location = None
- def get_resume_data(self):
- if self.resume_supported:
- return {
- 'out': self._out_h,
- 'in': self._in_h,
- 'session_id': self._session_id,
- 'location': self._location,
- 'uqueue': self._uqueue,
- 'bound_jid': self._owner._registered_name
- }
-
- def set_resume_data(self, data):
- if data is None:
+ self.register_handlers()
+
+ @property
+ def sm_supported(self):
+ return self._sm_supported
+
+ @sm_supported.setter
+ def sm_supported(self, value):
+ log.info('Server supports detected: %s', value)
+ self._sm_supported = value
+
+ def delegate(self, stanza):
+ if stanza.getNamespace() != NS_STREAM_MGMT:
return
- log.debug('Resume data set')
- self._out_h = data.get('out')
- self._in_h = data.get('in')
- self._session_id = data.get('session_id')
- self._location = data.get('location')
- self._old_uqueue = data.get('uqueue')
- self._resume_jid = data.get('bound_jid')
- self.resume_supported = True
+ if stanza.getName() == 'resumed':
+ self._on_resumed(stanza)
+ elif stanza.getName() == 'failed':
+ self._on_failed(None, stanza, None)
def register_handlers(self):
- self._owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT)
- self._owner.Dispatcher.RegisterHandler(
+ self._client.register_handler(
'enabled', self._on_enabled, xmlns=NS_STREAM_MGMT)
- self._owner.Dispatcher.RegisterHandler(
+ self._client.register_handler(
+ 'failed', self._on_failed, xmlns=NS_STREAM_MGMT)
+ self._client.register_handler(
'r', self._send_ack, xmlns=NS_STREAM_MGMT)
- self._owner.Dispatcher.RegisterHandler(
+ self._client.register_handler(
'a', self._on_ack, xmlns=NS_STREAM_MGMT)
- self._owner.Dispatcher.RegisterHandler(
- 'resumed', self._on_resumed, xmlns=NS_STREAM_MGMT)
- self._owner.Dispatcher.RegisterHandler(
- 'failed', self._on_failed, xmlns=NS_STREAM_MGMT)
def send_enable(self):
+ if not self.sm_supported:
+ return
enable = Node(NS_STREAM_MGMT + ' enable', attrs={'resume': 'true'})
- self._owner.Connection.send(enable, now=False)
+ self._client.send_nonza(enable, now=False)
log.debug('Send enable')
self._enable_sent = True
- def _on_enabled(self, _disp, stanza):
+ def _on_enabled(self, _con, stanza, _properties):
if self.enabled:
log.error('Received "enabled", but SM is already enabled')
return
@@ -160,7 +152,7 @@ class Smacks(PlugIn):
log.info('Resend %s stanzas', len(self._old_uqueue))
for stanza in self._old_uqueue:
# Use dispatcher so we increment the counter
- self._owner.Dispatcher.send(stanza)
+ self._client.send_stanza(stanza)
self._old_uqueue = []
def resume_request(self):
@@ -181,9 +173,9 @@ class Smacks(PlugIn):
self._acked_h = self._in_h
self.resume_in_progress = True
- self._owner.Connection.send(resume, now=False)
+ self._client.send_nonza(resume, now=False)
- def _on_resumed(self, _disp, stanza):
+ def _on_resumed(self, stanza):
"""
Checks if the number of stanzas sent are the same as the
number of stanzas received by the server. Resends stanzas not received
@@ -197,29 +189,29 @@ class Smacks(PlugIn):
self.enabled = True
self.resumed = True
self.resume_in_progress = False
- self._owner.set_bound_jid(self._resume_jid)
- self._owner.Dispatcher.Event(Realm.CONNECTING, Event.RESUME_SUCCESSFUL)
+ self._client.set_state(StreamState.RESUME_SUCCESSFUL)
self._resend_queue()
def _send_ack(self, *args):
ack = Node(NS_STREAM_MGMT + ' a', attrs={'h': self._in_h})
self._acked_h = self._in_h
log.debug('Send ack, h: %s', self._in_h)
- self._owner.Connection.send(ack, now=False)
+ self._client.send_nonza(ack, now=False)
- def send_closing_ack(self):
- if self._owner.Connection.get_state() != DISCONNECTING:
- return
- ack = Node(NS_STREAM_MGMT + ' a', attrs={'h': self._in_h})
- log.debug('Send closing ack, h: %s', self._in_h)
- self._owner.Connection.send(ack, now=True)
+ def close_session(self):
+ # We end the connection deliberately
+ # Reset the state -> no resume
+ log.info('Close session')
+ self._reset_state()
def _request_ack(self):
request = Node(NS_STREAM_MGMT + ' r')
log.debug('Request ack')
- self._owner.Connection.send(request, now=False)
+ self._client.send_nonza(request, now=False)
- def _on_ack(self, _disp, stanza):
+ def _on_ack(self, _stream, stanza, _properties):
+ if not self.enabled:
+ return
log.debug('Ack received, h: %s', stanza.getAttr('h'))
self._validate_ack(stanza, self._uqueue)
@@ -250,17 +242,17 @@ class Smacks(PlugIn):
else:
log.debug('Validate ack, our h: %d, server h: %d, queue: %d',
self._out_h, count_server, queue_size)
- log.debug('removing %d stanzas from queue', queue_size - diff)
+ log.debug('Removing %d stanzas from queue', queue_size - diff)
while len(queue) > diff:
queue.pop(0)
- def _on_failed(self, _disp, stanza):
+ def _on_failed(self, _stream, stanza, _properties):
'''
This can be called after 'enable' and 'resume'
'''
- log.info('Stream Management negotiation failed')
+ log.info('Negotiation failed')
error_text = stanza.getTagData('text')
if error_text is not None:
log.info(error_text)
@@ -275,10 +267,12 @@ class Smacks(PlugIn):
log.info(tag.getName())
if self.resume_in_progress:
- self._owner.Dispatcher.Event(Realm.CONNECTING, Event.RESUME_FAILED)
- # We failed while resuming
- self.resume_supported = False
- self._owner.bind()
+ # Reset state before sending Bind, because otherwise stanza
+ # will be counted and ack will be requested.
+ # _reset_state() also resets resume_in_progress
+ self._reset_state()
+ self._client.set_state(StreamState.RESUME_FAILED)
+
self._reset_state()
def _reset_state(self):
diff --git a/nbxmpp/structs.py b/nbxmpp/structs.py
index 63ad950..c35f2e5 100644
--- a/nbxmpp/structs.py
+++ b/nbxmpp/structs.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
+# Copyright (C) 2018-2020 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
+# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
@@ -19,8 +19,12 @@ import time
import random
from collections import namedtuple
+from gi.repository import Soup
+from gi.repository import Gio
+
from nbxmpp.protocol import JID
from nbxmpp.protocol import NS_STANZAS
+from nbxmpp.protocol import NS_STREAMS
from nbxmpp.protocol import NS_MAM_1
from nbxmpp.protocol import NS_MAM_2
from nbxmpp.protocol import NS_MUC
@@ -37,8 +41,8 @@ from nbxmpp.const import LOCATION_DATA
from nbxmpp.const import AdHocStatus
StanzaHandler = namedtuple('StanzaHandler',
- 'name callback typ ns xmlns system priority')
-StanzaHandler.__new__.__defaults__ = ('', '', None, False, 50)
+ 'name callback typ ns xmlns priority')
+StanzaHandler.__new__.__defaults__ = ('', '', None, 50)
CommonResult = namedtuple('CommonResult', 'jid')
CommonResult.__new__.__defaults__ = (None,)
@@ -387,6 +391,23 @@ class AdHocCommand(namedtuple('AdHocCommand', 'jid node name sessionid status da
return self.status == AdHocStatus.CANCELED
+class ProxyData(namedtuple('ProxyData', 'type host username password')):
+
+ __slots__ = []
+
+ def get_uri(self):
+ if self.username is not None:
+ user_pass = Soup.uri_encode('%s:%s' % (self.username,
+ self.password))
+ return '%s://%s@%s' % (self.type,
+ user_pass,
+ self.host)
+ return '%s://%s' % (self.type, self.host)
+
+ def get_resolver(self):
+ return Gio.SimpleProxyResolver.new(self.get_uri(), None)
+
+
class OMEMOBundle(namedtuple('OMEMOBundle', 'spk spk_signature ik otpks')):
def pick_prekey(self):
return random.SystemRandom().choice(self.otpks)
@@ -512,6 +533,52 @@ class StanzaMalformedError(CommonError):
raise NotImplementedError
+class StanzaTimeoutError(CommonError):
+ def __init__(self, id_):
+ self.condition = 'timeout'
+ self.id = id_
+
+ @classmethod
+ def from_string(cls, node_string):
+ raise NotImplementedError
+
+ def __str__(self):
+ return 'IQ with id %s reached timeout' % self.id
+
+ def serialize(self):
+ raise NotImplementedError
+
+
+class StreamError(CommonError):
+ def __init__(self, stanza):
+ self.condition = stanza.getError()
+ self.condition_data = self._error_node.getTagData(self.condition)
+ self.app_condition = stanza.getAppError()
+ self.type = stanza.getErrorType()
+ self.jid = stanza.getFrom()
+ self.id = stanza.getID()
+ self._text = {}
+
+ text_elements = self._error_node.getTags('text', namespace=NS_STREAMS)
+ for element in text_elements:
+ lang = element.getXmlLang()
+ text = element.getData()
+ self._text[lang] = text
+
+ @classmethod
+ def from_string(cls, node_string):
+ raise NotImplementedError
+
+ def __str__(self):
+ text = self.get_text('en') or ''
+ if text:
+ text = ' - %s' % text
+ return 'Error from %s: %s%s' % (self.jid, self.condition, text)
+
+ def serialize(self):
+ raise NotImplementedError
+
+
class TuneData(namedtuple('TuneData', 'artist length rating source title track uri')):
__slots__ = []
diff --git a/nbxmpp/tcp.py b/nbxmpp/tcp.py
new file mode 100644
index 0000000..fca36eb
--- /dev/null
+++ b/nbxmpp/tcp.py
@@ -0,0 +1,335 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import struct
+from collections import deque
+
+from gi.repository import GLib
+from gi.repository import Gio
+from gi.repository import GObject
+
+from nbxmpp.const import TCPState
+from nbxmpp.const import ConnectionType
+from nbxmpp.util import utf8_decode
+from nbxmpp.util import convert_tls_error_flags
+from nbxmpp.connection import Connection
+
+log = logging.getLogger('nbxmpp.tcp')
+
+READ_BUFFER_SIZE = 8192
+
+
+class TCPConnection(Connection):
+ def __init__(self, *args, **kwargs):
+ Connection.__init__(self, *args, **kwargs)
+
+ self._client = Gio.SocketClient.new()
+
+ if self._address.proxy is not None:
+ self._proxy_resolver = self._address.proxy.get_resolver()
+ self._client.set_proxy_resolver(self._proxy_resolver)
+
+ GObject.Object.connect(self._client, 'event', self._on_event)
+
+ self._con = None
+
+ self._read_buffer = b''
+
+ self._write_queue = deque([])
+ self._write_stanza_buffer = None
+
+ self._connect_cancellable = Gio.Cancellable()
+ self._read_cancellable = Gio.Cancellable()
+
+ self._input_closed = False
+ self._output_closed = False
+
+ self._keepalive_id = None
+
+ def connect(self):
+ self.state = TCPState.CONNECTING
+
+ if self._address.is_service:
+ self._client.connect_to_service_async(self._address.domain,
+ self._address.service,
+ self._connect_cancellable,
+ self._on_connect_finished,
+ None)
+ elif self._address.is_host:
+ self._client.connect_to_host_async(self._address.host,
+ 0,
+ self._connect_cancellable,
+ self._on_connect_finished,
+ None)
+
+ else:
+ raise ValueError('Invalid Address')
+
+ def _on_event(self, _socket_client, event, _connectable, connection):
+ if event == Gio.SocketClientEvent.CONNECTING:
+ remote_address = connection.get_remote_address()
+ use_proxy = self._address.proxy is not None
+ target = 'proxy' if use_proxy else self._address.domain
+ log.info('Connecting to %s (%s)',
+ target,
+ remote_address.to_string())
+
+ def _check_certificate(self, _connection, certificate, errors):
+ self._peer_certificate = certificate
+ self._peer_certificate_errors = convert_tls_error_flags(errors)
+
+ if self._accept_certificate():
+ return True
+
+ self.notify('bad-certificate')
+ return False
+
+ def _on_certificate_set(self, connection, _param):
+ self._peer_certificate = connection.props.peer_certificate
+ self._peer_certificate_errors = convert_tls_error_flags(
+ connection.props.peer_certificate_errors)
+ self.notify('certificate-set')
+
+ def _on_connect_finished(self, client, result, _user_data):
+ try:
+ if self._address.proxy is not None:
+ self._con = client.connect_to_host_finish(result)
+ elif self._address.is_service:
+ self._con = client.connect_to_service_finish(result)
+ elif self._address.is_host:
+ self._con = client.connect_to_host_finish(result)
+ else:
+ raise ValueError('Address must be a service or host')
+ except GLib.Error as error:
+ log.error('Connect Error: %s', error)
+ self._finalize('connection-failed')
+ return
+
+ self._con.set_graceful_disconnect(True)
+ self._con.get_socket().set_keepalive(True)
+
+ self.state = TCPState.CONNECTED
+
+ use_proxy = self._address.proxy is not None
+ target = 'proxy' if use_proxy else self._address.domain
+ log.info('Connected to %s (%s)',
+ target,
+ self._con.get_remote_address().to_string())
+
+ self._on_connected()
+
+ def _on_connected(self):
+ self.notify('connected')
+ self._read_async()
+
+ def _remove_keepalive_timer(self):
+ if self._keepalive_id is not None:
+ GLib.source_remove(self._keepalive_id)
+ self._keepalive_id = None
+
+ def _renew_keepalive_timer(self):
+ self._remove_keepalive_timer()
+ self._keepalive_id = GLib.timeout_add_seconds(5, self._send_keepalive)
+
+ def _send_keepalive(self):
+ log.info('Send keepalive')
+ self._keepalive_id = None
+ if not self._con.get_output_stream().has_pending():
+ self._write_all_async(' '.encode())
+
+ def start_tls_negotiation(self):
+ log.info('Start TLS negotiation')
+ remote_address = self._con.get_remote_address()
+ identity = Gio.NetworkAddress.new(self._address.domain,
+ remote_address.props.port)
+
+ tls_client = Gio.TlsClientConnection.new(self._con, identity)
+
+ if self._address.type == ConnectionType.DIRECT_TLS:
+ tls_client.set_advertised_protocols(['xmpp-client'])
+ tls_client.set_rehandshake_mode(Gio.TlsRehandshakeMode.NEVER)
+ tls_client.set_validation_flags(Gio.TlsCertificateFlags.VALIDATE_ALL)
+ tls_client.connect('accept-certificate', self._check_certificate)
+ tls_client.connect('notify::peer-certificate', self._on_certificate_set)
+
+ # This Wraps the Gio.TlsClientConnection and the Gio.Socket together
+ # so we get back a Gio.SocketConnection
+ self._con = Gio.TcpWrapperConnection.new(tls_client,
+ self._con.get_socket())
+
+ def _read_async(self):
+ if self._input_closed:
+ return
+
+ self._con.get_input_stream().read_bytes_async(
+ READ_BUFFER_SIZE,
+ GLib.PRIORITY_DEFAULT,
+ self._read_cancellable,
+ self._on_read_async_finish,
+ None)
+
+ def _on_read_async_finish(self, stream, result, _user_data):
+ try:
+ data = stream.read_bytes_finish(result)
+ except GLib.Error as error:
+ quark = GLib.quark_try_string('g-io-error-quark')
+ if error.matches(quark, Gio.IOErrorEnum.CANCELLED):
+ if self._input_closed:
+ return
+
+ quark = GLib.quark_try_string('g-tls-error-quark')
+ if error.matches(quark, Gio.TlsError.EOF):
+ log.info('Incoming stream closed: TLS EOF')
+ self._finalize('disconnected')
+ return
+
+ if error.matches(quark, Gio.TlsError.BAD_CERTIFICATE):
+ log.info('Certificate Error: %s', error)
+ self._finalize('disconnected')
+ return
+
+ log.error('Read Error: %s', error)
+ return
+
+ data = data.get_data()
+ if not data:
+ if self._state == TCPState.DISCONNECTING:
+ log.info('Reveived zero data on _read_async()')
+ self._finalize('disconnected')
+ else:
+ log.warning('Reveived zero data on _read_async()')
+ return
+
+ self._renew_keepalive_timer()
+
+ self._read_buffer += data
+ data, self._read_buffer = utf8_decode(self._read_buffer)
+
+ self._log_stanza(data, received=True)
+ self.notify('data-received', data)
+
+ self._read_async()
+
+ def _write_stanzas(self):
+ self._write_stanza_buffer = self._write_queue
+ self._write_queue = deque([])
+ data = ''.join(map(str, self._write_stanza_buffer)).encode()
+ self._write_all_async(data)
+
+ def _write_all_async(self, data):
+ # We have to pass data to the callback, because GLib takes no
+ # reference on the passed data and python would gc collect it
+ # bevor GLib has a chance to write it to the stream
+ self._con.get_output_stream().write_all_async(
+ data,
+ GLib.PRIORITY_DEFAULT,
+ None,
+ self._on_write_all_async_finished,
+ data)
+
+ def _on_write_all_async_finished(self, stream, result, data):
+ try:
+ stream.write_all_finish(result)
+ except GLib.Error as error:
+ quark = GLib.quark_try_string('g-tls-error-quark')
+ if error.matches(quark, Gio.TlsError.BAD_CERTIFICATE):
+ self._write_stanza_buffer = None
+ return
+
+ log.error('Write Error: %s', error)
+ return
+
+ self._renew_keepalive_timer()
+
+ data = data.decode()
+ if data == ' ':
+ # keepalive whitespace
+ return
+
+ for stanza in self._write_stanza_buffer:
+ self._log_stanza(stanza, received=False)
+ self._write_stanza_buffer = None
+
+ self.notify('data-sent', data)
+
+ if self._output_closed and not self._write_queue:
+ self._check_for_shutdown()
+ return
+
+ if self._write_queue:
+ self._write_stanzas()
+
+ def send(self, stanza, now=False):
+ if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING):
+ log.warning('send() not possible in state: %s', self._state)
+ return
+
+ if now:
+ self._write_queue.appendleft(stanza)
+ else:
+ self._write_queue.append(stanza)
+
+ if not self._con.get_output_stream().has_pending():
+ self._write_stanzas()
+
+ def disconnect(self):
+ self._remove_keepalive_timer()
+ if self.state == TCPState.CONNECTING:
+ self.state = TCPState.DISCONNECTING
+ self._connect_cancellable.cancel()
+ return
+
+ if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING):
+ log.warning('Called disconnect on state: %s', self._state)
+ return
+
+ self.state = TCPState.DISCONNECTING
+ self._finalize('disconnected')
+
+ def _check_for_shutdown(self):
+ if self._input_closed and self._output_closed:
+ self._finalize('disconnected')
+
+ def shutdown_input(self):
+ self._remove_keepalive_timer()
+ log.info('Shutdown input')
+ self._input_closed = True
+ self._read_cancellable.cancel()
+ self._check_for_shutdown()
+
+ def shutdown_output(self):
+ self._remove_keepalive_timer()
+ self.state = TCPState.DISCONNECTING
+ log.info('Shutdown output')
+ self._output_closed = True
+
+ def _finalize(self, signal_name):
+ self._remove_keepalive_timer()
+ if self._con is not None:
+ try:
+ self._con.get_socket().shutdown(True, True)
+ except GLib.Error as error:
+ log.info(error)
+ self.state = TCPState.DISCONNECTED
+ self.notify(signal_name)
+ self.destroy()
+
+ def destroy(self):
+ super().destroy()
+ self._con = None
+ self._client = None
diff --git a/nbxmpp/util.py b/nbxmpp/util.py
index 47f1d86..f6b2d0f 100644
--- a/nbxmpp/util.py
+++ b/nbxmpp/util.py
@@ -20,15 +20,30 @@ import base64
import weakref
import hashlib
import uuid
+import binascii
+import ipaddress
+import os
+import re
+from collections import defaultdict
+
from functools import wraps
from functools import lru_cache
import precis_i18n.codec
+from gi.repository import Gio
from nbxmpp.protocol import DiscoInfoMalformed
from nbxmpp.protocol import isErrorNode
from nbxmpp.protocol import NS_DATA
from nbxmpp.protocol import NS_HTTPUPLOAD_0
+from nbxmpp.const import GIO_TLS_ERRORS
+from nbxmpp.const import StreamState
+from nbxmpp.protocol import NS_STREAMS
+from nbxmpp.protocol import NS_CLIENT
+from nbxmpp.protocol import NS_FRAMING
+from nbxmpp.protocol import StanzaMalformed
+from nbxmpp.protocol import StreamHeader
+from nbxmpp.protocol import WebsocketOpenHeader
from nbxmpp.structs import Properties
from nbxmpp.structs import IqProperties
from nbxmpp.structs import MessageProperties
@@ -93,9 +108,9 @@ def call_on_response(cb):
if callback_ is not None:
attrs['callback'] = weakref.WeakMethod(callback_)
- self._client.SendAndCallForResponse(stanza,
- getattr(self, cb),
- attrs)
+ self._client.send_stanza(stanza,
+ callback=getattr(self, cb),
+ user_data=attrs)
return func_wrapper
return response_decorator
@@ -343,3 +358,138 @@ def get_form(stanza, form_type):
if field.value == form_type:
return form
return None
+
+
+def validate_stream_header(stanza, domain, is_websocket):
+ attrs = stanza.getAttrs()
+ if attrs.get('from') != domain:
+ raise StanzaMalformed('Invalid from attr in stream header')
+
+ if is_websocket:
+ if attrs.get('xmlns') != NS_FRAMING:
+ raise StanzaMalformed('Invalid namespace in stream header')
+ else:
+ if attrs.get('xmlns:stream') != NS_STREAMS:
+ raise StanzaMalformed('Invalid stream namespace in stream header')
+ if attrs.get('xmlns') != NS_CLIENT:
+ raise StanzaMalformed('Invalid namespace in stream header')
+
+ if attrs.get('version') != '1.0':
+ raise StanzaMalformed('Invalid stream version in stream header')
+ stream_id = attrs.get('id')
+ if stream_id is None:
+ raise StanzaMalformed('No stream id found in stream header')
+ return stream_id
+
+
+def get_stream_header(domain, lang, is_websocket):
+ if is_websocket:
+ return WebsocketOpenHeader(domain, lang)
+ header = StreamHeader(domain, lang)
+ return "<?xml version='1.0'?>%s>" % str(header)[:-3]
+
+
+def get_stanza_id():
+ return str(uuid.uuid4())
+
+
+def utf8_decode(data):
+ '''
+ Decodes utf8 byte string to unicode string
+ Does handle invalid utf8 sequences by splitting
+ the invalid sequence at the end
+
+ returns (decoded unicode string, invalid byte sequence)
+ '''
+ try:
+ return data.decode(), b''
+ except UnicodeDecodeError:
+ for i in range(-1, -4, -1):
+ char = data[i]
+ if char & 0xc0 == 0x80:
+ continue
+ return data[:i].decode(), data[i:]
+ raise
+
+
+def get_rand_number():
+ return int(binascii.hexlify(os.urandom(6)), 16)
+
+
+def get_invalid_xml_regex():
+ # \ufddo -> \ufdef range
+ c = '\ufdd0'
+ r = c
+ while c < '\ufdef':
+ c = chr(ord(c) + 1)
+ r += '|' + c
+
+ # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff
+ c = '\ufffe'
+ r += '|' + c
+ r += '|' + chr(ord(c) + 1)
+ while c < '\U0010fffe':
+ c = chr(ord(c) + 0x10000)
+ r += '|' + c
+ r += '|' + chr(ord(c) + 1)
+
+ return re.compile(r)
+
+
+def get_tls_error_phrase(tls_error):
+ phrase = GIO_TLS_ERRORS.get(tls_error)
+ if phrase is None:
+ return GIO_TLS_ERRORS.get(Gio.TlsCertificateFlags.GENERIC_ERROR)
+ return phrase
+
+
+def convert_tls_error_flags(flags):
+ if not flags:
+ return set()
+
+ # If GLib ever adds more flags GIO_TLS_ERRORS have to
+ # be extended, otherwise errors go unnoticed
+ if Gio.TlsCertificateFlags.VALIDATE_ALL != 127:
+ raise ValueError
+
+ return set(filter(lambda error: error & flags, GIO_TLS_ERRORS.keys()))
+
+
+def get_websocket_close_string(websocket):
+ data = websocket.get_close_data()
+ code = websocket.get_close_code()
+
+ if code is None and data is None:
+ return ''
+ return ' Data: %s Code: %s' % (data, code)
+
+
+def is_websocket_close(stanza):
+ return stanza.getName() == 'close' and stanza.getNamespace() == NS_FRAMING
+
+
+def is_websocket_stream_error(stanza):
+ return stanza.getName() == 'error' and stanza.getNamespace() == NS_STREAMS
+
+
+class Observable:
+ def __init__(self, log_):
+ self._log = log_
+ self._frozen = False
+ self._callbacks = defaultdict(list)
+
+ def remove_subscriptions(self):
+ self._callbacks = defaultdict(list)
+
+ def subscribe(self, signal_name, func):
+ self._callbacks[signal_name].append(func)
+
+ def notify(self, signal_name, *args, **kwargs):
+ if self._frozen:
+ self._frozen = False
+ return
+
+ self._log.info('Signal: %s', signal_name)
+ callbacks = self._callbacks.get(signal_name, [])
+ for func in callbacks:
+ func(self, signal_name, *args, **kwargs)
diff --git a/nbxmpp/websocket.py b/nbxmpp/websocket.py
new file mode 100644
index 0000000..8e34ddf
--- /dev/null
+++ b/nbxmpp/websocket.py
@@ -0,0 +1,182 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from gi.repository import Soup
+from gi.repository import GLib
+from gi.repository import Gio
+
+from nbxmpp.const import TCPState
+from nbxmpp.const import ConnectionType
+from nbxmpp.util import get_websocket_close_string
+from nbxmpp.util import convert_tls_error_flags
+from nbxmpp.connection import Connection
+
+log = logging.getLogger('nbxmpp.websocket')
+
+
+class WebsocketConnection(Connection):
+ def __init__(self, *args, **kwargs):
+ Connection.__init__(self, *args, **kwargs)
+
+ self._session = Soup.Session()
+ self._session.props.ssl_strict = False
+
+ if log.getEffectiveLevel() == logging.INFO:
+ self._session.add_feature(
+ Soup.Logger.new(Soup.LoggerLogLevel.BODY, -1))
+
+ self._websocket = None
+ self._cancellable = Gio.Cancellable()
+
+ self._input_closed = False
+ self._output_closed = False
+
+ def connect(self):
+ log.info('Try to connect to %s', self._address.uri)
+
+ self.state = TCPState.CONNECTING
+
+ message = Soup.Message.new('GET', self._address.uri)
+ message.connect('starting', self._check_certificate)
+ message.set_flags(Soup.MessageFlags.NO_REDIRECT)
+ self._session.websocket_connect_async(message,
+ None,
+ ['xmpp'],
+ self._cancellable,
+ self._on_connect,
+ None)
+
+ def _on_connect(self, session, result, _user_data):
+ # TODO: check if protocol 'xmpp' is set
+ try:
+ self._websocket = session.websocket_connect_finish(result)
+ except GLib.Error as error:
+ quark = GLib.quark_try_string('g-io-error-quark')
+ if error.matches(quark, Gio.IOErrorEnum.CANCELLED):
+ self._finalize('disconnected')
+ return
+
+ log.error('Connection Error: %s', error)
+ self._finalize('connection-failed')
+ return
+
+ self._websocket.set_keepalive_interval(5)
+ self._websocket.connect('message', self._on_websocket_message)
+ self._websocket.connect('closed', self._on_websocket_closed)
+ self._websocket.connect('closing', self._on_websocket_closing)
+ self._websocket.connect('error', self._on_websocket_error)
+ self._websocket.connect('pong', self._on_websocket_pong)
+
+ self.state = TCPState.CONNECTED
+ self.notify('connected')
+
+ def start_tls_negotiation(self):
+ # Soup.Session does this automatically
+ raise NotImplementedError
+
+ def _check_certificate(self, message):
+ https_used, certificate, errors = message.get_https_status()
+ if not https_used and self._address.type == ConnectionType.PLAIN:
+ return
+
+ self._peer_certificate = certificate
+ self._peer_certificate_errors = convert_tls_error_flags(errors)
+
+ self.notify('certificate-set')
+
+ if self._accept_certificate():
+ return
+
+ self.notify('bad-certificate')
+ self._cancellable.cancel()
+
+ def _on_websocket_message(self, _websocket, _type, message):
+ data = message.get_data().decode()
+ self._log_stanza(data)
+
+ if self._input_closed:
+ log.warning('Received data after stream closed')
+ return
+
+ self.notify('data-received', data)
+
+ @staticmethod
+ def _on_websocket_pong(_websocket, _message):
+ log.info('Pong received')
+
+ def _on_websocket_closed(self, websocket):
+ log.info('Closed %s', get_websocket_close_string(websocket))
+ self._finalize('disconnected')
+
+ @staticmethod
+ def _on_websocket_closing(_websocket):
+ log.info('Closing')
+
+ def _on_websocket_error(self, websocket, error):
+ log.error(error)
+ if self._state not in (TCPState.DISCONNECTED, TCPState.DISCONNECTING):
+ self._finalize('disconnected')
+
+ def send(self, stanza, now=False):
+ if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING):
+ log.warning('send() not possible in state: %s', self._state)
+ return
+
+ data = str(stanza)
+ self._websocket.send_text(data)
+ self._log_stanza(data, received=False)
+ self.notify('data-sent', data)
+
+ def disconnect(self):
+ if self._state == TCPState.CONNECTING:
+ self.state = TCPState.DISCONNECTING
+ self._cancellable.cancel()
+ return
+
+ if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING):
+ log.warning('Called disconnect on state: %s', self._state)
+ return
+
+ self._websocket.close(Soup.WebsocketCloseCode.NORMAL, None)
+ self.state = TCPState.DISCONNECTING
+
+ def _check_for_shutdown(self):
+ if self._input_closed and self._output_closed:
+ self._websocket.close(Soup.WebsocketCloseCode.NORMAL, None)
+
+ def shutdown_input(self):
+ log.info('Shutdown input')
+ self._input_closed = True
+ self._check_for_shutdown()
+
+ def shutdown_output(self):
+ self.state = TCPState.DISCONNECTING
+ log.info('Shutdown output')
+ self._output_closed = True
+
+ def _finalize(self, signal_name):
+ self.state = TCPState.DISCONNECTED
+ self.notify(signal_name)
+ self.destroy()
+
+ def destroy(self):
+ super().destroy()
+ self._session.abort()
+ self._session = None
+ self._websocket = None