diff options
52 files changed, 4310 insertions, 896 deletions
diff --git a/nbxmpp/__init__.py b/nbxmpp/__init__.py index f2cbf11..18e8461 100644 --- a/nbxmpp/__init__.py +++ b/nbxmpp/__init__.py @@ -1,18 +1,3 @@ -""" -This is a fork of the xmpppy jabber python library. Most of the code is -inherited but has been extended by implementation of non-blocking transports -and new features like BOSH. - -Most of the xmpp classes are ancestors of PlugIn class to share a single set of methods in order to compile a featured and extensible XMPP client. - -Thanks and credits to the xmpppy developers. See: http://xmpppy.sourceforge.net/ -""" - from .protocol import * -from . import simplexml, protocol, auth, transports, roster -from . import dispatcher, features, idlequeue, bosh, tls, proxy_connectors -from .client import NonBlockingClient -from .plugin import PlugIn -from .smacks import Smacks __version__ = "0.9.93" diff --git a/nbxmpp/addresses.py b/nbxmpp/addresses.py new file mode 100644 index 0000000..7131778 --- /dev/null +++ b/nbxmpp/addresses.py @@ -0,0 +1,233 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging +from collections import namedtuple + +from nbxmpp.util import Observable +from nbxmpp.resolver import GioResolver +from nbxmpp.const import ConnectionType +from nbxmpp.const import ConnectionProtocol + + +log = logging.getLogger('nbxmpp.addresses') + + +class ServerAddress(namedtuple('ServerAddress', 'domain service host uri ' + 'protocol type proxy')): + + __slots__ = [] + + @property + def is_service(self): + return self.service is not None + + @property + def is_host(self): + return self.host is not None + + @property + def is_uri(self): + return self.uri is not None + + def has_proxy(self): + return self.proxy is not None + + +class ServerAddresses(Observable): + ''' + Signals: + + resolved + + ''' + + def __init__(self, domain): + Observable.__init__(self, log) + + self._domain = domain + self._custom_host = None + self._proxy = None + self._is_resolved = False + + self._addresses = [ + ServerAddress(domain=self._domain, + service='xmpps-client', + host=None, + uri=None, + protocol=ConnectionProtocol.TCP, + type=ConnectionType.DIRECT_TLS, + proxy=None), + + ServerAddress(domain=self._domain, + service='xmpp-client', + host=None, + uri=None, + protocol=ConnectionProtocol.TCP, + type=ConnectionType.START_TLS, + proxy=None), + + ServerAddress(domain=self._domain, + service='xmpp-client', + host=None, + uri=None, + protocol=ConnectionProtocol.TCP, + type=ConnectionType.PLAIN, + proxy=None) + ] + + self._fallback_addresses = [ + ServerAddress(domain=self._domain, + service=None, + host='%s:%s' % (self._domain, 5222), + uri=None, + protocol=ConnectionProtocol.TCP, + type=ConnectionType.START_TLS, + proxy=None), + + ServerAddress(domain=self._domain, + service=None, + host='%s:%s' % (self._domain, 5222), + uri=None, + protocol=ConnectionProtocol.TCP, + type=ConnectionType.PLAIN, + proxy=None) + ] + + @property + def domain(self): + return self._domain + + @property + def is_resolved(self): + return self._is_resolved + + def resolve(self): + if self._is_resolved: + self._on_request_resolved() + return + + if self._proxy is not None: + # Let the proxy resolve the domain + self._on_request_resolved() + return + + if self._custom_host is not None: + self._on_request_resolved() + return + + GioResolver().resolve_alternatives(self._domain, + self._on_alternatives_result) + + def cancel_resolve(self): + self.remove_subscriptions() + + def set_custom_host(self, address): + # Set a custom host, overwrites all other addresses + self._custom_host = address + if address is None: + return + + host, protocol, type_ = address + + self._fallback_addresses = [] + self._addresses = [ + ServerAddress(domain=self._domain, + service=None, + host=host, + uri=None, + protocol=protocol, + type=type_, + proxy=None)] + + def set_proxy(self, proxy): + self._proxy = proxy + + def _on_alternatives_result(self, uri): + if uri is None: + self._on_request_resolved() + return + + if uri.startswith('wss'): + type_ = ConnectionType.DIRECT_TLS + elif uri.startswith('ws'): + type_ = ConnectionType.PLAIN + else: + log.warning('Invalid websocket uri: %s', uri) + return + + addr = ServerAddress(domain=self._domain, + service=None, + host=None, + uri=uri, + protocol=ConnectionProtocol.WEBSOCKET, + type=type_, + proxy=None) + self._addresses.append(addr) + + self._on_request_resolved() + + def _on_request_resolved(self): + self._is_resolved = True + self.notify('resolved') + self.remove_subscriptions() + + def get_next_address(self, + allowed_types, + allowed_protocols): + ''' + Selects next address + ''' + + for addr in self._filter_allowed(self._addresses, + allowed_types, + allowed_protocols): + yield self._assure_proxy(addr) + + for addr in self._filter_allowed(self._fallback_addresses, + allowed_types, + allowed_protocols): + yield self._assure_proxy(addr) + + raise NoMoreAddresses + + def _assure_proxy(self, addr): + if self._proxy is None: + return addr + + if addr.protocol == ConnectionProtocol.TCP: + return addr._replace(proxy=self._proxy) + + return addr + + def _filter_allowed(self, addresses, allowed_types, allowed_protocols): + if self._proxy is not None: + addresses = filter(lambda addr: addr.host is not None, addresses) + + addresses = filter(lambda addr: addr.type in allowed_types, + addresses) + addresses = filter(lambda addr: addr.protocol in allowed_protocols, + addresses) + return addresses + + def __str__(self): + addresses = self._addresses + self._fallback_addresses + return '\n'.join([str(addr) for addr in addresses]) + + +class NoMoreAddresses(Exception): + pass diff --git a/nbxmpp/auth.py b/nbxmpp/auth.py index b46daf8..e279c41 100644 --- a/nbxmpp/auth.py +++ b/nbxmpp/auth.py @@ -1,10 +1,10 @@ -# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com> +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> # # This file is part of nbxmpp. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 +# as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, @@ -21,18 +21,15 @@ import binascii import logging import hashlib from hashlib import pbkdf2_hmac -from functools import partial -from nbxmpp.plugin import PlugIn from nbxmpp.protocol import NS_SASL from nbxmpp.protocol import Node -from nbxmpp.protocol import NodeProcessed from nbxmpp.protocol import SASL_ERROR_CONDITIONS from nbxmpp.protocol import SASL_AUTH_MECHS -from nbxmpp.protocol import NS_DOMAIN_BASED_NAME from nbxmpp.util import b64decode from nbxmpp.util import b64encode from nbxmpp.const import GSSAPIState +from nbxmpp.const import StreamState log = logging.getLogger('nbxmpp.auth') @@ -44,174 +41,124 @@ except ImportError: KERBEROS_AVAILABLE = False -class SASL(PlugIn): +class SASL: """ - Implements SASL authentication. Can be plugged into NonBlockingClient - to start authentication + Implements SASL authentication. """ - - _default_mechs = set(['SCRAM-SHA-256-PLUS', - 'SCRAM-SHA-256', - 'SCRAM-SHA-1-PLUS', - 'SCRAM-SHA-1', - 'PLAIN']) - - def __init__(self, username, auth_mechs, get_password, on_finished): + def __init__(self, client): """ - :param username: XMPP username - :param auth_mechs: Set of valid authentication mechanisms. - Possible entries are: - 'ANONYMOUS', 'EXTERNAL', 'GSSAPI', 'SCRAM-SHA-1-PLUS', - 'SCRAM-SHA-1', 'SCRAM-SHA-256', 'SCRAM-SHA-256-PLUS', 'PLAIN' - :param on_finished: Callback after SASL is finished - :param get_password: Callback that must return the password for the - chosen mechanism + :param client: Client object """ - PlugIn.__init__(self) - self.username = username - self._on_finished = on_finished - self._get_password = get_password - - self._prefered_mechs = auth_mechs - self._enabled_mechs = self._prefered_mechs or self._default_mechs - self._chosen_mechanism = None - self._method = None + self._client = client - self._channel_binding = None - self._domain_based_name = None - - def _setup_mechs(self): - if self._owner.connected in ('ssl', 'tls'): - if self._owner.protocol_type == 'BOSH': - # PLUS would break if the server uses any kind of reverse proxy - self._enabled_mechs.discard('SCRAM-SHA-1-PLUS') - self._enabled_mechs.discard('SCRAM-SHA-256-PLUS') - else: - self._channel_binding = self._owner.Connection.NonBlockingTLS.get_channel_binding() - # TLS handshake is finished so channel binding data muss exist - if self._channel_binding is None: - raise ValueError('No channel binding data found') - - else: - self._enabled_mechs.discard('SCRAM-SHA-1-PLUS') - self._enabled_mechs.discard('SCRAM-SHA-256-PLUS') - if self._prefered_mechs is None: - # if the client didnt specify any auth mechs avoid - # sending the password over a plain connection - self._enabled_mechs.discard('PLAIN') + self._password = None - if not KERBEROS_AVAILABLE: - self._enabled_mechs.discard('GSSAPI') + self._allowed_mechs = None + self._enabled_mechs = None + self._method = None + self._error = None - def plugin(self, _owner): - self._setup_mechs() - self._owner.RegisterHandler( - 'challenge', self._on_challenge, xmlns=NS_SASL) - self._owner.RegisterHandler( - 'failure', self._on_failure, xmlns=NS_SASL) - self._owner.RegisterHandler( - 'success', self._on_success, xmlns=NS_SASL) + @property + def error(self): + return self._error - # Execute the Handler manually, we already received the features - self._on_features(None, self._owner.Dispatcher.Stream.features) + def set_password(self, password): + self._password = password - def plugout(self): - """ - Remove SASL handlers from owner's dispatcher. Used internally - """ - self._owner.UnregisterHandler( - 'challenge', self._on_challenge, xmlns=NS_SASL) - self._owner.UnregisterHandler( - 'failure', self._on_failure, xmlns=NS_SASL) - self._owner.UnregisterHandler( - 'success', self._on_success, xmlns=NS_SASL) - - def _on_features(self, _con, stanza): - """ - Used to determine if server supports SASL auth. Used internally - """ - if not stanza.getTag('mechanisms', namespace=NS_SASL): + def delegate(self, stanza): + if stanza.getNamespace() != NS_SASL: return + if stanza.getName() == 'challenge': + self._on_challenge(stanza) + elif stanza.getName() == 'failure': + self._on_failure(stanza) + elif stanza.getName() == 'success': + self._on_success(stanza) + + def start_auth(self, features): + self._allowed_mechs = self._client.mechs + self._enabled_mechs = self._allowed_mechs + self._method = None + self._error = None - mechanisms = stanza.getTag('mechanisms', namespace=NS_SASL) - mechanisms = mechanisms.getTags('mechanism') + # -PLUS variants need TLS channel binding data + # This is currently not supported via GLib + self._enabled_mechs.discard('SCRAM-SHA-1-PLUS') + self._enabled_mechs.discard('SCRAM-SHA-256-PLUS') + # channel_binding_data = None - mechs = set(mech.getData() for mech in mechanisms) - available_mechs = mechs & self._enabled_mechs + if not KERBEROS_AVAILABLE: + self._enabled_mechs.discard('GSSAPI') + available_mechs = features.get_mechs() & self._enabled_mechs log.info('Available mechanisms: %s', available_mechs) - hostname = stanza.getTag('hostname', namespace=NS_DOMAIN_BASED_NAME) - if hostname is not None: - self._domain_based_name = hostname.getData() - log.info('Found domain based name: %s', self._domain_based_name) + domain_based_name = features.get_domain_based_name() + if domain_based_name is not None: + log.info('Found domain based name: %s', domain_based_name) if not available_mechs: log.error('No available auth mechanisms found') self._abort_auth('invalid-mechanism') return + chosen_mechanism = None for mech in SASL_AUTH_MECHS: if mech in available_mechs: - self._chosen_mechanism = mech + chosen_mechanism = mech break - if self._chosen_mechanism is None: + if chosen_mechanism is None: log.error('No available auth mechanisms found') self._abort_auth('invalid-mechanism') return - log.info('Chosen auth mechanism: %s', self._chosen_mechanism) - self._auth() + log.info('Chosen auth mechanism: %s', chosen_mechanism) - def _auth(self): - password_cb = partial(self._on_password, self.username) + if chosen_mechanism in ('SCRAM-SHA-256', 'SCRAM-SHA-1', 'PLAIN'): + if not self._password: + self._on_sasl_finished(False, 'no-password') + return - if self._chosen_mechanism == 'SCRAM-SHA-256-PLUS': - self._method = SCRAM_SHA_256_PLUS(self._owner.Connection, - self._channel_binding) - self._get_password(self._chosen_mechanism, password_cb) + # if chosen_mechanism == 'SCRAM-SHA-256-PLUS': + # self._method = SCRAM_SHA_256_PLUS(self._client, + # channel_binding_data) + # self._method.initiate(self._client.username, self._password) - elif self._chosen_mechanism == 'SCRAM-SHA-256': - self._method = SCRAM_SHA_256(self._owner.Connection, None) - self._get_password(self._chosen_mechanism, password_cb) + # elif chosen_mechanism == 'SCRAM-SHA-1-PLUS': + # self._method = SCRAM_SHA_1_PLUS(self._client, + # channel_binding_data) + # self._method.initiate(self._client.username, self._password) - elif self._chosen_mechanism == 'SCRAM-SHA-1-PLUS': - self._method = SCRAM_SHA_1_PLUS(self._owner.Connection, - self._channel_binding) - self._get_password(self._chosen_mechanism, password_cb) + if chosen_mechanism == 'SCRAM-SHA-256': + self._method = SCRAM_SHA_256(self._client, None) + self._method.initiate(self._client.username, self._password) - elif self._chosen_mechanism == 'SCRAM-SHA-1': - self._method = SCRAM_SHA_1(self._owner.Connection, None) - self._get_password(self._chosen_mechanism, password_cb) + elif chosen_mechanism == 'SCRAM-SHA-1': + self._method = SCRAM_SHA_1(self._client, None) + self._method.initiate(self._client.username, self._password) - elif self._chosen_mechanism == 'PLAIN': - self._method = PLAIN(self._owner.Connection) - self._get_password(self._chosen_mechanism, password_cb) + elif chosen_mechanism == 'PLAIN': + self._method = PLAIN(self._client) + self._method.initiate(self._client.username, self._password) - elif self._chosen_mechanism == 'ANONYMOUS': - self._method = ANONYMOUS(self._owner.Connection) + elif chosen_mechanism == 'ANONYMOUS': + self._method = ANONYMOUS(self._client) self._method.initiate() - elif self._chosen_mechanism == 'EXTERNAL': - self._method = EXTERNAL(self._owner.Connection) - self._method.initiate(self.username, self._owner.Server) + elif chosen_mechanism == 'EXTERNAL': + self._method = EXTERNAL(self._client) + self._method.initiate(self._client.username, self._client.Server) - elif self._chosen_mechanism == 'GSSAPI': - self._method = GSSAPI(self._owner.Connection) - self._method.initiate(self._domain_based_name or - self._owner.xmpp_hostname) + elif chosen_mechanism == 'GSSAPI': + self._method = GSSAPI(self._client) + self._method.initiate(domain_based_name or + self._client.domain) else: log.error('Unknown auth mech') - def _on_password(self, username, password): - if password is None: - log.warning('No password supplied') - return - self._method.initiate(username, password) - - def _on_challenge(self, _con, stanza): + def _on_challenge(self, stanza): try: self._method.response(stanza.getData()) except AttributeError: @@ -220,9 +167,8 @@ class SASL(PlugIn): except AuthFail as error: log.error(error) self._abort_auth() - raise NodeProcessed - def _on_success(self, _con, stanza): + def _on_success(self, stanza): log.info('Successfully authenticated with remote server') try: self._method.success(stanza.getData()) @@ -231,14 +177,13 @@ class SASL(PlugIn): except AuthFail as error: log.error(error) self._abort_auth() - raise NodeProcessed + return - self._on_finished(True, None, None) - raise NodeProcessed + self._on_sasl_finished(True, None, None) - def _on_failure(self, _con, stanza): + def _on_failure(self, stanza): text = stanza.getTagData('text') - reason = 'not-authorized' + reason = 'unknown-error' childs = stanza.getChildren() for child in childs: name = child.getName() @@ -250,63 +195,68 @@ class SASL(PlugIn): log.info('Failed SASL authentification: %s %s', reason, text) self._abort_auth(reason, text) - raise NodeProcessed def _abort_auth(self, reason='malformed-request', text=None): node = Node('abort', attrs={'xmlns': NS_SASL}) - self._owner.send(node) - self._owner.Connection.start_disconnect() - self._on_finished(False, reason, text) + self._client.send_nonza(node) + self._on_sasl_finished(False, reason, text) + + def _on_sasl_finished(self, successful, reason, text=None): + if not successful: + self._error = (reason, text) + self._client.set_state(StreamState.AUTH_FAILED) + else: + self._client.set_state(StreamState.AUTH_SUCCESSFUL) class PLAIN: _mechanism = 'PLAIN' - def __init__(self, con): - self._con = con + def __init__(self, client): + self._client = client def initiate(self, username, password): payload = b64encode('\x00%s\x00%s' % (username, password)) node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'PLAIN'}, payload=[payload]) - self._con.send(node) + self._client.send_nonza(node) class EXTERNAL: _mechanism = 'EXTERNAL' - def __init__(self, con): - self._con = con + def __init__(self, client): + self._client = client def initiate(self, username, server): payload = b64encode('%s@%s' % (username, server)) node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'EXTERNAL'}, payload=[payload]) - self._con.send(node) + self._client.send_nonza(node) class ANONYMOUS: _mechanism = 'ANONYMOUS' - def __init__(self, con): - self._con = con + def __init__(self, client): + self._client = client def initiate(self): node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'}) - self._con.send(node) + self._client.send_nonza(node) class GSSAPI: _mechanism = 'GSSAPI' - def __init__(self, con): - self._con = con + def __init__(self, client): + self._client = client self._gss_vc = None self._state = GSSAPIState.STEP @@ -317,7 +267,7 @@ class GSSAPI: node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'}, payload=(response or '')) - self._con.send(node) + self._client.send_nonza(node) def response(self, server_message, *args, **kwargs): server_message = b64decode(server_message, bytes) @@ -339,7 +289,7 @@ class GSSAPI: node = Node('response', attrs={'xmlns': NS_SASL}, payload=response) - self._con.send(node) + self._client.send_nonza(node) class SCRAM: @@ -348,8 +298,8 @@ class SCRAM: _channel_binding = '' _hash_method = '' - def __init__(self, con, channel_binding): - self._con = con + def __init__(self, client, channel_binding): + self._client = client self._channel_binding_data = channel_binding self._client_nonce = '%x' % int(binascii.hexlify(os.urandom(24)), 16) self._client_first_message_bare = None @@ -377,11 +327,12 @@ class SCRAM: self._client_nonce) client_first_message = '%s%s' % (self._channel_binding, self._client_first_message_bare) + payload = b64encode(client_first_message) node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': self._mechanism}, payload=[payload]) - self._con.send(node) + self._client.send_nonza(node) def response(self, server_first_message): server_first_message = b64decode(server_first_message) @@ -429,7 +380,7 @@ class SCRAM: node = Node('response', attrs={'xmlns': NS_SASL}, payload=[payload]) - self._con.send(node) + self._client.send_nonza(node) def success(self, server_last_message): server_last_message = b64decode(server_last_message) @@ -454,7 +405,7 @@ class SCRAM: class SCRAM_SHA_1(SCRAM): _mechanism = 'SCRAM-SHA-1' - _channel_binding = 'y,,' + _channel_binding = 'n,,' _hash_method = 'sha1' @@ -467,7 +418,7 @@ class SCRAM_SHA_1_PLUS(SCRAM_SHA_1): class SCRAM_SHA_256(SCRAM): _mechanism = 'SCRAM-SHA-256' - _channel_binding = 'y,,' + _channel_binding = 'n,,' _hash_method = 'sha256' diff --git a/nbxmpp/client.py b/nbxmpp/client.py new file mode 100644 index 0000000..9583aad --- /dev/null +++ b/nbxmpp/client.py @@ -0,0 +1,757 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging + +from gi.repository import GLib + +from nbxmpp.protocol import NS_TLS +from nbxmpp.protocol import NS_PING +from nbxmpp.protocol import Features +from nbxmpp.protocol import StanzaMalformed +from nbxmpp.protocol import SessionRequest +from nbxmpp.protocol import BindRequest +from nbxmpp.protocol import TLSRequest +from nbxmpp.protocol import isResultNode +from nbxmpp.protocol import JID +from nbxmpp.protocol import Iq +from nbxmpp.protocol import Protocol +from nbxmpp.protocol import WebsocketCloseHeader +from nbxmpp.addresses import ServerAddresses +from nbxmpp.addresses import NoMoreAddresses +from nbxmpp.tcp import TCPConnection +from nbxmpp.websocket import WebsocketConnection +from nbxmpp.smacks import Smacks +from nbxmpp.auth import SASL +from nbxmpp.const import StreamState +from nbxmpp.const import StreamError +from nbxmpp.const import ConnectionType +from nbxmpp.const import ConnectionProtocol +from nbxmpp.const import Mode +from nbxmpp.dispatcher import StanzaDispatcher +from nbxmpp.util import get_stream_header +from nbxmpp.util import get_stanza_id +from nbxmpp.util import Observable +from nbxmpp.util import validate_stream_header +from nbxmpp.util import is_error_result + +log = logging.getLogger('nbxmpp.stream') + +# TODO: check if signals make sense + + +class Client(Observable): + def __init__(self): + ''' + Signals: + resume-failed + resume-successful + login-successful + disconnected + connected + connection-failed + stanza-sent + stanza-received + ''' + Observable.__init__(self, log) + self._jid = None + self._lang = 'en' + self._domain = None + self._username = None + self._resource = None + + self._custom_host = None + + self._addresses = None + self._current_address = None + self._address_generator = None + + self._client_cert = None + self._client_cert_pass = None + self._proxy = None + + self._allowed_con_types = None + self._allowed_protocols = None + self._allowed_mechs = None + + self._stream_id = None + self._stream_secure = False + self._stream_authenticated = False + self._stream_features = None + self._session_required = False + self._connect_successful = False + self._stream_close_initiated = False + self._error = None, None, None + + self._ignored_tls_errors = [] + self._ignore_tls_errors = False + self._accepted_certificates = [] + self._peer_certificate = None + self._peer_certificate_errors = None + + self._con = None + self._mode = Mode.CLIENT + + self._ping_source_id = None + + self._dispatcher = StanzaDispatcher(self) + self._dispatcher.subscribe('before-dispatch', self._on_before_dispatch) + self._dispatcher.subscribe('parsing-error', self._on_parsing_error) + self._dispatcher.subscribe('stream-end', self._on_stream_end) + + self._smacks = Smacks(self) + self._sasl = SASL(self) + + self._state = StreamState.DISCONNECTED + + @property + def features(self): + return self._stream_features + + @property + def sm_supported(self): + return self._smacks.sm_supported + + @property + def lang(self): + return self._lang + + @property + def username(self): + return self._username + + @property + def domain(self): + return self._domain + + @property + def resource(self): + return self._resource + + def set_username(self, username): + self._username = username + + def set_domain(self, domain): + self._domain = domain + + def set_resource(self, resource): + self._resource = resource + + def set_mode(self, mode): + self._mode = mode + + def set_custom_host(self, host, protocol, type_): + if self._domain is None: + raise ValueError('Call set_domain() first before set_custom_host()') + self._custom_host = (host, protocol, type_) + + def set_accepted_certificates(self, certificates): + self._accepted_certificates = certificates + + @property + def ignored_tls_errors(self): + return self._ignored_tls_errors + + def set_ignored_tls_errors(self, errors): + self._ignored_tls_errors = errors + + @property + def ignore_tls_errors(self): + return self._ignore_tls_errors + + def set_ignore_tls_errors(self, ignore): + self._ignore_tls_errors = ignore + + def set_password(self, password): + self._sasl.set_password(password) + + @property + def peer_certificate(self): + return self._peer_certificate, self._peer_certificate_errors + + @property + def current_connection_type(self): + return self._current_address.type + + @property + def is_websocket(self): + return self._current_address.protocol == ConnectionProtocol.WEBSOCKET + + @property + def stream_id(self): + return self._stream_id + + @property + def is_stream_secure(self): + direct_tls = self.current_connection_type == ConnectionType.DIRECT_TLS + return self._stream_secure or direct_tls + + @property + def is_stream_authenticated(self): + return self._stream_authenticated + + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + self._state = value + log.info('Set state: %s', value) + + def set_state(self, state): + self.state = state + self._xmpp_state_machine() + + @property + def connection_types(self): + return list(self._allowed_con_types or [ConnectionType.DIRECT_TLS, + ConnectionType.START_TLS]) + + def set_connection_types(self, con_types): + self._allowed_con_types = con_types + + @property + def mechs(self): + return set(self._allowed_mechs or set(['SCRAM-SHA-256', + 'SCRAM-SHA-1', + # 'SCRAM-SHA-256-PLUS', + # 'SCRAM-SHA-1-PLUS', + 'PLAIN'])) + + def set_mechs(self, mechs): + self._allowed_mechs = mechs + + @property + def protocols(self): + return list(self._allowed_protocols or [ConnectionProtocol.TCP, + ConnectionProtocol.WEBSOCKET]) + + def set_protocols(self, protocols): + self._allowed_protocols = protocols + + @property + def client_cert(self): + return self._client_cert, self._client_cert_pass + + def set_client_cert(self, client_cert, client_cert_pass): + self._client_cert = client_cert + self._client_cert_pass = client_cert_pass + + def set_proxy(self, proxy): + self._proxy = proxy + self._dispatcher.get_module('Muclumbus').set_proxy(proxy) + + def get_bound_jid(self): + return self._jid + + def _set_bound_jid(self, jid): + self._jid = JID(jid) + + @property + def has_error(self): + return self._error[0] is not None + + def get_error(self): + return self._error + + def _reset_error(self): + self._error = None, None, None + + def _set_error(self, domain, error, text=None): + log.info('Set error: %s, %s, %s', domain, error, text) + self._error = domain, error, text + + def _connect(self): + if self._state not in (StreamState.DISCONNECTED, StreamState.RESOLVED): + log.error('Stream can\'t connect, stream state: %s', self._state) + return + + self.state = StreamState.CONNECTING + self._reset_error() + + self._con = self._get_connection(self._current_address, + self._accepted_certificates, + self._ignore_tls_errors, + self._ignored_tls_errors, + self.client_cert) + + self._con.subscribe('connected', self._on_connected) + self._con.subscribe('connection-failed', self._on_connection_failed) + self._con.subscribe('disconnected', self._on_disconnected) + self._con.subscribe('data-sent', self._on_data_sent) + self._con.subscribe('data-received', self._on_data_received) + self._con.subscribe('bad-certificate', self._on_bad_certificate) + self._con.subscribe('certificate-set', self._on_certificate_set) + self._con.connect() + + def _get_connection(self, *args): + if self.is_websocket: + return WebsocketConnection(*args) + return TCPConnection(*args) + + def connect(self): + if self._state != StreamState.DISCONNECTED: + log.error('Stream can\'t reconnect, stream state: %s', self._state) + return + + if self._connect_successful: + log.info('Reconnect') + self._connect() + return + + log.info('Connect') + self._reset_error() + self.state = StreamState.RESOLVE + + self._addresses = ServerAddresses(self._domain) + self._addresses.set_custom_host(self._custom_host) + self._addresses.set_proxy(self._proxy) + self._addresses.subscribe('resolved', self._on_addresses_resolved) + self._addresses.resolve() + + def _on_addresses_resolved(self, _addresses, _signal_name): + log.info('Domain resolved') + log.info(self._addresses) + self.state = StreamState.RESOLVED + self._address_generator = self._addresses.get_next_address( + self.connection_types, + self.protocols) + + self._try_next_ip() + + def _try_next_ip(self, *args): + try: + self._current_address = next(self._address_generator) + except NoMoreAddresses: + self._current_address = None + self.state = StreamState.DISCONNECTED + log.error('Unable to connect to %s', self._addresses.domain) + self._set_error(StreamError.CONNECTION_FAILED, + 'connection-failed', + 'Unable to connect to %s' % self._addresses.domain) + self.notify('connection-failed') + return + + log.info('Current address: %s', self._current_address) + self._connect() + + def disconnect(self, immediate=False): + if self._state == StreamState.RESOLVE: + self._addresses.cancel_resolve() + self.state = StreamState.DISCONNECTED + return + + if self._state == StreamState.CONNECTING: + self._disconnect() + return + + if self._state in (StreamState.DISCONNECTED, + StreamState.DISCONNECTING): + log.warning('Stream can\'t disconnect, stream state: %s', + self._state) + return + + self._disconnect(immediate=immediate) + + def _disconnect(self, immediate=True): + self.state = StreamState.DISCONNECTING + self._remove_ping_timer() + if not immediate: + self._stream_close_initiated = True + self._smacks.close_session() + self._end_stream() + self._con.shutdown_output() + else: + self._con.disconnect() + + def send(self, stanza, *args, **kwargs): + # Alias for backwards compat + return self.send_stanza(stanza) + + def _on_connected(self, _connection, _signal_name): + self._connect_successful = True + self.set_state(StreamState.CONNECTED) + + def _on_disconnected(self, _connection, _signal_name): + self.state = StreamState.DISCONNECTED + self._reset_stream() + self.notify('disconnected') + + def _on_connection_failed(self, _connection, _signal_name): + self.state = StreamState.DISCONNECTED + self._reset_stream() + if not self._connect_successful: + self._try_next_ip() + else: + self._set_error(StreamError.CONNECTION_FAILED, + 'connection-failed', + 'Unable to connect to last successful address: %s' % str(self._current_address)) + self.notify('connection-failed') + + def _disconnect_with_error(self, error_domain, error, text=None): + self._set_error(error_domain, error, text) + self.disconnect() + + def _on_parsing_error(self, _dispatcher, _signal_name, error): + self._disconnect_with_error(StreamError.PARSING, 'parsing-error', error) + + def _on_stream_end(self, _dispatcher, _signal_name, error): + if not self.has_error: + self._set_error(StreamError.STREAM, 'stream-end', error) + + self._con.shutdown_input() + if not self._stream_close_initiated: + self.state = StreamState.DISCONNECTING + self._smacks.close_session() + self._end_stream() + self._con.shutdown_output() + + def _reset_stream(self): + self._stream_id = None + self._stream_secure = False + self._stream_authenticated = False + self._stream_features = None + self._session_required = False + self._con = None + + def _end_stream(self): + if self.is_websocket: + nonza = WebsocketCloseHeader() + else: + nonza = '</stream:stream>' + self.send_nonza(nonza) + + def get_module(self, name): + return self._dispatcher.get_module(name) + + def _on_bad_certificate(self, connection, _signal_name): + self._peer_certificate, self._peer_certificate_errors = connection.peer_certificate + self._set_error(StreamError.BAD_CERTIFICATE, 'bad certificate') + + def _on_certificate_set(self, connection, _signal_name): + self._peer_certificate, self._peer_certificate_errors = connection.peer_certificate + + def accept_certificate(self): + log.info('Certificate accepted') + self._accepted_certificates.append(self._peer_certificate) + self._connect() + + def _on_data_sent(self, _connection, _signal_name, data): + self.notify('stanza-sent', data) + + def _on_before_dispatch(self, _dispatcher, _signal_name, data): + self.notify('stanza-received', data) + + def _on_data_received(self, _connection, _signal_name, data): + self._dispatcher.process_data(data) + self._reset_ping_timer() + + def _reset_ping_timer(self): + if self.is_websocket: + return + + if not self._mode.is_client: + return + + if self.state != StreamState.ACTIVE: + return + + if self._ping_source_id is not None: + log.info('Remove ping timer') + GLib.source_remove(self._ping_source_id) + self._ping_source_id = None + + log.info('Start ping timer') + self._ping_source_id = GLib.timeout_add_seconds(180, self._ping) + + def _remove_ping_timer(self): + if self._ping_source_id is None: + return + log.info('Remove ping timer') + GLib.source_remove(self._ping_source_id) + self._ping_source_id = None + + def send_stanza(self, stanza, now=False, callback=None, + timeout=None, user_data=None): + if user_data is not None and not isinstance(user_data, dict): + raise ValueError('arg user_data must be of dict type') + + if not isinstance(stanza, Protocol): + raise ValueError('Nonzas not allowed, use send_nonza()') + + id_ = stanza.getID() + if id_ is None: + id_ = get_stanza_id() + stanza.setID(id_) + + if callback is not None: + self._dispatcher.add_callback_for_id( + id_, callback, timeout, user_data) + self._con.send(stanza, now) + self._smacks.save_in_queue(stanza) + return id_ + + def SendAndCallForResponse(self, stanza, callback, user_data=None): + self.send_stanza(stanza, callback=callback, user_data=user_data) + + def send_nonza(self, nonza, now=False): + self._con.send(nonza, now) + + def _xmpp_state_machine(self, stanza=None): + log.info('Execute state machine') + if stanza is not None: + if stanza.getName() == 'error': + log.info('Stream error') + # TODO: + # self._disconnect_with_error(StreamError.SASL, + # stanza.get_condition()) + return + + if self.state == StreamState.CONNECTED: + self._dispatcher.set_dispatch_callback(self._xmpp_state_machine) + if (self.current_connection_type == ConnectionType.DIRECT_TLS and + not self.is_websocket): + self._con.start_tls_negotiation() + self._stream_secure = True + self._start_stream() + return + + self._start_stream() + + elif self.state == StreamState.WAIT_FOR_STREAM_START: + try: + self._stream_id = validate_stream_header(stanza, + self._domain, + self.is_websocket) + except StanzaMalformed as error: + log.error(error) + self._disconnect_with_error(StreamError.STREAM, + 'stanza-malformed', + 'Invalid stream header') + return + + self.state = StreamState.WAIT_FOR_FEATURES + + elif self.state == StreamState.WAIT_FOR_FEATURES: + if stanza.getName() != 'features': + log.error('Invalid response: %s', stanza) + self._disconnect_with_error( + StreamError.STREAM, + 'stanza-malformed', + 'Invalid response, expected features') + return + self._on_stream_features(Features(stanza)) + + elif self.state == StreamState.WAIT_FOR_TLS_PROCEED: + if stanza.getNamespace() != NS_TLS: + self._disconnect_with_error( + StreamError.TLS, + 'stanza-malformed', + 'Invalid namespace for TLS response') + return + + if stanza.getName() == 'failure': + self._disconnect_with_error(StreamError.TLS, + 'negotiation-failed') + return + + if stanza.getName() == 'proceed': + self._con.start_tls_negotiation() + self._stream_secure = True + self._start_stream() + return + + log.error('Invalid response') + self._disconnect_with_error(StreamError.TLS, + 'stanza-malformed', + 'Invalid TLS response') + return + + elif self.state == StreamState.PROCEED_WITH_AUTH: + self._sasl.delegate(stanza) + + elif self.state == StreamState.AUTH_SUCCESSFUL: + self._stream_authenticated = True + if self._mode.is_login_test: + self.notify('login-successful') + self.disconnect() + return + + self._start_stream() + + elif self.state == StreamState.AUTH_FAILED: + self._disconnect_with_error(StreamError.SASL, + *self._sasl.error) + + elif self.state == StreamState.WAIT_FOR_BIND: + self._on_bind(stanza) + + elif self.state == StreamState.BIND_FAILED: + self._disconnect_with_error(StreamError.BIND, 'bind-failed') + + elif self.state == StreamState.BIND_SUCCESSFUL: + self._smacks.send_enable() + self._dispatcher.set_dispatch_callback(None) + self.state = StreamState.ACTIVE + self.notify('connected') + + elif self.state == StreamState.WAIT_FOR_SESSION: + self._on_session(stanza) + + elif self.state == StreamState.SESSION_FAILED: + self._disconnect_with_error(StreamError.SESSION, 'session-failed') + + elif self.state == StreamState.WAIT_FOR_RESUMED: + self._smacks.delegate(stanza) + + elif self.state == StreamState.RESUME_FAILED: + self.notify('resume-failed') + self._start_bind() + + elif self.state == StreamState.RESUME_SUCCESSFUL: + self._dispatcher.set_dispatch_callback(None) + self.state = StreamState.ACTIVE + self.notify('resume-successful') + + def _on_stream_features(self, features): + if self.is_stream_authenticated: + self._stream_features = features + self._smacks.sm_supported = features.has_sm() + self._session_required = features.session_required() + if self._smacks.resume_supported: + self._smacks.resume_request() + self.state = StreamState.WAIT_FOR_RESUMED + else: + self._start_bind() + + elif self.is_stream_secure: + if self._mode.is_register: + if features.has_register: + self.state = StreamState.ACTIVE + self._dispatcher.set_dispatch_callback(None) + self.notify('connected') + else: + self._disconnect_with_error(StreamError.REGISTER, + 'register-not-supported') + return + + self._start_auth(features) + + else: + tls_supported, required = features.has_starttls() + if self._current_address.type == ConnectionType.PLAIN: + if tls_supported and required: + log.error('Server requires TLS') + self._disconnect_with_error(StreamError.TLS, 'tls-required') + return + self._start_auth(features) + return + + if not tls_supported: + log.error('Server does not support TLS') + self._disconnect_with_error(StreamError.TLS, + 'tls-not-supported') + return + self._start_tls() + + def _start_stream(self): + log.info('Start stream') + self._stream_id = None + self._dispatcher.reset_parser() + header = get_stream_header(self._domain, self._lang, self.is_websocket) + self.send_nonza(header) + self.state = StreamState.WAIT_FOR_STREAM_START + + def _start_tls(self): + self.send_nonza(TLSRequest()) + self.state = StreamState.WAIT_FOR_TLS_PROCEED + + def _start_auth(self, features): + if not features.has_sasl(): + log.error('Server does not support SASL') + self._disconnect_with_error(StreamError.SASL, + 'sasl-not-supported') + return + self.state = StreamState.PROCEED_WITH_AUTH + self._sasl.start_auth(features) + + def _start_bind(self): + log.info('Send bind') + bind_request = BindRequest(self.resource) + self.send_stanza(bind_request) + self.state = StreamState.WAIT_FOR_BIND + + def _on_bind(self, stanza): + if not isResultNode(stanza): + log.error('Binding failed: %s.', stanza.getTag('error')) + if stanza.getError() == 'conflict' and self.resource is not None: + log.info('Try to request server generated resource') + self._start_bind() + return + self.set_state(StreamState.BIND_FAILED) + return + + jid = stanza.getTag('bind').getTagData('jid') + log.info('Successfully bound %s', jid) + self._set_bound_jid(jid) + + if not self._session_required: + # Server don't want us to initialize a session + log.info('No session required') + self.set_state(StreamState.BIND_SUCCESSFUL) + else: + session_request = SessionRequest() + self.send_stanza(session_request) + self.state = StreamState.WAIT_FOR_SESSION + + def _on_session(self, stanza): + if isResultNode(stanza): + log.info('Successfully started session') + self.set_state(StreamState.BIND_SUCCESSFUL) + else: + log.error('Session open failed') + self.set_state(StreamState.SESSION_FAILED) + + def _ping(self): + iq = Iq('get', to=self._jid.getBare()) + iq.addChild(name='ping', namespace=NS_PING) + self.send_stanza(iq, timeout=10, callback=self._on_pong) + log.info('Ping') + + def _on_pong(self, _client, result): + self._ping_source_id = None + if is_error_result(result): + if result.condition == 'timeout': + log.info('Ping timeout') + self._disconnect(immediate=True) + return + log.info('Pong') + + def register_handler(self, *args, **kwargs): + self._dispatcher.register_handler(*args, **kwargs) + + def unregister_handler(self, *args, **kwargs): + self._dispatcher.unregister_handler(*args, **kwargs) + + def destroy(self): + self._remove_ping_timer() + self._smacks = None + self._sasl = None + self._dispatcher.cleanup() + self._dispatcher = None + self.remove_subscriptions() diff --git a/nbxmpp/connection.py b/nbxmpp/connection.py new file mode 100644 index 0000000..70a79d0 --- /dev/null +++ b/nbxmpp/connection.py @@ -0,0 +1,130 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging + +from gi.repository import Gio + +from nbxmpp.const import TCPState +from nbxmpp.util import Observable + +log = logging.getLogger('nbxmpp.connection') + + +class Connection(Observable): + ''' + Base Connection Class + + Signals: + + data-sent + data-received + bad-certificate + certificate-set + connection-failed + disconnected + ''' + def __init__(self, + address, + accepted_certificates, + ignore_tls_errors, + ignored_tls_errors, + client_cert): + Observable.__init__(self, log) + + self._client_cert = client_cert + self._address = address + self._state = None + + self._state = TCPState.DISCONNECTED + + self._peer_certificate = None + self._peer_certificate_errors = None + self._accepted_certificates = accepted_certificates + self._ignore_tls_errors = ignore_tls_errors + self._ignored_tls_errors = ignored_tls_errors + + @property + def peer_certificate(self): + return (self._peer_certificate, self._peer_certificate_errors) + + @property + def connection_type(self): + return self._address.type + + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + log.info('Set Connection State: %s', value) + self._state = value + + def _accept_certificate(self): + if not self._peer_certificate_errors: + return True + + log.info('Found TLS certificate errors: %s', + self._peer_certificate_errors) + + if self._ignore_tls_errors: + log.warning('Ignore all errors') + return True + + if self._ignored_tls_errors: + log.warning('Ignore TLS certificate errors: %s', + self._ignored_tls_errors) + self._peer_certificate_errors -= self._ignored_tls_errors + + if Gio.TlsCertificateFlags.UNKNOWN_CA in self._peer_certificate_errors: + for accepted_certificate in self._accepted_certificates: + if certificate.is_same(accepted_certificate): + self._peer_certificate_errors.discard( + Gio.TlsCertificateFlags.UNKNOWN_CA) + break + + if not self._peer_certificate_errors: + return True + return False + + def disconnect(self): + raise NotImplementedError + + def connect(self): + raise NotImplementedError + + def send(self, stanza, now=False): + raise NotImplementedError + + @staticmethod + def _log_stanza(data, received=True): + direction = 'RECEIVED' if received else 'SENT' + message = ('::::: DATA %s ::::' + '\n_____________\n' + '%s' + '\n_____________') + log.info(message, direction, data) + + def start_tls_negotiation(self): + raise NotImplementedError + + def destroy(self): + self.remove_subscriptions() + self._peer_certificate = None + self._client_cert = None + self._address = None diff --git a/nbxmpp/const.py b/nbxmpp/const.py index 9877a35..ef19585 100644 --- a/nbxmpp/const.py +++ b/nbxmpp/const.py @@ -17,31 +17,9 @@ from enum import Enum from enum import IntEnum -from enum import unique from functools import total_ordering - -@unique -class Realm(Enum): - CONNECTING = 'Connecting' - - def __str__(self): - return self.value - - -@unique -class Event(Enum): - AUTH_SUCCESSFUL = 'Auth successful' - AUTH_FAILED = 'Auth failed' - BIND_FAILED = 'Bind failed' - SESSION_FAILED = 'Session failed' - RESUME_SUCCESSFUL = 'Resume successful' - RESUME_FAILED = 'Resume failed' - CONNECTION_ACTIVE = 'Connection active' - - def __str__(self): - return self.value - +from gi.repository import Gio class GSSAPIState(IntEnum): STEP = 0 @@ -341,6 +319,94 @@ class AdHocNoteType(Enum): ERROR = 'error' +class ConnectionType(Enum): + DIRECT_TLS = 'ssl' + START_TLS = 'tls' + PLAIN = 'plain' + + @property + def is_direct_tls(self): + return self == ConnectionType.DIRECT_TLS + + @property + def is_start_tls(self): + return self == ConnectionType.START_TLS + + @property + def is_plain(self): + return self == ConnectionType.PLAIN + + +class ConnectionProtocol(IntEnum): + TCP = 0 + WEBSOCKET = 1 + + +class StreamState(Enum): + RESOLVE = 'resolve' + RESOLVED = 'resolved' + CONNECTING = 'connecting' + CONNECTED = 'connected' + DISCONNECTED = 'disconnected' + DISCONNECTING = 'disconnecting' + STREAM_START = 'stream start' + WAIT_FOR_STREAM_START = 'wait for stream start' + WAIT_FOR_FEATURES = 'wait for features' + WAIT_FOR_TLS_PROCEED = 'wait for tls proceed' + TLS_START_SUCCESSFUL = 'tls start successful' + PROCEED_WITH_AUTH = 'proceed with auth' + AUTH_SUCCESSFUL = 'auth successful' + AUTH_FAILED = 'auth failed' + SESSION_FAILED = 'session failed' + WAIT_FOR_RESUMED = 'wait for resumed' + RESUME_FAILED = 'resume failed' + RESUME_SUCCESSFUL = 'resume successful' + PROCEED_WITH_BIND = 'proceed with bind' + BIND_SUCCESSFUL = 'bind successful' + BIND_FAILED = 'bind failed' + WAIT_FOR_BIND = 'wait for bind' + WAIT_FOR_SESSION = 'wait for session' + ACTIVE = 'active' + + +class StreamError(Enum): + PARSING = 0 + CONNECTION_FAILED = 1 + SESSION = 2 + BIND = 3 + TLS = 4 + BAD_CERTIFICATE = 5 + STREAM = 6 + SASL = 7 + REGISTER = 8 + END = 9 + + +class TCPState(Enum): + DISCONNECTED = 'disconnected' + DISCONNECTING = 'disconnecting' + CONNECTING = 'connecting' + CONNECTED = 'connected' + + +class Mode(IntEnum): + CLIENT = 0 + REGISTER = 1 + LOGIN_TEST = 2 + + @property + def is_client(self): + return self == Mode.CLIENT + + @property + def is_register(self): + return self == Mode.REGISTER + + @property + def is_login_test(self): + return self == Mode.LOGIN_TEST + + MOODS = [ 'afraid', 'amazed', @@ -571,3 +637,15 @@ REGISTER_FIELDS = [ 'url', 'date', ] + +# pylint: disable=line-too-long +GIO_TLS_ERRORS = { + Gio.TlsCertificateFlags.UNKNOWN_CA: 'The signing certificate authority is not known', + Gio.TlsCertificateFlags.REVOKED: 'The certificate has been revoked', + Gio.TlsCertificateFlags.BAD_IDENTITY: 'The certificate does not match the expected identity of the site', + Gio.TlsCertificateFlags.INSECURE: 'The certificate’s algorithm is insecure', + Gio.TlsCertificateFlags.NOT_ACTIVATED: 'The certificate’s activation time is in the future', + Gio.TlsCertificateFlags.GENERIC_ERROR: 'Unknown validation error', + Gio.TlsCertificateFlags.EXPIRED: 'The certificate has expired', +} +# pylint: enable=line-too-long diff --git a/nbxmpp/dispatcher.py b/nbxmpp/dispatcher.py index 60f68d5..c91be32 100644 --- a/nbxmpp/dispatcher.py +++ b/nbxmpp/dispatcher.py @@ -1,36 +1,32 @@ -## dispatcher.py -## -## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov -## modified by Dimitur Kirov <dkirov@gmail.com> -## -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2, or (at your option) -## any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. - - -""" -Main xmpp decision making logic. Provides library with methods to assign -different handlers to different XMPP stanzas and namespaces -""" - -import sys -import locale -import re -import uuid +# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + import logging -import inspect +import re +import time from xml.parsers.expat import ExpatError +from gi.repository import GLib + from nbxmpp.simplexml import NodeBuilder -from nbxmpp.plugin import PlugIn +from nbxmpp.simplexml import Node from nbxmpp.protocol import NS_STREAMS -from nbxmpp.protocol import NS_HTTP_BIND +from nbxmpp.protocol import NS_CLIENT +from nbxmpp.protocol import NS_XMPP_STREAMS from nbxmpp.protocol import NodeProcessed from nbxmpp.protocol import InvalidFrom from nbxmpp.protocol import InvalidJid @@ -39,8 +35,8 @@ from nbxmpp.protocol import Iq from nbxmpp.protocol import Presence from nbxmpp.protocol import Message from nbxmpp.protocol import Protocol -from nbxmpp.protocol import Node from nbxmpp.protocol import Error +from nbxmpp.protocol import StreamErrorNode from nbxmpp.protocol import ERR_FEATURE_NOT_IMPLEMENTED from nbxmpp.modules.eme import EME from nbxmpp.modules.http_auth import HTTPAuth @@ -82,479 +78,288 @@ from nbxmpp.modules.register import Register from nbxmpp.modules.http_upload import HTTPUpload from nbxmpp.modules.misc import unwrap_carbon from nbxmpp.modules.misc import unwrap_mam +from nbxmpp.structs import StanzaTimeoutError from nbxmpp.util import get_properties_struct +from nbxmpp.util import get_invalid_xml_regex +from nbxmpp.util import is_websocket_close +from nbxmpp.util import is_websocket_stream_error +from nbxmpp.util import Observable log = logging.getLogger('nbxmpp.dispatcher') -#: default timeout to wait for response for our id -DEFAULT_TIMEOUT_SECONDS = 25 -XML_DECLARATION = '<?xml version=\'1.0\'?>' - -# FIXME: ugly -class Dispatcher: +class StanzaDispatcher(Observable): """ - Why is this here - I needed to redefine Dispatcher for BOSH and easiest way - was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble - is that reference used to access dispatcher instance is in Client attribute - named by __class__.__name__ of the dispatcher instance .. long story short: + Dispatches stanzas to handlers - I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp + Signals: + before-dispatch + parsing-error + stream-end - If having two kinds of dispatcher will go well, I will rewrite the dispatcher - references in other scripts """ - def PlugIn(self, client_obj, after_SASL=False, old_features=None): - if client_obj.protocol_type == 'XMPP': - XMPPDispatcher().PlugIn(client_obj) - elif client_obj.protocol_type == 'BOSH': - BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features) - else: - assert False # should never be reached + def __init__(self, client): + Observable.__init__(self, log) + self._client = client + self._modules = {} + self._parser = None + self._websocket_stream_error = None - @classmethod - def get_instance(cls, *args, **kwargs): - """ - Factory Method for object creation + self._handlers = {} - Use this instead of directly initializing the class in order to make - unit testing much easier. - """ - return cls(*args, **kwargs) + self._id_callbacks = {} + self._dispatch_callback = None + self._timeout_id = None + self._stanza_types = { + 'iq': Iq, + 'message': Message, + 'presence': Presence, + 'error': StreamErrorNode, + } -class XMPPDispatcher(PlugIn): - """ - Handles XMPP stream and is the first who takes control over a fresh stanza + self.invalid_chars_re = get_invalid_xml_regex() - Is plugged into NonBlockingClient but can be replugged to restart handled - stream headers (used by SASL f.e.). - """ + self._register_namespace('unknown') + self._register_namespace(NS_STREAMS) + self._register_namespace(NS_CLIENT) + self._register_protocol('iq', Iq) + self._register_protocol('presence', Presence) + self._register_protocol('message', Message) - def __init__(self): - PlugIn.__init__(self) - self.handlers = {} - self._modules = {} - self._expected = {} - self._defaultHandler = None - self._pendingExceptions = [] - self._eventHandler = None - self._cycleHandlers = [] - self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, - self.RegisterEventHandler, self.UnregisterCycleHandler, - self.RegisterCycleHandler, self.RegisterHandlerOnce, - self.UnregisterHandler, self.RegisterProtocol, - self.SendAndCallForResponse, - self.getAnID, self.Event, self.send, self.get_module] - - # \ufddo -> \ufdef range - c = '\ufdd0' - r = c - while c < '\ufdef': - c = chr(ord(c) + 1) - r += '|' + c - - # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff - c = '\ufffe' - r += '|' + c - r += '|' + chr(ord(c) + 1) - while c < '\U0010fffe': - c = chr(ord(c) + 0x10000) - r += '|' + c - r += '|' + chr(ord(c) + 1) - - self.invalid_chars_re = re.compile(r) - - def getAnID(self): - return str(uuid.uuid4()) - - def dumpHandlers(self): - """ - Return set of user-registered callbacks in it's internal format. Used - within the library to carry user handlers set over Dispatcher replugins - """ - return self.handlers + self._register_modules() - def restoreHandlers(self, handlers): - """ - Restore user-registered callbacks structure from dump previously obtained - via dumpHandlers. Used within the library to carry user handlers set over - Dispatcher replugins. - """ - self.handlers = handlers + def set_dispatch_callback(self, callback): + log.info('Set dispatch callback: %s', callback) + self._dispatch_callback = callback def get_module(self, name): return self._modules[name] def _register_modules(self): - self._modules['BasePresence'] = BasePresence(self._owner) - self._modules['BaseMessage'] = BaseMessage(self._owner) - self._modules['BaseIq'] = BaseIq(self._owner) - self._modules['EME'] = EME(self._owner) - self._modules['HTTPAuth'] = HTTPAuth(self._owner) - self._modules['Nickname'] = Nickname(self._owner) - self._modules['MUC'] = MUC(self._owner) - self._modules['Delay'] = Delay(self._owner) - self._modules['Captcha'] = Captcha(self._owner) - self._modules['Idle'] = Idle(self._owner) - self._modules['PGPLegacy'] = PGPLegacy(self._owner) - self._modules['VCardAvatar'] = VCardAvatar(self._owner) - self._modules['EntityCaps'] = EntityCaps(self._owner) - self._modules['Blocking'] = Blocking(self._owner) - self._modules['PubSub'] = PubSub(self._owner) - self._modules['Mood'] = Mood(self._owner) - self._modules['Activity'] = Activity(self._owner) - self._modules['Tune'] = Tune(self._owner) - self._modules['Location'] = Location(self._owner) - self._modules['UserAvatar'] = UserAvatar(self._owner) - self._modules['Bookmarks'] = Bookmarks(self._owner) - self._modules['OpenPGP'] = OpenPGP(self._owner) - self._modules['OMEMO'] = OMEMO(self._owner) - self._modules['Annotations'] = Annotations(self._owner) - self._modules['Muclumbus'] = Muclumbus(self._owner) - self._modules['SoftwareVersion'] = SoftwareVersion(self._owner) - self._modules['AdHoc'] = AdHoc(self._owner) - self._modules['IBB'] = IBB(self._owner) - self._modules['Discovery'] = Discovery(self._owner) - self._modules['ChatMarkers'] = ChatMarkers(self._owner) - self._modules['Receipts'] = Receipts(self._owner) - self._modules['OOB'] = OOB(self._owner) - self._modules['Correction'] = Correction(self._owner) - self._modules['Attention'] = Attention(self._owner) - self._modules['SecurityLabels'] = SecurityLabels(self._owner) - self._modules['Chatstates'] = Chatstates(self._owner) - self._modules['Register'] = Register(self._owner) - self._modules['HTTPUpload'] = HTTPUpload(self._owner) + self._modules['BasePresence'] = BasePresence(self._client) + self._modules['BaseMessage'] = BaseMessage(self._client) + self._modules['BaseIq'] = BaseIq(self._client) + self._modules['EME'] = EME(self._client) + self._modules['HTTPAuth'] = HTTPAuth(self._client) + self._modules['Nickname'] = Nickname(self._client) + self._modules['MUC'] = MUC(self._client) + self._modules['Delay'] = Delay(self._client) + self._modules['Captcha'] = Captcha(self._client) + self._modules['Idle'] = Idle(self._client) + self._modules['PGPLegacy'] = PGPLegacy(self._client) + self._modules['VCardAvatar'] = VCardAvatar(self._client) + self._modules['EntityCaps'] = EntityCaps(self._client) + self._modules['Blocking'] = Blocking(self._client) + self._modules['PubSub'] = PubSub(self._client) + self._modules['Mood'] = Mood(self._client) + self._modules['Activity'] = Activity(self._client) + self._modules['Tune'] = Tune(self._client) + self._modules['Location'] = Location(self._client) + self._modules['UserAvatar'] = UserAvatar(self._client) + self._modules['Bookmarks'] = Bookmarks(self._client) + self._modules['OpenPGP'] = OpenPGP(self._client) + self._modules['OMEMO'] = OMEMO(self._client) + self._modules['Annotations'] = Annotations(self._client) + self._modules['Muclumbus'] = Muclumbus(self._client) + self._modules['SoftwareVersion'] = SoftwareVersion(self._client) + self._modules['AdHoc'] = AdHoc(self._client) + self._modules['IBB'] = IBB(self._client) + self._modules['Discovery'] = Discovery(self._client) + self._modules['ChatMarkers'] = ChatMarkers(self._client) + self._modules['Receipts'] = Receipts(self._client) + self._modules['OOB'] = OOB(self._client) + self._modules['Correction'] = Correction(self._client) + self._modules['Attention'] = Attention(self._client) + self._modules['SecurityLabels'] = SecurityLabels(self._client) + self._modules['Chatstates'] = Chatstates(self._client) + self._modules['Register'] = Register(self._client) + self._modules['HTTPUpload'] = HTTPUpload(self._client) for instance in self._modules.values(): for handler in instance.handlers: - self.RegisterHandler(*handler) + self.register_handler(*handler) - def _init(self): - """ - Register default namespaces/protocols/handlers. Used internally - """ - # FIXME: inject dependencies, do not rely that they are defined by our - # owner - self.RegisterNamespace('unknown') - self.RegisterNamespace(NS_STREAMS) - self.RegisterNamespace(self._owner.defaultNamespace) - self.RegisterProtocol('iq', Iq) - self.RegisterProtocol('presence', Presence) - self.RegisterProtocol('message', Message) - self.RegisterDefaultHandler(self.returnStanzaHandler) - self.RegisterEventHandler(self._owner._caller._event_dispatcher) - self._register_modules() + def reset_parser(self): + self._remove_timeout_source() + if self._parser is not None: + self._parser.dispatch = None + self._parser.destroy() + self._parser = None - def plugin(self, owner): - """ - Plug the Dispatcher instance into Client class instance and send initial - stream header. Used internally - """ - self._init() - self._owner.lastErrNode = None - self._owner.lastErr = None - self._owner.lastErrCode = None - if hasattr(self._owner, 'StreamInit'): - self._owner.StreamInit() - else: - self.StreamInit() - - def plugout(self): - """ - Prepare instance to be destructed - """ - self._modules = {} - self.Stream.dispatch = None - self.Stream.features = None - self.Stream.destroy() - self._owner = None - self.Stream = None + self._id_callbacks.clear() - def StreamInit(self): - """ - Send an initial stream header - """ - self._owner.Connection.sendqueue = [] - self.Stream = NodeBuilder() - self.Stream.dispatch = self.dispatch - self.Stream._dispatch_depth = 2 - self.Stream.stream_header_received = self._check_stream_start - self.Stream.features = None - self._metastream = Node('stream:stream') - self._metastream.setNamespace(self._owner.Namespace) - self._metastream.setAttr('version', '1.0') - self._metastream.setAttr('xmlns:stream', NS_STREAMS) - self._metastream.setAttr('to', self._owner.Server) - self._metastream.setAttr('xml:lang', self._owner.lang) - self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2])) - - def _check_stream_start(self, ns, tag, attrs): - if ns != NS_STREAMS or tag!='stream': - raise ValueError('Incorrect stream start: (%s,%s). Terminating.' - % (tag, ns)) + self._parser = NodeBuilder(dispatch_depth=2, + finished=False) + self._parser.dispatch = self.dispatch def replace_non_character(self, data): return re.sub(self.invalid_chars_re, '\ufffd', data) - def ProcessNonBlocking(self, data): - """ - Check incoming stream for data waiting + def process_data(self, data): + # Parse incoming data - :param data: data received from transports/IO sockets - :return: - 1) length of processed data if some data were processed; - 2) '0' string if no data were processed but link is alive; - 3) 0 (zero) if underlying connection is closed. - """ - # FIXME: - # When an error occurs we disconnect the transport directly. Client's - # disconnect method will never be called. - # Is this intended? - # also look at transports start_disconnect() data = self.replace_non_character(data) - for handler in self._cycleHandlers: - handler(self) - if len(self._pendingExceptions) > 0: - _pendingException = self._pendingExceptions.pop() - sys.excepthook(*_pendingException) + + if self._client.is_websocket: + stanza = Node(node=data) + if is_websocket_stream_error(stanza): + for tag in stanza.getChildren(): + name = tag.getName() + if name != 'text' and tag.getNamespace() == NS_XMPP_STREAMS: + self._websocket_stream_error = name + + elif is_websocket_close(stanza): + log.info('Stream <close> received') + self.notify('stream-end', self._websocket_stream_error) + return + + self.dispatch(stanza) return + try: - self.Stream.Parse(data) - # end stream:stream tag received - if self.Stream and self.Stream.has_received_endtag(): - self._owner.disconnect(self.Stream.streamError) - return 0 - except ExpatError as error: - log.error('Invalid XML received from server. Forcing disconnect.') - log.error(error) - self._owner.Connection.disconnect() - return 0 - except ValueError as e: - log.debug('ValueError: %s' % str(e)) - self._owner.Connection.pollend() - return 0 - if len(self._pendingExceptions) > 0: - _pendingException = self._pendingExceptions.pop() - sys.excepthook(*_pendingException) + self._parser.Parse(data) + except (ExpatError, ValueError) as error: + log.error('XML parsing error: %s', error) + self.notify('parsing-error', error) return - if len(data) == 0: - return '0' - return len(data) - def RegisterNamespace(self, xmlns, order='info'): - """ - Create internal structures for newly registered namespace - - You can register handlers for this namespace afterwards. By default - one namespace is already registered - (jabber:client or jabber:component:accept depending on context. - """ - log.debug('Registering namespace "%s"' % xmlns) - self.handlers[xmlns] = {} - self.RegisterProtocol('unknown', Protocol, xmlns=xmlns) - self.RegisterProtocol('default', Protocol, xmlns=xmlns) + # end stream:stream tag received + if self._parser.has_received_endtag(): + log.info('End of stream: %s', self._parser.streamError) + self.notify('stream-end', self._parser.streamError) + return - def RegisterProtocol(self, tag_name, proto, xmlns=None, order='info'): + def _register_namespace(self, xmlns): """ - Used to declare some top-level stanza name to dispatcher - - Needed to start registering handlers for such stanzas. Iq, message and - presence protocols are registered by default. + Setup handler structure for namespace """ - if not xmlns: - xmlns = self._owner.defaultNamespace - log.debug('Registering protocol "%s" as %s(%s)', tag_name, proto, xmlns) - self.handlers[xmlns][tag_name] = {'type': proto, 'default': []} + log.debug('Register namespace "%s"', xmlns) + self._handlers[xmlns] = {} + self._register_protocol('error', Protocol, xmlns=xmlns) + self._register_protocol('unknown', Protocol, xmlns=xmlns) + self._register_protocol('default', Protocol, xmlns=xmlns) - def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', system=0): + def _register_protocol(self, tag_name, protocol, xmlns=None): """ - Register handler for processing all stanzas for specified namespace + Register protocol for top level tag names """ - self.RegisterHandler('default', handler, typ, ns, xmlns, system) + if xmlns is None: + xmlns = NS_CLIENT + log.debug('Register protocol "%s (%s)" as %s', + tag_name, xmlns, protocol) + self._handlers[xmlns][tag_name] = {'type': protocol, 'default': []} - def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, - system=False, priority=50): + def register_handler(self, name, handler, typ='', ns='', + xmlns=None, priority=50): """ - Register user callback as stanzas handler of declared type - - Callback arguments: - dispatcher instance (for replying), incoming return of previous handlers. - The callback must raise xmpp.NodeProcessed just before return if it wants - to prevent other callbacks to be called with the same stanza as argument - _and_, more importantly library from returning stanza to sender with error set. - - :param name: name of stanza. F.e. "iq". - :param handler: user callback. - :param typ: value of stanza's "type" attribute. If not specified any - value will match - :param ns: namespace of child that stanza must contain. - :param xmlns: xml namespace - :param system: call handler even if NodeProcessed Exception were raised - already. + Register handler + + :param name: name of top level tag, example: iq, message, presence + :param handler: callback + :param typ: value of stanza's "type" attribute. + If not specified any value will match + :param ns: Namespace of child that stanza must contain + :param xmlns: XML namespace, only needed if not jabber:client :param priority: The priority of the handler, higher get called later """ + if not xmlns: - xmlns = self._owner.defaultNamespace + xmlns = NS_CLIENT if not typ and not ns: typ = 'default' - log.debug( - 'Registering handler %s for "%s" type->%s ns->%s(%s) priority->%s', - handler, name, typ, ns, xmlns, priority) + log.debug('Register handler %s for "%s" type->%s ns->%s(%s) priority->%s', + handler, name, typ, ns, xmlns, priority) - if xmlns not in self.handlers: - self.RegisterNamespace(xmlns, 'warn') - if name not in self.handlers[xmlns]: - self.RegisterProtocol(name, Protocol, xmlns, 'warn') + if xmlns not in self._handlers: + self._register_namespace(xmlns) + if name not in self._handlers[xmlns]: + self._register_protocol(name, Protocol, xmlns) specific = typ + ns - if specific not in self.handlers[xmlns][name]: - self.handlers[xmlns][name][specific] = [] + if specific not in self._handlers[xmlns][name]: + self._handlers[xmlns][name][specific] = [] - self.handlers[xmlns][name][specific].append( + self._handlers[xmlns][name][specific].append( {'func': handler, - 'system': system, 'priority': priority, 'specific': specific}) - def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, - system=0): + def unregister_handler(self, name, handler, typ='', ns='', xmlns=None): """ - Unregister handler after first call (not implemented yet) + Unregister handler """ - # FIXME Drop or implement - if not xmlns: - xmlns = self._owner.defaultNamespace - self.RegisterHandler(name, handler, typ, ns, xmlns, system) - def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None): - """ - Unregister handler. "typ" and "ns" must be specified exactly the same as - with registering. - """ if not xmlns: - xmlns = self._owner.defaultNamespace + xmlns = NS_CLIENT + if not typ and not ns: typ = 'default' - if xmlns not in self.handlers: - return - if name not in self.handlers[xmlns]: - return specific = typ + ns - if specific not in self.handlers[xmlns][name]: + try: + self._handlers[xmlns][name][specific] + except KeyError: return - for handler_dict in self.handlers[xmlns][name][specific]: - if handler_dict['func'] == handler: - try: - self.handlers[xmlns][name][specific].remove(handler_dict) - log.debug( - 'Unregister handler %s for "%s" type->%s ns->%s(%s)', - handler, name, typ, ns, xmlns) - except ValueError: - log.warning( - 'Unregister failed: %s for "%s" type->%s ns->%s(%s)', - handler, name, typ, ns, xmlns) - pass - - def RegisterDefaultHandler(self, handler): - """ - Specify the handler that will be used if no NodeProcessed exception were - raised. This is returnStanzaHandler by default. - """ - self._defaultHandler = handler - def RegisterEventHandler(self, handler): - """ - Register handler that will process events. F.e. "FILERECEIVED" event. See - common/connection: _event_dispatcher() - """ - self._eventHandler = handler + for handler_dict in self._handlers[xmlns][name][specific]: + if handler_dict['func'] != handler: + return - def returnStanzaHandler(self, conn, stanza): + try: + self._handlers[xmlns][name][specific].remove(handler_dict) + except ValueError: + log.warning('Unregister failed: %s for "%s" type->%s ns->%s(%s)', + handler, name, typ, ns, xmlns) + else: + log.debug('Unregister handler %s for "%s" type->%s ns->%s(%s)', + handler, name, typ, ns, xmlns) + + def _default_handler(self, stanza): """ Return stanza back to the sender with <feature-not-implemented/> error - set """ if stanza.getType() in ('get', 'set'): - conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED)) - - def RegisterCycleHandler(self, handler): - """ - Register handler that will be called on every Dispatcher.Process() call - """ - if handler not in self._cycleHandlers: - self._cycleHandlers.append(handler) - - def UnregisterCycleHandler(self, handler): - """ - Unregister handler that will be called on every Dispatcher.Process() call - """ - if handler in self._cycleHandlers: - self._cycleHandlers.remove(handler) - - def Event(self, realm, event, data=None): - """ - Raise some event - - :param realm: scope of event. Usually a namespace. - :param event: the event itself. F.e. "SUCCESSFUL SEND". - :param data: data that comes along with event. Depends on event. - """ - if self._eventHandler: - self._eventHandler(realm, event, data) - else: - log.warning('Received unhandled event: %s' % event) + self._client.send_stanza(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED)) def dispatch(self, stanza): - """ - Main procedure that performs XMPP stanza recognition and calling - apppropriate handlers for it. Called by simplexml - """ - - self.Event('', 'STANZA RECEIVED', stanza) - - self.Stream._mini_dom = None + self.notify('before-dispatch', stanza) + + if self._dispatch_callback is not None: + name = stanza.getName() + protocol_class = self._stanza_types.get(name) + if protocol_class is not None: + stanza = protocol_class(node=stanza) + self._dispatch_callback(stanza) + return # Count stanza - self._owner.Smacks.count_incoming(stanza.getName()) + self._client._smacks.count_incoming(stanza.getName()) name = stanza.getName() - if name == 'features': - self._owner.got_features = True - self.Stream.features = stanza - elif name == 'error': - if stanza.getTag('see-other-host'): - self._owner.got_see_other_host = stanza - xmlns = stanza.getNamespace() - if xmlns not in self.handlers: + if xmlns not in self._handlers: log.warning('Unknown namespace: %s', xmlns) xmlns = 'unknown' - # features stanza has been handled before - if name not in self.handlers[xmlns]: - if name not in ('features', 'stream'): - log.warning('Unknown stanza: %s', stanza) - else: - log.debug('Got %s / %s stanza', xmlns, name) + + if name not in self._handlers[xmlns]: + log.warning('Unknown stanza: %s', stanza) name = 'unknown' - else: - log.debug('Got %s / %s stanza', xmlns, name) # Convert simplexml to Protocol object try: - stanza = self.handlers[xmlns][name]['type'](node=stanza) + stanza = self._handlers[xmlns][name]['type'](node=stanza) except InvalidJid: log.warning('Invalid JID, ignoring stanza') log.warning(stanza) return - own_jid = self._owner.get_bound_jid() + own_jid = self._client.get_bound_jid() properties = get_properties_struct(name) if name == 'iq': @@ -605,154 +410,91 @@ class XMPPDispatcher(PlugIn): stanza.props = stanza.getProperties() log.debug('type: %s, properties: %s', typ, stanza.props) + # Process callbacks _id = stanza.getID() - processed = False - if _id in self._expected: - cb, args = self._expected[_id] - log.debug('Expected stanza arrived. Callback %s(%s) found', - cb, args) + func, _timeout, user_data = self._id_callbacks.pop( + _id, (None, None, {})) + if user_data is None: + user_data = {} + + if func is not None: try: - if args is None: - cb(self, stanza) - else: - cb(self, stanza, **args) - except NodeProcessed: - pass + func(self._client, stanza, **user_data) + except Exception: + log.exception('Error while handling stanza') return # Gather specifics depending on stanza properties specifics = ['default'] - if typ and typ in self.handlers[xmlns][name]: + if typ and typ in self._handlers[xmlns][name]: specifics.append(typ) for prop in stanza.props: - if prop in self.handlers[xmlns][name]: + if prop in self._handlers[xmlns][name]: specifics.append(prop) - if typ and typ + prop in self.handlers[xmlns][name]: + if typ and typ + prop in self._handlers[xmlns][name]: specifics.append(typ + prop) # Create the handler chain chain = [] - chain += self.handlers[xmlns]['default']['default'] + chain += self._handlers[xmlns]['default']['default'] for specific in specifics: - chain += self.handlers[xmlns][name][specific] + chain += self._handlers[xmlns][name][specific] # Sort chain with priority chain.sort(key=lambda x: x['priority']) for handler in chain: - if not processed or handler['system']: - try: - log.info('Call handler: %s', handler['func'].__qualname__) - # Backwards compatibility until all handlers support - # properties - signature = inspect.signature(handler['func']) - if len(signature.parameters) > 2: - handler['func'](self, stanza, properties) - else: - handler['func'](self, stanza) - except NodeProcessed: - processed = True - except Exception: - self._pendingExceptions.insert(0, sys.exc_info()) - return + log.info('Call handler: %s', handler['func'].__qualname__) + try: + handler['func'](self._client, stanza, properties) + except NodeProcessed: + return + except Exception: + log.exception('Handler exception:') + return # Stanza was not processed call default handler - if not processed and self._defaultHandler: - self._defaultHandler(self, stanza) - - def SendAndCallForResponse(self, stanza, func=None, args=None): - """ - Put stanza on the wire and call back when recipient replies. Additional - callback arguments can be specified in args - """ - _waitid = self.send(stanza) - self._expected[_waitid] = (func, args) - return _waitid - - def send(self, stanza, now=False): - """ - Wrap transports send method when plugged into NonBlockingClient. Makes - sure stanzas get ID and from tag. - """ - ID = None - if type(stanza) != str: - if isinstance(stanza, Protocol): - ID = stanza.getID() - if ID is None: - stanza.setID(self.getAnID()) - ID = stanza.getID() - if self._owner._registered_name and not stanza.getAttr('from'): - stanza.setAttr('from', self._owner._registered_name) - - self._owner.Connection.send(stanza, now) - - # If no ID then it is a whitespace - if hasattr(self._owner, 'Smacks') and ID: - self._owner.Smacks.save_in_queue(stanza) - - return ID - - -class BOSHDispatcher(XMPPDispatcher): - - def PlugIn(self, owner, after_SASL=False, old_features=None): - self.old_features = old_features - self.after_SASL = after_SASL - XMPPDispatcher.PlugIn(self, owner) - - def StreamInit(self): - """ - Send an initial stream header - """ - self.Stream = NodeBuilder() - self.Stream.dispatch = self.dispatch - self.Stream._dispatch_depth = 2 - self.Stream.stream_header_received = self._check_stream_start - self.Stream.features = self.old_features - - self._metastream = Node('stream:stream') - self._metastream.setNamespace(self._owner.Namespace) - self._metastream.setAttr('version', '1.0') - self._metastream.setAttr('xmlns:stream', NS_STREAMS) - self._metastream.setAttr('to', self._owner.Server) - self._metastream.setAttr('xml:lang', self._owner.lang) - - self.restart = True - self._owner.Connection.send_init(after_SASL=self.after_SASL) - - def StreamTerminate(self): - """ - Send a stream terminator - """ - self._owner.Connection.send_terminator() - - def ProcessNonBlocking(self, data=None): - if self.restart: - fromstream = self._metastream - fromstream.setAttr('from', fromstream.getAttr('to')) - fromstream.delAttr('to') - data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data) - self.restart = False - return XMPPDispatcher.ProcessNonBlocking(self, data) - - def dispatch(self, stanza): - if stanza.getName() == 'body' and stanza.getNamespace() == NS_HTTP_BIND: - - stanza_attrs = stanza.getAttrs() - if 'authid' in stanza_attrs: - # should be only in init response - # auth module expects id of stream in document attributes - self.Stream._document_attrs['id'] = stanza_attrs['authid'] - self._owner.Connection.handle_body_attrs(stanza_attrs) - - children = stanza.getChildren() - if children: - for child in children: - # if child doesn't have any ns specified, simplexml (or expat) - # thinks it's of parent's (BOSH body) namespace, so we have to - # rewrite it to jabber:client - if child.getNamespace() == NS_HTTP_BIND: - child.setNamespace(self._owner.defaultNamespace) - XMPPDispatcher.dispatch(self, child) - else: - XMPPDispatcher.dispatch(self, stanza) + self._default_handler(stanza) + + def add_callback_for_id(self, id_, func, timeout, user_data): + if timeout is not None and self._timeout_id is None: + log.info('Add timeout source') + self._timeout_id = GLib.timeout_add_seconds( + 1, self._timeout_check) + timeout = time.monotonic() + timeout + self._id_callbacks[id_] = (func, timeout, user_data) + + def _timeout_check(self): + log.info('Run timeout check') + if not self._id_callbacks: + log.info('Remove timeout source, no callbacks scheduled') + self._timeout_id = None + return False + + for id_ in list(self._id_callbacks.keys()): + func, timeout, user_data = self._id_callbacks.get(id_) + if timeout is None: + continue + + if user_data is None: + user_data = {} + + if timeout < time.monotonic(): + self._id_callbacks.pop(id_) + func(self._client, StanzaTimeoutError(id_), **user_data) + return True + + def _remove_timeout_source(self): + if self._timeout_id is not None: + GLib.source_remove(self._timeout_id) + self._timeout_id = None + + def cleanup(self): + self._client = None + self._modules = {} + self._parser = None + self._id_callbacks.clear() + self._dispatch_callback = None + self._handlers.clear() + self._remove_timeout_source() + self.remove_subscriptions() diff --git a/nbxmpp/examples/client.py b/nbxmpp/examples/client.py new file mode 100644 index 0000000..82f6f5c --- /dev/null +++ b/nbxmpp/examples/client.py @@ -0,0 +1,281 @@ +#!/usr/bin/python3 + +import os +import logging +import json +from pathlib import Path + +import gi +gi.require_version('Gtk', '3.0') +gi.require_version('GLib', '2.0') +from gi.repository import Gtk +from gi.repository import GLib + +import nbxmpp +from nbxmpp.protocol import JID +from nbxmpp.client import Client +from nbxmpp.structs import ProxyData +from nbxmpp.addresses import ServerAddress +from nbxmpp.const import ConnectionType +from nbxmpp.const import ConnectionProtocol +from nbxmpp.const import StreamError + +consoleloghandler = logging.StreamHandler() +log = logging.getLogger('nbxmpp') +log.setLevel('INFO') +log.addHandler(consoleloghandler) + +formatter = logging.Formatter('%(asctime)s %(levelname)-7s %(name)-18s %(message)s', + datefmt='%H:%M:%S') +consoleloghandler.setFormatter(formatter) + + +class Builder: + def __init__(self, filename): + file_path = Path(__file__).resolve() + ui_file_path = file_path.parent / filename + self._builder = Gtk.Builder() + self._builder.add_from_file(str(ui_file_path)) + + def __getattr__(self, name): + try: + return getattr(self._builder, name) + except AttributeError: + return self._builder.get_object(name) + + +class StanzaRow(Gtk.ListBoxRow): + def __init__(self, stanza, incoming): + Gtk.ListBoxRow.__init__(self) + color = 'red' if incoming else 'blue' + if isinstance(stanza, bytes): + stanza = str(stanza) + if not isinstance(stanza, str): + stanza = stanza.__str__(fancy=True) + stanza = GLib.markup_escape_text(stanza) + label = Gtk.Label() + label.set_markup('<span foreground="%s">%s</span>' % (color, stanza)) + label.set_xalign(0) + label.set_halign(Gtk.Align.START) + self.add(label) + self.show_all() + + +class TestClient(Gtk.Window): + def __init__(self): + Gtk.Window.__init__(self, title='Test Client') + self.set_default_size(500, 500) + + self._builder = Builder('client.ui') + self._builder.connect_signals(self) + + self.add(self._builder.grid) + + self._client = None + self._scroll_timeout = None + self._create_paths() + self._load_config() + + def _create_client(self): + self._client = Client() + self._client.set_domain(self.address.getDomain()) + self._client.set_username(self.address.getNode()) + self._client.set_resource('test') + + proxy_ip = self._builder.proxy_ip.get_text() + if proxy_ip: + proxy_port = int(self._builder.proxy_port.get_text()) + proxy_host = '%s:%s' % (proxy_ip, proxy_port) + proxy = ProxyData(self._builder.proxy_type.get_active_text().lower(), + proxy_host, + self._builder.proxy_username.get_text() or None, + self._builder.proxy_password.get_text() or None) + self._client.set_proxy(proxy) + + self._client.set_connection_types(self._get_connection_types()) + self._client.set_protocols(self._get_connection_protocols()) + + self._client.set_password(self.password) + + self._client.subscribe('resume-failed', self._on_signal) + self._client.subscribe('resume-successful', self._on_signal) + self._client.subscribe('disconnected', self._on_signal) + self._client.subscribe('connection-lost', self._on_signal) + self._client.subscribe('connection-failed', self._on_signal) + self._client.subscribe('connected', self._on_connected) + + self._client.subscribe('stanza-sent', self._on_stanza_sent) + self._client.subscribe('stanza-received', self._on_stanza_received) + + self._client.register_handler('message', self._on_message) + + @property + def password(self): + return self._builder.password.get_text() + + @property + def address(self): + return JID(self._builder.address.get_text()) + + @property + def xml_box(self): + return self._builder.xml_box + + def scroll_to_end(self): + adj_v = self._builder.scrolledwin.get_vadjustment() + if adj_v is None: + # This can happen when the Widget is already destroyed when called + # from GLib.idle_add + self._scroll_timeout = None + return + max_scroll_pos = adj_v.get_upper() - adj_v.get_page_size() + adj_v.set_value(max_scroll_pos) + + adj_h = self._builder.scrolledwin.get_hadjustment() + adj_h.set_value(0) + self._scroll_timeout = None + + def _on_signal(self, _client, signal_name, *args, **kwargs): + log.info('%s, Error: %s', signal_name, self._client.get_error()) + if signal_name == 'disconnected': + if self._client.get_error() is None: + return + domain, error, text = self._client.get_error() + if domain == StreamError.BAD_CERTIFICATE: + self._client.set_ignore_tls_errors(True) + self._client.connect() + + def _on_connected(self, _client, _signal_name): + self.send_presence() + + def _on_message(self, _stream, stanza, _properties): + log.info('Message received') + log.info(stanza.getBody()) + + def _on_stanza_sent(self, _stream, _signal_name, data): + self.xml_box.add(StanzaRow(data, False)) + self._add_scroll_timeout() + + def _on_stanza_received(self, _stream, _signal_name, data): + self.xml_box.add(StanzaRow(data, True)) + self._add_scroll_timeout() + + def _add_scroll_timeout(self): + if self._scroll_timeout is not None: + return + self._scroll_timeout = GLib.timeout_add(50, self.scroll_to_end) + + def _connect_clicked(self, *args): + if self._client is None: + self._create_client() + + self._client.connect() + + def _disconnect_clicked(self, *args): + if self._client is not None: + self._client.disconnect() + + def _clear_clicked(self, *args): + self.xml_box.foreach(self._remove) + + def _on_reconnect_clicked(self, *args): + if self._client is not None: + self._client.reconnect() + + def _get_connection_types(self): + types = [] + if self._builder.directtls.get_active(): + types.append(ConnectionType.DIRECT_TLS) + if self._builder.starttls.get_active(): + types.append(ConnectionType.START_TLS) + if self._builder.plain.get_active(): + types.append(ConnectionType.PLAIN) + return types + + def _get_connection_protocols(self): + protocols = [] + if self._builder.tcp.get_active(): + protocols.append(ConnectionProtocol.TCP) + if self._builder.websocket.get_active(): + protocols.append(ConnectionProtocol.WEBSOCKET) + return protocols + + def _on_save_clicked(self, *args): + data = {} + data['jid'] = self._builder.address.get_text() + data['password'] = self._builder.password.get_text() + data['proxy_type'] = self._builder.proxy_type.get_active_text() + data['proxy_ip'] = self._builder.proxy_ip.get_text() + data['proxy_port'] = self._builder.proxy_port.get_text() + data['proxy_username'] = self._builder.proxy_username.get_text() + data['proxy_password'] = self._builder.proxy_password.get_text() + + data['directtls'] = self._builder.directtls.get_active() + data['starttls'] = self._builder.starttls.get_active() + data['plain'] = self._builder.plain.get_active() + data['tcp'] = self._builder.tcp.get_active() + data['websocket'] = self._builder.websocket.get_active() + + path = self._get_config_dir() / 'config' + with path.open('w') as fp: + json.dump(data, fp) + + def _load_config(self): + path = self._get_config_dir() / 'config' + if not path.exists(): + return + + with path.open('r') as fp: + data = json.load(fp) + + self._builder.address.set_text(data.get('jid', '')) + self._builder.password.set_text(data.get('password', '')) + self._builder.proxy_type.set_active_id(data.get('proxy_type', 'HTTP')) + self._builder.proxy_ip.set_text(data.get('proxy_ip', '')) + self._builder.proxy_port.set_text(data.get('proxy_port', '')) + self._builder.proxy_username.set_text(data.get('proxy_username', '')) + self._builder.proxy_password.set_text(data.get('proxy_password', '')) + + self._builder.directtls.set_active(data.get('directtls', False)) + self._builder.starttls.set_active(data.get('starttls', False)) + self._builder.plain.set_active(data.get('plain', False)) + self._builder.tcp.set_active(data.get('tcp', False)) + self._builder.websocket.set_active(data.get('websocket', False)) + + @staticmethod + def _get_config_dir(): + if os.name == 'nt': + return Path(os.path.join(os.environ['appdata'], 'nbxmpp')) + + expand = os.path.expanduser + base = os.getenv('XDG_CONFIG_HOME') + if base is None or base[0] != '/': + base = expand('~/.config') + return Path(os.path.join(base, 'nbxmpp')) + + def _create_paths(self): + path_ = self._get_config_dir() + if not path_.exists(): + for parent_path in reversed(path_.parents): + # Create all parent folders + # don't use mkdir(parent=True), as it ignores `mode` + # when creating the parents + if not parent_path.exists(): + print('creating %s directory' % parent_path) + parent_path.mkdir(mode=0o700) + print('creating %s directory' % path_) + path_.mkdir(mode=0o700) + + def _remove(self, item): + self.xml_box.remove(item) + item.destroy() + + def send_presence(self): + presence = nbxmpp.Presence() + self._client.send_stanza(presence) + + +win = TestClient() +win.connect("delete-event", Gtk.main_quit) +win.show_all() +Gtk.main() diff --git a/nbxmpp/examples/client.ui b/nbxmpp/examples/client.ui new file mode 100644 index 0000000..6744feb --- /dev/null +++ b/nbxmpp/examples/client.ui @@ -0,0 +1,469 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Generated with glade 3.22.1 --> +<interface> + <requires lib="gtk+" version="3.20"/> + <object class="GtkGrid" id="grid"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="margin_left">24</property> + <property name="margin_right">24</property> + <property name="margin_top">24</property> + <property name="margin_bottom">24</property> + <property name="row_spacing">12</property> + <child> + <object class="GtkButtonBox"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="spacing">12</property> + <property name="layout_style">start</property> + <child> + <object class="GtkButton"> + <property name="label" translatable="yes">Connect</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">True</property> + <signal name="clicked" handler="_connect_clicked" swapped="no"/> + <style> + <class name="suggested-action"/> + </style> + </object> + <packing> + <property name="expand">True</property> + <property name="fill">True</property> + <property name="position">0</property> + </packing> + </child> + <child> + <object class="GtkButton"> + <property name="label" translatable="yes">Disconnect</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">True</property> + <signal name="clicked" handler="_disconnect_clicked" swapped="no"/> + <style> + <class name="destructive-action"/> + </style> + </object> + <packing> + <property name="expand">True</property> + <property name="fill">True</property> + <property name="position">1</property> + </packing> + </child> + <child> + <object class="GtkButton"> + <property name="label" translatable="yes">Reconnect</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">True</property> + <signal name="clicked" handler="_on_reconnect_clicked" swapped="no"/> + </object> + <packing> + <property name="expand">True</property> + <property name="fill">True</property> + <property name="position">2</property> + </packing> + </child> + <child> + <object class="GtkButton" id="clear"> + <property name="label" translatable="yes">Clear</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">True</property> + <signal name="clicked" handler="_clear_clicked" swapped="no"/> + </object> + <packing> + <property name="expand">True</property> + <property name="fill">True</property> + <property name="position">3</property> + </packing> + </child> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">2</property> + </packing> + </child> + <child> + <object class="GtkGrid"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="hexpand">True</property> + <property name="row_spacing">6</property> + <property name="column_spacing">12</property> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">XMPP-Address</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">0</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Password</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">1</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="address"> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="valign">center</property> + <property name="hexpand">True</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">0</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="password"> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="valign">center</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">1</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Proxy Host or IP</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">3</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Proxy Port</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">4</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Proxy Password</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">6</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Proxy Username</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">5</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="proxy_ip"> + <property name="visible">True</property> + <property name="can_focus">True</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">3</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="proxy_username"> + <property name="visible">True</property> + <property name="can_focus">True</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">5</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="proxy_port"> + <property name="visible">True</property> + <property name="can_focus">True</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">4</property> + </packing> + </child> + <child> + <object class="GtkEntry" id="proxy_password"> + <property name="visible">True</property> + <property name="can_focus">True</property> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">6</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Save</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">7</property> + </packing> + </child> + <child> + <object class="GtkButton"> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">True</property> + <property name="tooltip_text" translatable="yes">Save</property> + <property name="halign">start</property> + <signal name="clicked" handler="_on_save_clicked" swapped="no"/> + <child> + <object class="GtkImage"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="icon_name">document-save-symbolic</property> + </object> + </child> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">7</property> + </packing> + </child> + <child> + <object class="GtkFrame"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label_xalign">0</property> + <property name="shadow_type">none</property> + <child> + <object class="GtkAlignment"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="top_padding">12</property> + <property name="bottom_padding">12</property> + <property name="left_padding">12</property> + <child> + <object class="GtkGrid"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <child> + <object class="GtkCheckButton" id="directtls"> + <property name="label" translatable="yes">DIRECT TLS</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">False</property> + <property name="halign">start</property> + <property name="draw_indicator">True</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">0</property> + </packing> + </child> + <child> + <object class="GtkCheckButton" id="starttls"> + <property name="label" translatable="yes">START TLS</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">False</property> + <property name="halign">start</property> + <property name="draw_indicator">True</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">1</property> + </packing> + </child> + <child> + <object class="GtkCheckButton" id="plain"> + <property name="label" translatable="yes">PLAIN</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">False</property> + <property name="halign">start</property> + <property name="draw_indicator">True</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">2</property> + </packing> + </child> + </object> + </child> + </object> + </child> + <child type="label"> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Connection Type</property> + </object> + </child> + </object> + <packing> + <property name="left_attach">2</property> + <property name="top_attach">0</property> + <property name="height">3</property> + </packing> + </child> + <child> + <object class="GtkFrame"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label_xalign">0</property> + <property name="shadow_type">none</property> + <child> + <object class="GtkAlignment"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="top_padding">12</property> + <property name="bottom_padding">12</property> + <property name="left_padding">12</property> + <child> + <object class="GtkGrid"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <child> + <object class="GtkCheckButton" id="tcp"> + <property name="label" translatable="yes">TCP</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">False</property> + <property name="halign">start</property> + <property name="draw_indicator">True</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">0</property> + </packing> + </child> + <child> + <object class="GtkCheckButton" id="websocket"> + <property name="label" translatable="yes">WEBSOCKET</property> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="receives_default">False</property> + <property name="halign">start</property> + <property name="draw_indicator">True</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">1</property> + </packing> + </child> + </object> + </child> + </object> + </child> + <child type="label"> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="label" translatable="yes">Connection Protocol</property> + </object> + </child> + </object> + <packing> + <property name="left_attach">2</property> + <property name="top_attach">3</property> + <property name="height">3</property> + </packing> + </child> + <child> + <object class="GtkLabel"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="halign">end</property> + <property name="label" translatable="yes">Proxy Type</property> + <property name="xalign">1</property> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">2</property> + </packing> + </child> + <child> + <object class="GtkComboBoxText" id="proxy_type"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="halign">start</property> + <property name="active">0</property> + <items> + <item id="SOCKS5" translatable="yes">SOCKS5</item> + </items> + </object> + <packing> + <property name="left_attach">1</property> + <property name="top_attach">2</property> + </packing> + </child> + <child> + <placeholder/> + </child> + <child> + <placeholder/> + </child> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">0</property> + </packing> + </child> + <child> + <object class="GtkScrolledWindow" id="scrolledwin"> + <property name="visible">True</property> + <property name="can_focus">True</property> + <property name="hexpand">True</property> + <property name="vexpand">True</property> + <property name="shadow_type">in</property> + <child> + <object class="GtkViewport"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <child> + <object class="GtkListBox" id="xml_box"> + <property name="visible">True</property> + <property name="can_focus">False</property> + <property name="vexpand">True</property> + </object> + </child> + </object> + </child> + </object> + <packing> + <property name="left_attach">0</property> + <property name="top_attach">1</property> + </packing> + </child> + </object> +</interface> diff --git a/nbxmpp/exceptions.py b/nbxmpp/exceptions.py new file mode 100644 index 0000000..473bf0e --- /dev/null +++ b/nbxmpp/exceptions.py @@ -0,0 +1,24 @@ +# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + + +class EndOfConnection(Exception): + pass + + +class NonFatalSSLError(Exception): + pass diff --git a/nbxmpp/idlequeue.py b/nbxmpp/idlequeue.py index d20ac46..0ab6f18 100644 --- a/nbxmpp/idlequeue.py +++ b/nbxmpp/idlequeue.py @@ -285,7 +285,7 @@ class IdleQueue: """ Remove the read timeout """ - log.info('read timeout removed for fd %s' % fd) + log.debug('read timeout removed for fd %s' % fd) if fd in self.read_timeouts: if timeout: if timeout in self.read_timeouts[fd]: @@ -302,7 +302,7 @@ class IdleQueue: A filedescriptor fd can have several timeouts. """ - log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds) + log_txt = 'read timeout set for fd %s on %i seconds' % (fd, seconds) if func: log_txt += ' with function ' + str(func) log.info(log_txt) diff --git a/nbxmpp/modules/activity.py b/nbxmpp/modules/activity.py index 7f48ba1..9e1f0b3 100644 --- a/nbxmpp/modules/activity.py +++ b/nbxmpp/modules/activity.py @@ -38,7 +38,7 @@ class Activity: priority=16), ] - def _process_pubsub_activity(self, _con, stanza, properties): + def _process_pubsub_activity(self, _client, stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/attention.py b/nbxmpp/modules/attention.py index a0d2269..109bfde 100644 --- a/nbxmpp/modules/attention.py +++ b/nbxmpp/modules/attention.py @@ -34,7 +34,7 @@ class Attention: priority=15), ] - def _process_message_attention(self, _con, stanza, properties): + def _process_message_attention(self, _client, stanza, properties): attention = stanza.getTag('attention', namespace=NS_ATTENTION) if attention is None: return diff --git a/nbxmpp/modules/bookmarks.py b/nbxmpp/modules/bookmarks.py index 85ca237..5b6d8c3 100644 --- a/nbxmpp/modules/bookmarks.py +++ b/nbxmpp/modules/bookmarks.py @@ -78,7 +78,7 @@ class Bookmarks: self._node_configuration_in_progress = False self._node_configuration_not_possible = False - def _process_pubsub_bookmarks(self, _con, stanza, properties): + def _process_pubsub_bookmarks(self, _client, stanza, properties): if not properties.is_pubsub_event: return @@ -108,7 +108,7 @@ class Bookmarks: properties.pubsub_event = pubsub_event - def _process_pubsub_bookmarks2(self, _con, _stanza, properties): + def _process_pubsub_bookmarks2(self, _client, _stanza, properties): if not properties.is_pubsub_event: return @@ -415,6 +415,6 @@ class Bookmarks: return Iq('set', NS_PRIVATE, payload=storage_node) @staticmethod - def _on_private_store_result(_con, stanza): + def _on_private_store_result(_client, stanza): if not isResultNode(stanza): return raise_error(log.info, stanza) diff --git a/nbxmpp/modules/captcha.py b/nbxmpp/modules/captcha.py index 52bfc77..8e4f90a 100644 --- a/nbxmpp/modules/captcha.py +++ b/nbxmpp/modules/captcha.py @@ -38,7 +38,7 @@ class Captcha: ] @staticmethod - def _process_captcha(_con, stanza, properties): + def _process_captcha(_client, stanza, properties): captcha = stanza.getTag('captcha', namespace=NS_CAPTCHA) if captcha is None: return diff --git a/nbxmpp/modules/chat_markers.py b/nbxmpp/modules/chat_markers.py index 7d33dd6..eec8fa6 100644 --- a/nbxmpp/modules/chat_markers.py +++ b/nbxmpp/modules/chat_markers.py @@ -35,7 +35,7 @@ class ChatMarkers: ] @staticmethod - def _process_message_marker(_con, stanza, properties): + def _process_message_marker(_client, stanza, properties): type_ = stanza.getTag('received', namespace=NS_CHATMARKERS) if type_ is None: type_ = stanza.getTag('displayed', namespace=NS_CHATMARKERS) diff --git a/nbxmpp/modules/chatstates.py b/nbxmpp/modules/chatstates.py index 8304310..8f9869f 100644 --- a/nbxmpp/modules/chatstates.py +++ b/nbxmpp/modules/chatstates.py @@ -35,7 +35,7 @@ class Chatstates: priority=15), ] - def _process_message_chatstate(self, _con, stanza, properties): + def _process_message_chatstate(self, _client, stanza, properties): chatstate = parse_chatstate(stanza) if chatstate is None: return diff --git a/nbxmpp/modules/correction.py b/nbxmpp/modules/correction.py index d56bd29..68d6376 100644 --- a/nbxmpp/modules/correction.py +++ b/nbxmpp/modules/correction.py @@ -34,7 +34,7 @@ class Correction: priority=15), ] - def _process_message_correction(self, _con, stanza, properties): + def _process_message_correction(self, _client, stanza, properties): replace = stanza.getTag('replace', namespace=NS_CORRECT) if replace is None: return diff --git a/nbxmpp/modules/delay.py b/nbxmpp/modules/delay.py index cf440d6..859ad1a 100644 --- a/nbxmpp/modules/delay.py +++ b/nbxmpp/modules/delay.py @@ -38,7 +38,7 @@ class Delay: priority=15) ] - def _process_message_delay(self, _con, stanza, properties): + def _process_message_delay(self, _client, stanza, properties): if properties.is_muc_subject: # MUC Subjects can have a delay timestamp # to indicate when the user has set the subject, @@ -65,7 +65,7 @@ class Delay: properties.user_timestamp = parse_delay(stanza, not_from=jids) @staticmethod - def _process_presence_delay(_con, stanza, properties): + def _process_presence_delay(_client, stanza, properties): properties.user_timestamp = parse_delay(stanza) diff --git a/nbxmpp/modules/eme.py b/nbxmpp/modules/eme.py index c585a2a..84b1d3e 100644 --- a/nbxmpp/modules/eme.py +++ b/nbxmpp/modules/eme.py @@ -35,7 +35,7 @@ class EME: ] @staticmethod - def _process_eme(_con, stanza, properties): + def _process_eme(_client, stanza, properties): encryption = stanza.getTag('encryption', namespace=NS_EME) if encryption is None: return diff --git a/nbxmpp/modules/entity_caps.py b/nbxmpp/modules/entity_caps.py index 4eee6ab..dd04ee2 100644 --- a/nbxmpp/modules/entity_caps.py +++ b/nbxmpp/modules/entity_caps.py @@ -35,7 +35,7 @@ class EntityCaps: ] @staticmethod - def _process_entity_caps(_con, stanza, properties): + def _process_entity_caps(_client, stanza, properties): caps = stanza.getTag('c', namespace=NS_CAPS) if caps is None: properties.entity_caps = EntityCapsData() diff --git a/nbxmpp/modules/http_auth.py b/nbxmpp/modules/http_auth.py index fb78c46..5488f77 100644 --- a/nbxmpp/modules/http_auth.py +++ b/nbxmpp/modules/http_auth.py @@ -40,7 +40,7 @@ class HTTPAuth: ] @staticmethod - def _process_http_auth(_con, stanza, properties): + def _process_http_auth(_client, stanza, properties): confirm = stanza.getTag('confirm', namespace=NS_HTTP_AUTH) if confirm is None: return diff --git a/nbxmpp/modules/ibb.py b/nbxmpp/modules/ibb.py index e70721c..402f5eb 100644 --- a/nbxmpp/modules/ibb.py +++ b/nbxmpp/modules/ibb.py @@ -46,7 +46,7 @@ class IBB: priority=20), ] - def _process_ibb(self, _con, stanza, properties): + def _process_ibb(self, _client, stanza, properties): if properties.type.is_set: open_ = stanza.getTag('open', namespace=NS_IBB) if open_ is not None: @@ -70,23 +70,24 @@ class IBB: except Exception as error: log.warning(error) log.warning(stanza) - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed if block_size > 65535: log.warning('Invalid block-size') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed sid = attrs.get('sid') if not sid: log.warning('Invalid sid') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed type_ = attrs.get('stanza') if type_ == 'message': - self._client.send(ErrorStanza(stanza, ERR_FEATURE_NOT_IMPLEMENTED)) + self._client.send_stanza(ErrorStanza(stanza, + ERR_FEATURE_NOT_IMPLEMENTED)) raise NodeProcessed return IBBData(type='open', block_size=block_size, sid=sid) @@ -95,7 +96,7 @@ class IBB: sid = close.getAttrs().get('sid') if sid is None: log.warning('Invalid sid') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed return IBBData(type='close', sid=sid) @@ -105,21 +106,21 @@ class IBB: sid = attrs.get('sid') if sid is None: log.warning('Invalid sid') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed try: seq = int(attrs.get('seq')) except Exception: log.exception('Invalid seq') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed try: decoded_data = b64decode(data.getData(), return_type=bytes) except Exception: log.exception('Failed to decode IBB data') - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed return IBBData(type='data', sid=sid, seq=seq, data=decoded_data) @@ -130,7 +131,7 @@ class IBB: reply.getChildren().clear() else: reply = ErrorStanza(stanza, error) - self._client.send(reply) + self._client.send_stanza(reply) @call_on_response('_default_response') def send_open(self, jid, sid, block_size): diff --git a/nbxmpp/modules/idle.py b/nbxmpp/modules/idle.py index 5f5d6db..81870f2 100644 --- a/nbxmpp/modules/idle.py +++ b/nbxmpp/modules/idle.py @@ -35,7 +35,7 @@ class Idle: ] @staticmethod - def _process_idle(_con, stanza, properties): + def _process_idle(_client, stanza, properties): idle_tag = stanza.getTag('idle', namespace=NS_IDLE) if idle_tag is None: return diff --git a/nbxmpp/modules/iq.py b/nbxmpp/modules/iq.py index e699c75..35b92ea 100644 --- a/nbxmpp/modules/iq.py +++ b/nbxmpp/modules/iq.py @@ -36,13 +36,13 @@ class BaseIq: priority=10), ] - def _process_iq_base(self, _con, stanza, properties): + def _process_iq_base(self, _client, stanza, properties): try: properties.type = IqType(stanza.getType()) except ValueError: log.warning('Message with invalid type: %s', stanza.getType()) log.warning(stanza) - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed properties.jid = stanza.getFrom() diff --git a/nbxmpp/modules/location.py b/nbxmpp/modules/location.py index 5fe2252..88ac35d 100644 --- a/nbxmpp/modules/location.py +++ b/nbxmpp/modules/location.py @@ -37,7 +37,7 @@ class Location: priority=16), ] - def _process_pubsub_location(self, _con, _stanza, properties): + def _process_pubsub_location(self, _client, _stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/message.py b/nbxmpp/modules/message.py index 8a708eb..23c4022 100644 --- a/nbxmpp/modules/message.py +++ b/nbxmpp/modules/message.py @@ -41,7 +41,7 @@ class BaseMessage: priority=10), ] - def _process_message_base(self, _con, stanza, properties): + def _process_message_base(self, _client, stanza, properties): properties.type = self._parse_type(stanza) # Determine remote JID @@ -72,7 +72,7 @@ class BaseMessage: properties.error = error_factory(stanza) @staticmethod - def _process_message_after_base(_con, stanza, properties): + def _process_message_after_base(_client, stanza, properties): # This handler runs after decryption handlers had the chance # to decrypt the body properties.body = stanza.getBody() diff --git a/nbxmpp/modules/mood.py b/nbxmpp/modules/mood.py index 00cd755..7f02025 100644 --- a/nbxmpp/modules/mood.py +++ b/nbxmpp/modules/mood.py @@ -38,7 +38,7 @@ class Mood: priority=16), ] - def _process_pubsub_mood(self, _con, stanza, properties): + def _process_pubsub_mood(self, _client, stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/muc.py b/nbxmpp/modules/muc.py index 6df191d..28c96e1 100644 --- a/nbxmpp/modules/muc.py +++ b/nbxmpp/modules/muc.py @@ -94,7 +94,7 @@ class MUC: ] @staticmethod - def _process_muc_presence(_con, stanza, properties): + def _process_muc_presence(_client, stanza, properties): muc = stanza.getTag('x', namespace=NS_MUC) if muc is None: return @@ -103,7 +103,7 @@ class MUC: properties.muc_jid.setBare() properties.muc_nickname = properties.jid.getResource() - def _process_muc_user_presence(self, _con, stanza, properties): + def _process_muc_user_presence(self, _client, stanza, properties): muc_user = stanza.getTag('x', namespace=NS_MUC_USER) if muc_user is None: return @@ -162,7 +162,7 @@ class MUC: properties.muc_user = self._parse_muc_user(muc_user) - def _process_groupchat_message(self, _con, stanza, properties): + def _process_groupchat_message(self, _client, stanza, properties): properties.from_muc = True properties.muc_jid = properties.jid.copy() properties.muc_jid.setBare() @@ -179,7 +179,7 @@ class MUC: properties.muc_ofrom = JID(address.getAttr('jid')) @staticmethod - def _process_message(_con, stanza, properties): + def _process_message(_client, stanza, properties): muc_user = stanza.getTag('x', namespace=NS_MUC_USER) if muc_user is None: return @@ -230,7 +230,7 @@ class MUC: properties.muc_status_codes = codes @staticmethod - def _process_direct_invite(_con, stanza, properties): + def _process_direct_invite(_client, stanza, properties): direct = stanza.getTag('x', namespace=NS_CONFERENCE) if direct is None: return @@ -252,7 +252,7 @@ class MUC: properties.muc_invite = InviteData(**data) @staticmethod - def _process_mediated_invite(_con, stanza, properties): + def _process_mediated_invite(_client, stanza, properties): muc_user = stanza.getTag('x', namespace=NS_MUC_USER) if muc_user is None: return @@ -292,7 +292,7 @@ class MUC: return @staticmethod - def _process_voice_request(_con, stanza, properties): + def _process_voice_request(_client, stanza, properties): data_form = stanza.getTag('x', namespace=NS_DATA) if data_form is None: return @@ -324,7 +324,7 @@ class MUC: form = voice_request.form form.type_ = 'submit' form['muc#request_allow'].value = True - self._client.send(Message(to=muc_jid, payload=form)) + self._client.send_stanza(Message(to=muc_jid, payload=form)) @call_on_response('_affiliation_received') def get_affiliation(self, jid, affiliation): @@ -451,7 +451,7 @@ class MUC: def set_subject(self, room_jid, subject): message = Message(room_jid, typ='groupchat', subject=subject) log.info('Set subject for %s', room_jid) - self._client.send(message) + self._client.send_stanza(message) def decline(self, room, to, reason=None): message = Message(to=room) @@ -459,7 +459,7 @@ class MUC: decline = muc_user.addChild('decline', attrs={'to': to}) if reason: decline.setTagData('reason', reason) - self._client.send(message) + self._client.send_stanza(message) def request_voice(self, room): message = Message(to=room) @@ -470,7 +470,7 @@ class MUC: value='participant', typ='text-single')) message.addChild(node=xdata) - self._client.send(message) + self._client.send_stanza(message) def invite(self, room, to, password, reason=None, continue_=False, type_=InviteType.MEDIATED): @@ -480,7 +480,7 @@ class MUC: else: invite = self._build_mediated_invite( room, to, reason, password, continue_) - self._client.send(invite) + self._client.send_stanza(invite) @staticmethod def _build_direct_invite(room, to, reason, password, continue_): @@ -520,7 +520,7 @@ class MUC: message = Message(typ='error', to=room_jid) message.setID(message_id) message.setError(ERR_NOT_ACCEPTABLE) - self._client.send(message) + self._client.send_stanza(message) @callback def _default_response(self, stanza): diff --git a/nbxmpp/modules/muclumbus.py b/nbxmpp/modules/muclumbus.py index e68bfcf..e6a71eb 100644 --- a/nbxmpp/modules/muclumbus.py +++ b/nbxmpp/modules/muclumbus.py @@ -18,6 +18,10 @@ import logging import json +import gi +gi.require_version('Soup', '2.4') +from gi.repository import Soup + from nbxmpp.protocol import NS_MUCLUMBUS from nbxmpp.protocol import NS_DATA from nbxmpp.protocol import NS_RSM @@ -32,14 +36,6 @@ from nbxmpp.util import call_on_response from nbxmpp.util import callback from nbxmpp.util import raise_error -try: - import gi - gi.require_version('Soup', '2.4') - from gi.repository import Soup - SOUP_AVAILABLE = True -except (ValueError, ImportError): - SOUP_AVAILABLE = False - log = logging.getLogger('nbxmpp.m.muclumbus') # API Documentation @@ -50,9 +46,14 @@ class Muclumbus: self._client = client self.handlers = [] - self._soup_session = None - if SOUP_AVAILABLE: - self._soup_session = Soup.Session() + self._proxy_resolver = None + self._soup_session = Soup.Session() + + def set_proxy(self, proxy): + if proxy is None: + return + self._proxy_resolver = proxy.get_resolver() + self._soup_session.props.proxy_resolver = self._proxy_resolver @call_on_response('_parameters_received') def request_parameters(self, jid): @@ -90,9 +91,6 @@ class Muclumbus: def set_http_search(self, uri, keywords, after=None, callback=None, user_data=None): - if not SOUP_AVAILABLE: - raise ImportError('Module Soup not found') - search = {'keywords': keywords} if after is not None: search['after'] = after diff --git a/nbxmpp/modules/nickname.py b/nbxmpp/modules/nickname.py index 0262a64..1da02cb 100644 --- a/nbxmpp/modules/nickname.py +++ b/nbxmpp/modules/nickname.py @@ -44,7 +44,7 @@ class Nickname: priority=40), ] - def _process_nickname(self, _con, stanza, properties): + def _process_nickname(self, _client, stanza, properties): if stanza.getName() == 'message': properties.nickname = self._parse_nickname(stanza) @@ -57,7 +57,7 @@ class Nickname: return properties.nickname = self._parse_nickname(stanza) - def _process_pubsub_nickname(self, _con, _stanza, properties): + def _process_pubsub_nickname(self, _client, _stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/omemo.py b/nbxmpp/modules/omemo.py index 859e37e..ba24d42 100644 --- a/nbxmpp/modules/omemo.py +++ b/nbxmpp/modules/omemo.py @@ -55,7 +55,7 @@ class OMEMO: priority=7), ] - def _process_omemo_message(self, _con, stanza, properties): + def _process_omemo_message(self, _client, stanza, properties): try: properties.omemo = self._parse_omemo_message(stanza) log.info('Received message') @@ -126,7 +126,7 @@ class OMEMO: return OMEMOMessage(sid=sid, iv=iv, keys=keys, payload=payload) - def _process_omemo_devicelist(self, _con, stanza, properties): + def _process_omemo_devicelist(self, _client, stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/oob.py b/nbxmpp/modules/oob.py index c0de07f..0b1f210 100644 --- a/nbxmpp/modules/oob.py +++ b/nbxmpp/modules/oob.py @@ -34,7 +34,7 @@ class OOB: priority=15), ] - def _process_message_oob(self, _con, stanza, properties): + def _process_message_oob(self, _client, stanza, properties): oob = stanza.getTag('x', namespace=NS_X_OOB) if oob is None: return diff --git a/nbxmpp/modules/openpgp.py b/nbxmpp/modules/openpgp.py index 8079098..0b2978d 100644 --- a/nbxmpp/modules/openpgp.py +++ b/nbxmpp/modules/openpgp.py @@ -59,7 +59,7 @@ class OpenPGP: priority=7), ] - def _process_openpgp_message(self, _con, stanza, properties): + def _process_openpgp_message(self, _client, stanza, properties): openpgp = stanza.getTag('openpgp', namespace=NS_OPENPGP) if openpgp is None: log.warning('No openpgp node found') @@ -80,7 +80,7 @@ class OpenPGP: log.warning(stanza) return - def _process_pubsub_openpgp(self, _con, stanza, properties): + def _process_pubsub_openpgp(self, _client, stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/pgplegacy.py b/nbxmpp/modules/pgplegacy.py index 256b89f..0f6f458 100644 --- a/nbxmpp/modules/pgplegacy.py +++ b/nbxmpp/modules/pgplegacy.py @@ -39,7 +39,7 @@ class PGPLegacy: ] @staticmethod - def _process_signed(_con, stanza, properties): + def _process_signed(_client, stanza, properties): signed = stanza.getTag('x', namespace=NS_SIGNED) if signed is None: return @@ -47,7 +47,7 @@ class PGPLegacy: properties.signed = signed.getData() @staticmethod - def _process_pgplegacy_message(_con, stanza, properties): + def _process_pgplegacy_message(_client, stanza, properties): pgplegacy = stanza.getTag('x', namespace=NS_ENCRYPTED) if pgplegacy is None: log.warning('No x node found') diff --git a/nbxmpp/modules/presence.py b/nbxmpp/modules/presence.py index f9e3bc2..bebb2fa 100644 --- a/nbxmpp/modules/presence.py +++ b/nbxmpp/modules/presence.py @@ -37,7 +37,7 @@ class BasePresence: priority=10), ] - def _process_presence_base(self, _con, stanza, properties): + def _process_presence_base(self, _client, stanza, properties): properties.type = self._parse_type(stanza) properties.priority = self._parse_priority(stanza) properties.show = self._parse_show(stanza) @@ -79,7 +79,7 @@ class BasePresence: except ValueError: log.warning('Presence with invalid type received') log.warning(stanza) - self._client.send(ErrorStanza(stanza, ERR_BAD_REQUEST)) + self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) raise NodeProcessed @staticmethod diff --git a/nbxmpp/modules/pubsub.py b/nbxmpp/modules/pubsub.py index 2f8d1a2..3967568 100644 --- a/nbxmpp/modules/pubsub.py +++ b/nbxmpp/modules/pubsub.py @@ -50,7 +50,7 @@ class PubSub: priority=15), ] - def _process_pubsub_base(self, _con, stanza, properties): + def _process_pubsub_base(self, _client, stanza, properties): properties.pubsub = True event = stanza.getTag('event', namespace=NS_PUBSUB_EVENT) diff --git a/nbxmpp/modules/receipts.py b/nbxmpp/modules/receipts.py index 329e88a..2056efe 100644 --- a/nbxmpp/modules/receipts.py +++ b/nbxmpp/modules/receipts.py @@ -38,7 +38,7 @@ class Receipts: priority=15), ] - def _process_message_receipt(self, _con, stanza, properties): + def _process_message_receipt(self, _client, stanza, properties): request = stanza.getTag('request', namespace=NS_RECEIPTS) if request is not None: properties.receipt = ReceiptData(request.getName()) diff --git a/nbxmpp/modules/security_labels.py b/nbxmpp/modules/security_labels.py index 04e119d..ac29d11 100644 --- a/nbxmpp/modules/security_labels.py +++ b/nbxmpp/modules/security_labels.py @@ -35,7 +35,7 @@ class SecurityLabels: priority=15), ] - def _process_message_security_label(self, _con, stanza, properties): + def _process_message_security_label(self, _client, stanza, properties): security = stanza.getTag('securitylabel', namespace=NS_SECLABEL) if security is None: return diff --git a/nbxmpp/modules/software_version.py b/nbxmpp/modules/software_version.py index c23188c..23d06b5 100644 --- a/nbxmpp/modules/software_version.py +++ b/nbxmpp/modules/software_version.py @@ -101,5 +101,5 @@ class SoftwareVersion: log.info('Send software version: %s %s %s', self._name, self._version, self._os) - self._client.send(iq) + self._client.send_stanza(iq) raise NodeProcessed diff --git a/nbxmpp/modules/tune.py b/nbxmpp/modules/tune.py index 6ce4b0f..15bc610 100644 --- a/nbxmpp/modules/tune.py +++ b/nbxmpp/modules/tune.py @@ -37,7 +37,7 @@ class Tune: priority=16), ] - def _process_pubsub_tune(self, _con, _stanza, properties): + def _process_pubsub_tune(self, _client, _stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/user_avatar.py b/nbxmpp/modules/user_avatar.py index fb79c96..b7a33ee 100644 --- a/nbxmpp/modules/user_avatar.py +++ b/nbxmpp/modules/user_avatar.py @@ -45,7 +45,7 @@ class UserAvatar: priority=16), ] - def _process_pubsub_avatar(self, _con, stanza, properties): + def _process_pubsub_avatar(self, _client, stanza, properties): if not properties.is_pubsub_event: return diff --git a/nbxmpp/modules/vcard_avatar.py b/nbxmpp/modules/vcard_avatar.py index c47d5ed..6e929be 100644 --- a/nbxmpp/modules/vcard_avatar.py +++ b/nbxmpp/modules/vcard_avatar.py @@ -36,7 +36,7 @@ class VCardAvatar: ] @staticmethod - def _process_avatar(_con, stanza, properties): + def _process_avatar(_client, stanza, properties): if properties.type != PresenceType.AVAILABLE: return diff --git a/nbxmpp/old_dispatcher.py b/nbxmpp/old_dispatcher.py new file mode 100644 index 0000000..60f68d5 --- /dev/null +++ b/nbxmpp/old_dispatcher.py @@ -0,0 +1,758 @@ +## dispatcher.py +## +## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov <dkirov@gmail.com> +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + + +""" +Main xmpp decision making logic. Provides library with methods to assign +different handlers to different XMPP stanzas and namespaces +""" + +import sys +import locale +import re +import uuid +import logging +import inspect +from xml.parsers.expat import ExpatError + +from nbxmpp.simplexml import NodeBuilder +from nbxmpp.plugin import PlugIn +from nbxmpp.protocol import NS_STREAMS +from nbxmpp.protocol import NS_HTTP_BIND +from nbxmpp.protocol import NodeProcessed +from nbxmpp.protocol import InvalidFrom +from nbxmpp.protocol import InvalidJid +from nbxmpp.protocol import InvalidStanza +from nbxmpp.protocol import Iq +from nbxmpp.protocol import Presence +from nbxmpp.protocol import Message +from nbxmpp.protocol import Protocol +from nbxmpp.protocol import Node +from nbxmpp.protocol import Error +from nbxmpp.protocol import ERR_FEATURE_NOT_IMPLEMENTED +from nbxmpp.modules.eme import EME +from nbxmpp.modules.http_auth import HTTPAuth +from nbxmpp.modules.presence import BasePresence +from nbxmpp.modules.message import BaseMessage +from nbxmpp.modules.iq import BaseIq +from nbxmpp.modules.nickname import Nickname +from nbxmpp.modules.delay import Delay +from nbxmpp.modules.muc import MUC +from nbxmpp.modules.idle import Idle +from nbxmpp.modules.pgplegacy import PGPLegacy +from nbxmpp.modules.vcard_avatar import VCardAvatar +from nbxmpp.modules.captcha import Captcha +from nbxmpp.modules.entity_caps import EntityCaps +from nbxmpp.modules.blocking import Blocking +from nbxmpp.modules.pubsub import PubSub +from nbxmpp.modules.activity import Activity +from nbxmpp.modules.tune import Tune +from nbxmpp.modules.mood import Mood +from nbxmpp.modules.location import Location +from nbxmpp.modules.user_avatar import UserAvatar +from nbxmpp.modules.bookmarks import Bookmarks +from nbxmpp.modules.openpgp import OpenPGP +from nbxmpp.modules.omemo import OMEMO +from nbxmpp.modules.annotations import Annotations +from nbxmpp.modules.muclumbus import Muclumbus +from nbxmpp.modules.software_version import SoftwareVersion +from nbxmpp.modules.adhoc import AdHoc +from nbxmpp.modules.ibb import IBB +from nbxmpp.modules.discovery import Discovery +from nbxmpp.modules.chat_markers import ChatMarkers +from nbxmpp.modules.receipts import Receipts +from nbxmpp.modules.oob import OOB +from nbxmpp.modules.correction import Correction +from nbxmpp.modules.attention import Attention +from nbxmpp.modules.security_labels import SecurityLabels +from nbxmpp.modules.chatstates import Chatstates +from nbxmpp.modules.register import Register +from nbxmpp.modules.http_upload import HTTPUpload +from nbxmpp.modules.misc import unwrap_carbon +from nbxmpp.modules.misc import unwrap_mam +from nbxmpp.util import get_properties_struct + + +log = logging.getLogger('nbxmpp.dispatcher') + +#: default timeout to wait for response for our id +DEFAULT_TIMEOUT_SECONDS = 25 + +XML_DECLARATION = '<?xml version=\'1.0\'?>' + +# FIXME: ugly +class Dispatcher: + """ + Why is this here - I needed to redefine Dispatcher for BOSH and easiest way + was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble + is that reference used to access dispatcher instance is in Client attribute + named by __class__.__name__ of the dispatcher instance .. long story short: + + I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp + + If having two kinds of dispatcher will go well, I will rewrite the dispatcher + references in other scripts + """ + + def PlugIn(self, client_obj, after_SASL=False, old_features=None): + if client_obj.protocol_type == 'XMPP': + XMPPDispatcher().PlugIn(client_obj) + elif client_obj.protocol_type == 'BOSH': + BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features) + else: + assert False # should never be reached + + @classmethod + def get_instance(cls, *args, **kwargs): + """ + Factory Method for object creation + + Use this instead of directly initializing the class in order to make + unit testing much easier. + """ + return cls(*args, **kwargs) + + +class XMPPDispatcher(PlugIn): + """ + Handles XMPP stream and is the first who takes control over a fresh stanza + + Is plugged into NonBlockingClient but can be replugged to restart handled + stream headers (used by SASL f.e.). + """ + + def __init__(self): + PlugIn.__init__(self) + self.handlers = {} + self._modules = {} + self._expected = {} + self._defaultHandler = None + self._pendingExceptions = [] + self._eventHandler = None + self._cycleHandlers = [] + self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, + self.RegisterEventHandler, self.UnregisterCycleHandler, + self.RegisterCycleHandler, self.RegisterHandlerOnce, + self.UnregisterHandler, self.RegisterProtocol, + self.SendAndCallForResponse, + self.getAnID, self.Event, self.send, self.get_module] + + # \ufddo -> \ufdef range + c = '\ufdd0' + r = c + while c < '\ufdef': + c = chr(ord(c) + 1) + r += '|' + c + + # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff + c = '\ufffe' + r += '|' + c + r += '|' + chr(ord(c) + 1) + while c < '\U0010fffe': + c = chr(ord(c) + 0x10000) + r += '|' + c + r += '|' + chr(ord(c) + 1) + + self.invalid_chars_re = re.compile(r) + + def getAnID(self): + return str(uuid.uuid4()) + + def dumpHandlers(self): + """ + Return set of user-registered callbacks in it's internal format. Used + within the library to carry user handlers set over Dispatcher replugins + """ + return self.handlers + + def restoreHandlers(self, handlers): + """ + Restore user-registered callbacks structure from dump previously obtained + via dumpHandlers. Used within the library to carry user handlers set over + Dispatcher replugins. + """ + self.handlers = handlers + + def get_module(self, name): + return self._modules[name] + + def _register_modules(self): + self._modules['BasePresence'] = BasePresence(self._owner) + self._modules['BaseMessage'] = BaseMessage(self._owner) + self._modules['BaseIq'] = BaseIq(self._owner) + self._modules['EME'] = EME(self._owner) + self._modules['HTTPAuth'] = HTTPAuth(self._owner) + self._modules['Nickname'] = Nickname(self._owner) + self._modules['MUC'] = MUC(self._owner) + self._modules['Delay'] = Delay(self._owner) + self._modules['Captcha'] = Captcha(self._owner) + self._modules['Idle'] = Idle(self._owner) + self._modules['PGPLegacy'] = PGPLegacy(self._owner) + self._modules['VCardAvatar'] = VCardAvatar(self._owner) + self._modules['EntityCaps'] = EntityCaps(self._owner) + self._modules['Blocking'] = Blocking(self._owner) + self._modules['PubSub'] = PubSub(self._owner) + self._modules['Mood'] = Mood(self._owner) + self._modules['Activity'] = Activity(self._owner) + self._modules['Tune'] = Tune(self._owner) + self._modules['Location'] = Location(self._owner) + self._modules['UserAvatar'] = UserAvatar(self._owner) + self._modules['Bookmarks'] = Bookmarks(self._owner) + self._modules['OpenPGP'] = OpenPGP(self._owner) + self._modules['OMEMO'] = OMEMO(self._owner) + self._modules['Annotations'] = Annotations(self._owner) + self._modules['Muclumbus'] = Muclumbus(self._owner) + self._modules['SoftwareVersion'] = SoftwareVersion(self._owner) + self._modules['AdHoc'] = AdHoc(self._owner) + self._modules['IBB'] = IBB(self._owner) + self._modules['Discovery'] = Discovery(self._owner) + self._modules['ChatMarkers'] = ChatMarkers(self._owner) + self._modules['Receipts'] = Receipts(self._owner) + self._modules['OOB'] = OOB(self._owner) + self._modules['Correction'] = Correction(self._owner) + self._modules['Attention'] = Attention(self._owner) + self._modules['SecurityLabels'] = SecurityLabels(self._owner) + self._modules['Chatstates'] = Chatstates(self._owner) + self._modules['Register'] = Register(self._owner) + self._modules['HTTPUpload'] = HTTPUpload(self._owner) + + for instance in self._modules.values(): + for handler in instance.handlers: + self.RegisterHandler(*handler) + + def _init(self): + """ + Register default namespaces/protocols/handlers. Used internally + """ + # FIXME: inject dependencies, do not rely that they are defined by our + # owner + self.RegisterNamespace('unknown') + self.RegisterNamespace(NS_STREAMS) + self.RegisterNamespace(self._owner.defaultNamespace) + self.RegisterProtocol('iq', Iq) + self.RegisterProtocol('presence', Presence) + self.RegisterProtocol('message', Message) + self.RegisterDefaultHandler(self.returnStanzaHandler) + self.RegisterEventHandler(self._owner._caller._event_dispatcher) + self._register_modules() + + def plugin(self, owner): + """ + Plug the Dispatcher instance into Client class instance and send initial + stream header. Used internally + """ + self._init() + self._owner.lastErrNode = None + self._owner.lastErr = None + self._owner.lastErrCode = None + if hasattr(self._owner, 'StreamInit'): + self._owner.StreamInit() + else: + self.StreamInit() + + def plugout(self): + """ + Prepare instance to be destructed + """ + self._modules = {} + self.Stream.dispatch = None + self.Stream.features = None + self.Stream.destroy() + self._owner = None + self.Stream = None + + def StreamInit(self): + """ + Send an initial stream header + """ + self._owner.Connection.sendqueue = [] + self.Stream = NodeBuilder() + self.Stream.dispatch = self.dispatch + self.Stream._dispatch_depth = 2 + self.Stream.stream_header_received = self._check_stream_start + self.Stream.features = None + self._metastream = Node('stream:stream') + self._metastream.setNamespace(self._owner.Namespace) + self._metastream.setAttr('version', '1.0') + self._metastream.setAttr('xmlns:stream', NS_STREAMS) + self._metastream.setAttr('to', self._owner.Server) + self._metastream.setAttr('xml:lang', self._owner.lang) + self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2])) + + def _check_stream_start(self, ns, tag, attrs): + if ns != NS_STREAMS or tag!='stream': + raise ValueError('Incorrect stream start: (%s,%s). Terminating.' + % (tag, ns)) + + def replace_non_character(self, data): + return re.sub(self.invalid_chars_re, '\ufffd', data) + + def ProcessNonBlocking(self, data): + """ + Check incoming stream for data waiting + + :param data: data received from transports/IO sockets + :return: + 1) length of processed data if some data were processed; + 2) '0' string if no data were processed but link is alive; + 3) 0 (zero) if underlying connection is closed. + """ + # FIXME: + # When an error occurs we disconnect the transport directly. Client's + # disconnect method will never be called. + # Is this intended? + # also look at transports start_disconnect() + data = self.replace_non_character(data) + for handler in self._cycleHandlers: + handler(self) + if len(self._pendingExceptions) > 0: + _pendingException = self._pendingExceptions.pop() + sys.excepthook(*_pendingException) + return + try: + self.Stream.Parse(data) + # end stream:stream tag received + if self.Stream and self.Stream.has_received_endtag(): + self._owner.disconnect(self.Stream.streamError) + return 0 + except ExpatError as error: + log.error('Invalid XML received from server. Forcing disconnect.') + log.error(error) + self._owner.Connection.disconnect() + return 0 + except ValueError as e: + log.debug('ValueError: %s' % str(e)) + self._owner.Connection.pollend() + return 0 + if len(self._pendingExceptions) > 0: + _pendingException = self._pendingExceptions.pop() + sys.excepthook(*_pendingException) + return + if len(data) == 0: + return '0' + return len(data) + + def RegisterNamespace(self, xmlns, order='info'): + """ + Create internal structures for newly registered namespace + + You can register handlers for this namespace afterwards. By default + one namespace is already registered + (jabber:client or jabber:component:accept depending on context. + """ + log.debug('Registering namespace "%s"' % xmlns) + self.handlers[xmlns] = {} + self.RegisterProtocol('unknown', Protocol, xmlns=xmlns) + self.RegisterProtocol('default', Protocol, xmlns=xmlns) + + def RegisterProtocol(self, tag_name, proto, xmlns=None, order='info'): + """ + Used to declare some top-level stanza name to dispatcher + + Needed to start registering handlers for such stanzas. Iq, message and + presence protocols are registered by default. + """ + if not xmlns: + xmlns = self._owner.defaultNamespace + log.debug('Registering protocol "%s" as %s(%s)', tag_name, proto, xmlns) + self.handlers[xmlns][tag_name] = {'type': proto, 'default': []} + + def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', system=0): + """ + Register handler for processing all stanzas for specified namespace + """ + self.RegisterHandler('default', handler, typ, ns, xmlns, system) + + def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, + system=False, priority=50): + """ + Register user callback as stanzas handler of declared type + + Callback arguments: + dispatcher instance (for replying), incoming return of previous handlers. + The callback must raise xmpp.NodeProcessed just before return if it wants + to prevent other callbacks to be called with the same stanza as argument + _and_, more importantly library from returning stanza to sender with error set. + + :param name: name of stanza. F.e. "iq". + :param handler: user callback. + :param typ: value of stanza's "type" attribute. If not specified any + value will match + :param ns: namespace of child that stanza must contain. + :param xmlns: xml namespace + :param system: call handler even if NodeProcessed Exception were raised + already. + :param priority: The priority of the handler, higher get called later + """ + if not xmlns: + xmlns = self._owner.defaultNamespace + + if not typ and not ns: + typ = 'default' + + log.debug( + 'Registering handler %s for "%s" type->%s ns->%s(%s) priority->%s', + handler, name, typ, ns, xmlns, priority) + + if xmlns not in self.handlers: + self.RegisterNamespace(xmlns, 'warn') + if name not in self.handlers[xmlns]: + self.RegisterProtocol(name, Protocol, xmlns, 'warn') + + specific = typ + ns + if specific not in self.handlers[xmlns][name]: + self.handlers[xmlns][name][specific] = [] + + self.handlers[xmlns][name][specific].append( + {'func': handler, + 'system': system, + 'priority': priority, + 'specific': specific}) + + def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, + system=0): + """ + Unregister handler after first call (not implemented yet) + """ + # FIXME Drop or implement + if not xmlns: + xmlns = self._owner.defaultNamespace + self.RegisterHandler(name, handler, typ, ns, xmlns, system) + + def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None): + """ + Unregister handler. "typ" and "ns" must be specified exactly the same as + with registering. + """ + if not xmlns: + xmlns = self._owner.defaultNamespace + if not typ and not ns: + typ = 'default' + if xmlns not in self.handlers: + return + if name not in self.handlers[xmlns]: + return + + specific = typ + ns + if specific not in self.handlers[xmlns][name]: + return + for handler_dict in self.handlers[xmlns][name][specific]: + if handler_dict['func'] == handler: + try: + self.handlers[xmlns][name][specific].remove(handler_dict) + log.debug( + 'Unregister handler %s for "%s" type->%s ns->%s(%s)', + handler, name, typ, ns, xmlns) + except ValueError: + log.warning( + 'Unregister failed: %s for "%s" type->%s ns->%s(%s)', + handler, name, typ, ns, xmlns) + pass + + def RegisterDefaultHandler(self, handler): + """ + Specify the handler that will be used if no NodeProcessed exception were + raised. This is returnStanzaHandler by default. + """ + self._defaultHandler = handler + + def RegisterEventHandler(self, handler): + """ + Register handler that will process events. F.e. "FILERECEIVED" event. See + common/connection: _event_dispatcher() + """ + self._eventHandler = handler + + def returnStanzaHandler(self, conn, stanza): + """ + Return stanza back to the sender with <feature-not-implemented/> error + set + """ + if stanza.getType() in ('get', 'set'): + conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED)) + + def RegisterCycleHandler(self, handler): + """ + Register handler that will be called on every Dispatcher.Process() call + """ + if handler not in self._cycleHandlers: + self._cycleHandlers.append(handler) + + def UnregisterCycleHandler(self, handler): + """ + Unregister handler that will be called on every Dispatcher.Process() call + """ + if handler in self._cycleHandlers: + self._cycleHandlers.remove(handler) + + def Event(self, realm, event, data=None): + """ + Raise some event + + :param realm: scope of event. Usually a namespace. + :param event: the event itself. F.e. "SUCCESSFUL SEND". + :param data: data that comes along with event. Depends on event. + """ + if self._eventHandler: + self._eventHandler(realm, event, data) + else: + log.warning('Received unhandled event: %s' % event) + + def dispatch(self, stanza): + """ + Main procedure that performs XMPP stanza recognition and calling + apppropriate handlers for it. Called by simplexml + """ + + self.Event('', 'STANZA RECEIVED', stanza) + + self.Stream._mini_dom = None + + # Count stanza + self._owner.Smacks.count_incoming(stanza.getName()) + + name = stanza.getName() + if name == 'features': + self._owner.got_features = True + self.Stream.features = stanza + elif name == 'error': + if stanza.getTag('see-other-host'): + self._owner.got_see_other_host = stanza + + xmlns = stanza.getNamespace() + + if xmlns not in self.handlers: + log.warning('Unknown namespace: %s', xmlns) + xmlns = 'unknown' + # features stanza has been handled before + if name not in self.handlers[xmlns]: + if name not in ('features', 'stream'): + log.warning('Unknown stanza: %s', stanza) + else: + log.debug('Got %s / %s stanza', xmlns, name) + name = 'unknown' + else: + log.debug('Got %s / %s stanza', xmlns, name) + + # Convert simplexml to Protocol object + try: + stanza = self.handlers[xmlns][name]['type'](node=stanza) + except InvalidJid: + log.warning('Invalid JID, ignoring stanza') + log.warning(stanza) + return + + own_jid = self._owner.get_bound_jid() + properties = get_properties_struct(name) + + if name == 'iq': + if stanza.getFrom() is None and own_jid is not None: + stanza.setFrom(own_jid.getBare()) + + if name == 'message': + # https://tools.ietf.org/html/rfc6120#section-8.1.1.1 + # If the stanza does not include a 'to' address then the client MUST + # treat it as if the 'to' address were included with a value of the + # client's full JID. + + to = stanza.getTo() + if to is None: + stanza.setTo(own_jid) + elif not to.bareMatch(own_jid): + log.warning('Message addressed to someone else: %s', stanza) + return + + if stanza.getFrom() is None: + stanza.setFrom(own_jid.getBare()) + + # Unwrap carbon + try: + stanza, properties.carbon = unwrap_carbon(stanza, own_jid) + except (InvalidFrom, InvalidJid) as exc: + log.warning(exc) + log.warning(stanza) + return + except NodeProcessed as exc: + log.info(exc) + return + + # Unwrap mam + try: + stanza, properties.mam = unwrap_mam(stanza, own_jid) + except (InvalidStanza, InvalidJid) as exc: + log.warning(exc) + log.warning(stanza) + return + + typ = stanza.getType() + if name == 'message' and not typ: + typ = 'normal' + elif not typ: + typ = '' + + stanza.props = stanza.getProperties() + log.debug('type: %s, properties: %s', typ, stanza.props) + + _id = stanza.getID() + processed = False + if _id in self._expected: + cb, args = self._expected[_id] + log.debug('Expected stanza arrived. Callback %s(%s) found', + cb, args) + try: + if args is None: + cb(self, stanza) + else: + cb(self, stanza, **args) + except NodeProcessed: + pass + return + + # Gather specifics depending on stanza properties + specifics = ['default'] + if typ and typ in self.handlers[xmlns][name]: + specifics.append(typ) + for prop in stanza.props: + if prop in self.handlers[xmlns][name]: + specifics.append(prop) + if typ and typ + prop in self.handlers[xmlns][name]: + specifics.append(typ + prop) + + # Create the handler chain + chain = [] + chain += self.handlers[xmlns]['default']['default'] + for specific in specifics: + chain += self.handlers[xmlns][name][specific] + + # Sort chain with priority + chain.sort(key=lambda x: x['priority']) + + for handler in chain: + if not processed or handler['system']: + try: + log.info('Call handler: %s', handler['func'].__qualname__) + # Backwards compatibility until all handlers support + # properties + signature = inspect.signature(handler['func']) + if len(signature.parameters) > 2: + handler['func'](self, stanza, properties) + else: + handler['func'](self, stanza) + except NodeProcessed: + processed = True + except Exception: + self._pendingExceptions.insert(0, sys.exc_info()) + return + + # Stanza was not processed call default handler + if not processed and self._defaultHandler: + self._defaultHandler(self, stanza) + + def SendAndCallForResponse(self, stanza, func=None, args=None): + """ + Put stanza on the wire and call back when recipient replies. Additional + callback arguments can be specified in args + """ + _waitid = self.send(stanza) + self._expected[_waitid] = (func, args) + return _waitid + + def send(self, stanza, now=False): + """ + Wrap transports send method when plugged into NonBlockingClient. Makes + sure stanzas get ID and from tag. + """ + ID = None + if type(stanza) != str: + if isinstance(stanza, Protocol): + ID = stanza.getID() + if ID is None: + stanza.setID(self.getAnID()) + ID = stanza.getID() + if self._owner._registered_name and not stanza.getAttr('from'): + stanza.setAttr('from', self._owner._registered_name) + + self._owner.Connection.send(stanza, now) + + # If no ID then it is a whitespace + if hasattr(self._owner, 'Smacks') and ID: + self._owner.Smacks.save_in_queue(stanza) + + return ID + + +class BOSHDispatcher(XMPPDispatcher): + + def PlugIn(self, owner, after_SASL=False, old_features=None): + self.old_features = old_features + self.after_SASL = after_SASL + XMPPDispatcher.PlugIn(self, owner) + + def StreamInit(self): + """ + Send an initial stream header + """ + self.Stream = NodeBuilder() + self.Stream.dispatch = self.dispatch + self.Stream._dispatch_depth = 2 + self.Stream.stream_header_received = self._check_stream_start + self.Stream.features = self.old_features + + self._metastream = Node('stream:stream') + self._metastream.setNamespace(self._owner.Namespace) + self._metastream.setAttr('version', '1.0') + self._metastream.setAttr('xmlns:stream', NS_STREAMS) + self._metastream.setAttr('to', self._owner.Server) + self._metastream.setAttr('xml:lang', self._owner.lang) + + self.restart = True + self._owner.Connection.send_init(after_SASL=self.after_SASL) + + def StreamTerminate(self): + """ + Send a stream terminator + """ + self._owner.Connection.send_terminator() + + def ProcessNonBlocking(self, data=None): + if self.restart: + fromstream = self._metastream + fromstream.setAttr('from', fromstream.getAttr('to')) + fromstream.delAttr('to') + data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data) + self.restart = False + return XMPPDispatcher.ProcessNonBlocking(self, data) + + def dispatch(self, stanza): + if stanza.getName() == 'body' and stanza.getNamespace() == NS_HTTP_BIND: + + stanza_attrs = stanza.getAttrs() + if 'authid' in stanza_attrs: + # should be only in init response + # auth module expects id of stream in document attributes + self.Stream._document_attrs['id'] = stanza_attrs['authid'] + self._owner.Connection.handle_body_attrs(stanza_attrs) + + children = stanza.getChildren() + if children: + for child in children: + # if child doesn't have any ns specified, simplexml (or expat) + # thinks it's of parent's (BOSH body) namespace, so we have to + # rewrite it to jabber:client + if child.getNamespace() == NS_HTTP_BIND: + child.setNamespace(self._owner.defaultNamespace) + XMPPDispatcher.dispatch(self, child) + else: + XMPPDispatcher.dispatch(self, stanza) diff --git a/nbxmpp/protocol.py b/nbxmpp/protocol.py index 206d65e..af0222b 100644 --- a/nbxmpp/protocol.py +++ b/nbxmpp/protocol.py @@ -139,6 +139,7 @@ NS_PUBSUB_PUBLISH_OPTIONS = NS_PUBSUB + '#publish-options' # XEP-0060 NS_PUBSUB_OWNER = 'http://jabber.org/protocol/pubsub#owner' # XEP-0060 NS_PUBSUB_CONFIG = 'http://jabber.org/protocol/pubsub#node_config' # XEP-0060 NS_REGISTER = 'jabber:iq:register' +NS_REGISTER_FEATURE = 'http://jabber.org/features/iq-register' NS_ROSTER = 'jabber:iq:roster' NS_ROSTERNOTES = 'storage:rosternotes' NS_ROSTERX = 'http://jabber.org/protocol/rosterx' # XEP-0144 @@ -204,6 +205,7 @@ NS_BOOKMARK_CONVERSION = 'urn:xmpp:bookmarks-conversion:0' NS_DOMAIN_BASED_NAME = 'urn:xmpp:domain-based-name:1' NS_HINTS = 'urn:xmpp:hints' NS_MUCLUMBUS = 'https://xmlns.zombofant.net/muclumbus/search/1.0' +NS_FRAMING = 'urn:ietf:params:xml:ns:xmpp-framing' #xmpp_stream_error_conditions = ''' #bad-format -- -- -- The entity has sent XML that cannot be processed. @@ -951,14 +953,39 @@ class JID: return str(self) -class BOSHBody(Node): - """ - <body> tag that wraps usual XMPP stanzas in XMPP over BOSH - """ +class StreamErrorNode(Node): + def __init__(self, node): + Node.__init__(self, node=node) + + self._text = {} + + text_elements = self.getTags('text', namespace=NS_XMPP_STREAMS) + for element in text_elements: + lang = element.getXmlLang() + text = element.getData() + self._text[lang] = text - def __init__(self, attrs=None, payload=None, node=None): - Node.__init__(self, tag='body', attrs=attrs, payload=payload, node=node) - self.setNamespace(NS_HTTP_BIND) + def get_condition(self): + for tag in self.getChildren(): + if tag.getName() != 'text' and tag.getNamespace() == NS_XMPP_STREAMS: + return tag.getName() + + def get_text(self, pref_lang=None): + if pref_lang is not None: + text = self._text.get(pref_lang) + if text is not None: + return text + + if self._text: + text = self._text.get('en') + if text is not None: + return text + + text = self._text.get(None) + if text is not None: + return text + return self._text.popitem()[1] + return '' class Protocol(Node): @@ -1734,6 +1761,99 @@ class Hashes2(Node): self.setData(hash_) +class BindRequest(Iq): + def __init__(self, resource): + if resource is not None: + resource = Node('resource', payload=resource) + Iq.__init__(self, typ='set') + self.addChild(node=Node('bind', {'xmlns': NS_BIND}, payload=resource)) + + +class TLSRequest(Node): + def __init__(self): + Node.__init__(self, tag='starttls', attrs={'xmlns': NS_TLS}) + + +class SessionRequest(Iq): + def __init__(self): + Iq.__init__(self, typ='set') + self.addChild(node=Node('session', attrs={'xmlns': NS_SESSION})) + + +class StreamHeader(Node): + def __init__(self, domain, lang=None): + if lang is None: + lang = 'en' + Node.__init__(self, + tag='stream:stream', + attrs={'xmlns': NS_CLIENT, + 'version': '1.0', + 'xmlns:stream': NS_STREAMS, + 'to': domain, + 'xml:lang': lang}) + + +class WebsocketOpenHeader(Node): + def __init__(self, domain, lang=None): + if lang is None: + lang = 'en' + Node.__init__(self, + tag='open', + attrs={'xmlns': NS_FRAMING, + 'version': '1.0', + 'to': domain, + 'xml:lang': lang}) + +class WebsocketCloseHeader(Node): + def __init__(self): + Node.__init__(self, tag='close', attrs={'xmlns': NS_FRAMING}) + + +class Features(Node): + def __init__(self, node): + Node.__init__(self, node=node) + + def has_starttls(self): + tls = self.getTag('starttls', namespace=NS_TLS) + if tls is not None: + required = tls.getTag('required') is not None + return True, required + return False, False + + def has_sasl(self): + return self.getTag('mechanisms', namespace=NS_SASL) is not None + + def get_mechs(self): + mechanisms = self.getTag('mechanisms', namespace=NS_SASL) + mechanisms = mechanisms.getTags('mechanism') + return set(mech.getData() for mech in mechanisms) + + def get_domain_based_name(self): + hostname = self.getTag('hostname', namespace=NS_DOMAIN_BASED_NAME) + if hostname is not None: + return hostname.getData() + + def has_bind(self): + return self.getTag('bind', namespace=NS_BIND) is not None + + def session_required(self): + session = self.getTag('session', namespace=NS_SESSION) + if session is not None: + optional = session.getTag('optional') is not None + return not optional + return False + + def has_sm(self): + return self.getTag('sm', namespace=NS_STREAM_MGMT) is not None + + def has_roster_version(self): + return self.getTag('ver', namespace=NS_ROSTER_VER) is not None + + def has_register(self): + return self.getTag( + 'register', namespace=NS_REGISTER_FEATURE) is not None + + class ErrorNode(Node): """ XMPP-style error element diff --git a/nbxmpp/resolver.py b/nbxmpp/resolver.py new file mode 100644 index 0000000..0f43578 --- /dev/null +++ b/nbxmpp/resolver.py @@ -0,0 +1,150 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging + +from gi.repository import Gio +from gi.repository import GLib + + +log = logging.getLogger('nbxmpp.resolver') + + +class DNSResolveRequest: + def __init__(self, cache, domain, callback): + self._domain = domain + self._result = self._lookup_cache(cache) + self._callback = callback + + @property + def result(self): + return self._result + + @result.setter + def result(self, value): + self._result = value + + @property + def is_cached(self): + return self.result is not None + + def _lookup_cache(self, cache): + cached_request = cache.get(self) + if cached_request is None: + return None + return cached_request.result + + def finalize(self): + GLib.idle_add(self._callback, self.result) + self._callback = None + + def __hash__(self): + raise NotImplementedError + + def __eq__(self, other): + return hash(other) == hash(self) + + +class AlternativeMethods(DNSResolveRequest): + def __init__(self, *args, **kwargs): + DNSResolveRequest.__init__(self, *args, **kwargs) + + @property + def hostname(self): + return '_xmppconnect.%s' % self._domain + + def __hash__(self): + return hash(self.hostname) + + +class Singleton(type): + _instances = {} + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, + **kwargs) + return cls._instances[cls] + + +class GioResolver(metaclass=Singleton): + def __init__(self): + self._cache = {} + + def _cache_request(self, request): + self._cache[request] = request + + def resolve_alternatives(self, domain, callback): + request = AlternativeMethods(self._cache, domain, callback) + if request.is_cached: + request.finalize() + return + + Gio.Resolver.get_default().lookup_records_async( + request.hostname, + Gio.ResolverRecordType.TXT, + None, + self._on_alternatives_result, + request) + + def _on_alternatives_result(self, resolver, result, request): + try: + results = resolver.lookup_records_finish(result) + except GLib.Error as error: + log.warning(error.message) + log.warning(error.code) + request.finalize() + return + + try: + websocket_uri = self._parse_alternative_methods(results) + except Exception: + log.exception('Failed to parse alternative ' + 'connection methods: %s', results) + request.finalize() + return + + request.result = websocket_uri + self._cache_request(request) + request.finalize() + + @staticmethod + def _parse_alternative_methods(variant_results): + result_list = [res[0][0] for res in variant_results] + for result in result_list: + if result.startswith('_xmpp-client-websocket'): + return result.split('=')[1] + + +if __name__ == '__main__': + import sys + + try: + domain_ = sys.argv[1] + except Exception: + print('Provide domain name as argument') + sys.exit() + + # Execute: + # > python3 -m nbxmpp.resolver domain + + def on_result(result): + print('Result: ', result) + mainloop.quit() + + GioResolver().resolve_alternatives(domain_, on_result) + mainloop = GLib.MainLoop() + mainloop.run() diff --git a/nbxmpp/simplexml.py b/nbxmpp/simplexml.py index a560cf4..dd83340 100644 --- a/nbxmpp/simplexml.py +++ b/nbxmpp/simplexml.py @@ -531,7 +531,8 @@ class NodeBuilder: XML handler """ - def __init__(self, data=None, initial_node=None): + def __init__(self, data=None, initial_node=None, + dispatch_depth=1, finished=True): """ Take two optional parameters: "data" and "initial_node" @@ -540,7 +541,6 @@ class NodeBuilder: about it as of "node upgrade". "data" (if provided) feeded to parser immidiatedly after instance init. """ - log.debug("Preparing to handle incoming XML stream.") self._parser = xml.parsers.expat.ParserCreate() self._parser.UseForeignDTD(False) self._parser.StartElementHandler = self.starttag @@ -559,16 +559,17 @@ class NodeBuilder: self.__depth = 0 self.__last_depth = 0 self.__max_depth = 0 - self._dispatch_depth = 1 + self._dispatch_depth = dispatch_depth self._document_attrs = None self._document_nsp = None - self._mini_dom=initial_node + self._mini_dom = initial_node self.last_is_data = 1 - self._ptr=None + self._ptr = None self.data_buffer = None self.streamError = '' + self._is_stream = not finished if data: - self._parser.Parse(data, 1) + self._parser.Parse(data, finished) def check_data_buffer(self): if self.data_buffer: @@ -618,7 +619,7 @@ class NodeBuilder: header = Node(tag=tag, attrs=attrs, nsp=self._document_nsp, node_built=True) self.dispatch(header) - self.stream_header_received(ns, name, attrs) + self._check_stream_start(ns, name) except ValueError as e: self._document_attrs = None raise ValueError(str(e)) @@ -626,7 +627,15 @@ class NodeBuilder: self._ptr.parent.data.append('') self.last_is_data = 0 - def endtag(self, tag ): + def _check_stream_start(self, ns, tag): + if self._is_stream: + if ns != 'http://etherx.jabber.org/streams' or tag != 'stream': + raise ValueError('Incorrect stream start: (%s,%s). Terminating.' + % (tag, ns)) + else: + self.stream_header_received() + + def endtag(self, tag): """ XML Parser callback. Used internally """ @@ -681,7 +690,7 @@ class NodeBuilder: """ pass - def stream_header_received(self, ns, tag, attrs): + def stream_header_received(self): """ Method called when stream just opened """ diff --git a/nbxmpp/smacks.py b/nbxmpp/smacks.py index bb08622..f1c373a 100644 --- a/nbxmpp/smacks.py +++ b/nbxmpp/smacks.py @@ -21,16 +21,13 @@ import logging from nbxmpp.protocol import NS_STREAM_MGMT from nbxmpp.protocol import NS_DELAY2 from nbxmpp.simplexml import Node -from nbxmpp.transports import DISCONNECTING -from nbxmpp.plugin import PlugIn -from nbxmpp.const import Realm -from nbxmpp.const import Event +from nbxmpp.const import StreamState log = logging.getLogger('nbxmpp.smacks') -class Smacks(PlugIn): +class Smacks: """ This is Smacks is the Stream Management class. It takes care of requesting and sending acks. Also, it keeps track of the unhandled outgoing stanzas. @@ -39,8 +36,8 @@ class Smacks(PlugIn): number of handled stanzas """ - def __init__(self): - PlugIn.__init__(self) + def __init__(self, client): + self._client = client self._out_h = 0 # Outgoing stanzas handled self._in_h = 0 # Incoming stanzas handled self._acked_h = 0 # Last acked stanza @@ -51,59 +48,54 @@ class Smacks(PlugIn): # Max number of stanzas in queue before making a request self.max_queue = 0 + self._sm_supported = False self.enabled = False # If SM is enabled self._enable_sent = False # If we sent 'enable' self.resumed = False # If the session was resumed self.resume_in_progress = False self.resume_supported = False # Does the session support resume - self._resume_jid = None # The JID from the previous session self._session_id = None self._location = None - def get_resume_data(self): - if self.resume_supported: - return { - 'out': self._out_h, - 'in': self._in_h, - 'session_id': self._session_id, - 'location': self._location, - 'uqueue': self._uqueue, - 'bound_jid': self._owner._registered_name - } - - def set_resume_data(self, data): - if data is None: + self.register_handlers() + + @property + def sm_supported(self): + return self._sm_supported + + @sm_supported.setter + def sm_supported(self, value): + log.info('Server supports detected: %s', value) + self._sm_supported = value + + def delegate(self, stanza): + if stanza.getNamespace() != NS_STREAM_MGMT: return - log.debug('Resume data set') - self._out_h = data.get('out') - self._in_h = data.get('in') - self._session_id = data.get('session_id') - self._location = data.get('location') - self._old_uqueue = data.get('uqueue') - self._resume_jid = data.get('bound_jid') - self.resume_supported = True + if stanza.getName() == 'resumed': + self._on_resumed(stanza) + elif stanza.getName() == 'failed': + self._on_failed(None, stanza, None) def register_handlers(self): - self._owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) - self._owner.Dispatcher.RegisterHandler( + self._client.register_handler( 'enabled', self._on_enabled, xmlns=NS_STREAM_MGMT) - self._owner.Dispatcher.RegisterHandler( + self._client.register_handler( + 'failed', self._on_failed, xmlns=NS_STREAM_MGMT) + self._client.register_handler( 'r', self._send_ack, xmlns=NS_STREAM_MGMT) - self._owner.Dispatcher.RegisterHandler( + self._client.register_handler( 'a', self._on_ack, xmlns=NS_STREAM_MGMT) - self._owner.Dispatcher.RegisterHandler( - 'resumed', self._on_resumed, xmlns=NS_STREAM_MGMT) - self._owner.Dispatcher.RegisterHandler( - 'failed', self._on_failed, xmlns=NS_STREAM_MGMT) def send_enable(self): + if not self.sm_supported: + return enable = Node(NS_STREAM_MGMT + ' enable', attrs={'resume': 'true'}) - self._owner.Connection.send(enable, now=False) + self._client.send_nonza(enable, now=False) log.debug('Send enable') self._enable_sent = True - def _on_enabled(self, _disp, stanza): + def _on_enabled(self, _con, stanza, _properties): if self.enabled: log.error('Received "enabled", but SM is already enabled') return @@ -160,7 +152,7 @@ class Smacks(PlugIn): log.info('Resend %s stanzas', len(self._old_uqueue)) for stanza in self._old_uqueue: # Use dispatcher so we increment the counter - self._owner.Dispatcher.send(stanza) + self._client.send_stanza(stanza) self._old_uqueue = [] def resume_request(self): @@ -181,9 +173,9 @@ class Smacks(PlugIn): self._acked_h = self._in_h self.resume_in_progress = True - self._owner.Connection.send(resume, now=False) + self._client.send_nonza(resume, now=False) - def _on_resumed(self, _disp, stanza): + def _on_resumed(self, stanza): """ Checks if the number of stanzas sent are the same as the number of stanzas received by the server. Resends stanzas not received @@ -197,29 +189,29 @@ class Smacks(PlugIn): self.enabled = True self.resumed = True self.resume_in_progress = False - self._owner.set_bound_jid(self._resume_jid) - self._owner.Dispatcher.Event(Realm.CONNECTING, Event.RESUME_SUCCESSFUL) + self._client.set_state(StreamState.RESUME_SUCCESSFUL) self._resend_queue() def _send_ack(self, *args): ack = Node(NS_STREAM_MGMT + ' a', attrs={'h': self._in_h}) self._acked_h = self._in_h log.debug('Send ack, h: %s', self._in_h) - self._owner.Connection.send(ack, now=False) + self._client.send_nonza(ack, now=False) - def send_closing_ack(self): - if self._owner.Connection.get_state() != DISCONNECTING: - return - ack = Node(NS_STREAM_MGMT + ' a', attrs={'h': self._in_h}) - log.debug('Send closing ack, h: %s', self._in_h) - self._owner.Connection.send(ack, now=True) + def close_session(self): + # We end the connection deliberately + # Reset the state -> no resume + log.info('Close session') + self._reset_state() def _request_ack(self): request = Node(NS_STREAM_MGMT + ' r') log.debug('Request ack') - self._owner.Connection.send(request, now=False) + self._client.send_nonza(request, now=False) - def _on_ack(self, _disp, stanza): + def _on_ack(self, _stream, stanza, _properties): + if not self.enabled: + return log.debug('Ack received, h: %s', stanza.getAttr('h')) self._validate_ack(stanza, self._uqueue) @@ -250,17 +242,17 @@ class Smacks(PlugIn): else: log.debug('Validate ack, our h: %d, server h: %d, queue: %d', self._out_h, count_server, queue_size) - log.debug('removing %d stanzas from queue', queue_size - diff) + log.debug('Removing %d stanzas from queue', queue_size - diff) while len(queue) > diff: queue.pop(0) - def _on_failed(self, _disp, stanza): + def _on_failed(self, _stream, stanza, _properties): ''' This can be called after 'enable' and 'resume' ''' - log.info('Stream Management negotiation failed') + log.info('Negotiation failed') error_text = stanza.getTagData('text') if error_text is not None: log.info(error_text) @@ -275,10 +267,12 @@ class Smacks(PlugIn): log.info(tag.getName()) if self.resume_in_progress: - self._owner.Dispatcher.Event(Realm.CONNECTING, Event.RESUME_FAILED) - # We failed while resuming - self.resume_supported = False - self._owner.bind() + # Reset state before sending Bind, because otherwise stanza + # will be counted and ack will be requested. + # _reset_state() also resets resume_in_progress + self._reset_state() + self._client.set_state(StreamState.RESUME_FAILED) + self._reset_state() def _reset_state(self): diff --git a/nbxmpp/structs.py b/nbxmpp/structs.py index 63ad950..c35f2e5 100644 --- a/nbxmpp/structs.py +++ b/nbxmpp/structs.py @@ -1,10 +1,10 @@ -# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com> +# Copyright (C) 2018-2020 Philipp Hörist <philipp AT hoerist.com> # # This file is part of nbxmpp. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 +# as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, @@ -19,8 +19,12 @@ import time import random from collections import namedtuple +from gi.repository import Soup +from gi.repository import Gio + from nbxmpp.protocol import JID from nbxmpp.protocol import NS_STANZAS +from nbxmpp.protocol import NS_STREAMS from nbxmpp.protocol import NS_MAM_1 from nbxmpp.protocol import NS_MAM_2 from nbxmpp.protocol import NS_MUC @@ -37,8 +41,8 @@ from nbxmpp.const import LOCATION_DATA from nbxmpp.const import AdHocStatus StanzaHandler = namedtuple('StanzaHandler', - 'name callback typ ns xmlns system priority') -StanzaHandler.__new__.__defaults__ = ('', '', None, False, 50) + 'name callback typ ns xmlns priority') +StanzaHandler.__new__.__defaults__ = ('', '', None, 50) CommonResult = namedtuple('CommonResult', 'jid') CommonResult.__new__.__defaults__ = (None,) @@ -387,6 +391,23 @@ class AdHocCommand(namedtuple('AdHocCommand', 'jid node name sessionid status da return self.status == AdHocStatus.CANCELED +class ProxyData(namedtuple('ProxyData', 'type host username password')): + + __slots__ = [] + + def get_uri(self): + if self.username is not None: + user_pass = Soup.uri_encode('%s:%s' % (self.username, + self.password)) + return '%s://%s@%s' % (self.type, + user_pass, + self.host) + return '%s://%s' % (self.type, self.host) + + def get_resolver(self): + return Gio.SimpleProxyResolver.new(self.get_uri(), None) + + class OMEMOBundle(namedtuple('OMEMOBundle', 'spk spk_signature ik otpks')): def pick_prekey(self): return random.SystemRandom().choice(self.otpks) @@ -512,6 +533,52 @@ class StanzaMalformedError(CommonError): raise NotImplementedError +class StanzaTimeoutError(CommonError): + def __init__(self, id_): + self.condition = 'timeout' + self.id = id_ + + @classmethod + def from_string(cls, node_string): + raise NotImplementedError + + def __str__(self): + return 'IQ with id %s reached timeout' % self.id + + def serialize(self): + raise NotImplementedError + + +class StreamError(CommonError): + def __init__(self, stanza): + self.condition = stanza.getError() + self.condition_data = self._error_node.getTagData(self.condition) + self.app_condition = stanza.getAppError() + self.type = stanza.getErrorType() + self.jid = stanza.getFrom() + self.id = stanza.getID() + self._text = {} + + text_elements = self._error_node.getTags('text', namespace=NS_STREAMS) + for element in text_elements: + lang = element.getXmlLang() + text = element.getData() + self._text[lang] = text + + @classmethod + def from_string(cls, node_string): + raise NotImplementedError + + def __str__(self): + text = self.get_text('en') or '' + if text: + text = ' - %s' % text + return 'Error from %s: %s%s' % (self.jid, self.condition, text) + + def serialize(self): + raise NotImplementedError + + class TuneData(namedtuple('TuneData', 'artist length rating source title track uri')): __slots__ = [] diff --git a/nbxmpp/tcp.py b/nbxmpp/tcp.py new file mode 100644 index 0000000..fca36eb --- /dev/null +++ b/nbxmpp/tcp.py @@ -0,0 +1,335 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging +import struct +from collections import deque + +from gi.repository import GLib +from gi.repository import Gio +from gi.repository import GObject + +from nbxmpp.const import TCPState +from nbxmpp.const import ConnectionType +from nbxmpp.util import utf8_decode +from nbxmpp.util import convert_tls_error_flags +from nbxmpp.connection import Connection + +log = logging.getLogger('nbxmpp.tcp') + +READ_BUFFER_SIZE = 8192 + + +class TCPConnection(Connection): + def __init__(self, *args, **kwargs): + Connection.__init__(self, *args, **kwargs) + + self._client = Gio.SocketClient.new() + + if self._address.proxy is not None: + self._proxy_resolver = self._address.proxy.get_resolver() + self._client.set_proxy_resolver(self._proxy_resolver) + + GObject.Object.connect(self._client, 'event', self._on_event) + + self._con = None + + self._read_buffer = b'' + + self._write_queue = deque([]) + self._write_stanza_buffer = None + + self._connect_cancellable = Gio.Cancellable() + self._read_cancellable = Gio.Cancellable() + + self._input_closed = False + self._output_closed = False + + self._keepalive_id = None + + def connect(self): + self.state = TCPState.CONNECTING + + if self._address.is_service: + self._client.connect_to_service_async(self._address.domain, + self._address.service, + self._connect_cancellable, + self._on_connect_finished, + None) + elif self._address.is_host: + self._client.connect_to_host_async(self._address.host, + 0, + self._connect_cancellable, + self._on_connect_finished, + None) + + else: + raise ValueError('Invalid Address') + + def _on_event(self, _socket_client, event, _connectable, connection): + if event == Gio.SocketClientEvent.CONNECTING: + remote_address = connection.get_remote_address() + use_proxy = self._address.proxy is not None + target = 'proxy' if use_proxy else self._address.domain + log.info('Connecting to %s (%s)', + target, + remote_address.to_string()) + + def _check_certificate(self, _connection, certificate, errors): + self._peer_certificate = certificate + self._peer_certificate_errors = convert_tls_error_flags(errors) + + if self._accept_certificate(): + return True + + self.notify('bad-certificate') + return False + + def _on_certificate_set(self, connection, _param): + self._peer_certificate = connection.props.peer_certificate + self._peer_certificate_errors = convert_tls_error_flags( + connection.props.peer_certificate_errors) + self.notify('certificate-set') + + def _on_connect_finished(self, client, result, _user_data): + try: + if self._address.proxy is not None: + self._con = client.connect_to_host_finish(result) + elif self._address.is_service: + self._con = client.connect_to_service_finish(result) + elif self._address.is_host: + self._con = client.connect_to_host_finish(result) + else: + raise ValueError('Address must be a service or host') + except GLib.Error as error: + log.error('Connect Error: %s', error) + self._finalize('connection-failed') + return + + self._con.set_graceful_disconnect(True) + self._con.get_socket().set_keepalive(True) + + self.state = TCPState.CONNECTED + + use_proxy = self._address.proxy is not None + target = 'proxy' if use_proxy else self._address.domain + log.info('Connected to %s (%s)', + target, + self._con.get_remote_address().to_string()) + + self._on_connected() + + def _on_connected(self): + self.notify('connected') + self._read_async() + + def _remove_keepalive_timer(self): + if self._keepalive_id is not None: + GLib.source_remove(self._keepalive_id) + self._keepalive_id = None + + def _renew_keepalive_timer(self): + self._remove_keepalive_timer() + self._keepalive_id = GLib.timeout_add_seconds(5, self._send_keepalive) + + def _send_keepalive(self): + log.info('Send keepalive') + self._keepalive_id = None + if not self._con.get_output_stream().has_pending(): + self._write_all_async(' '.encode()) + + def start_tls_negotiation(self): + log.info('Start TLS negotiation') + remote_address = self._con.get_remote_address() + identity = Gio.NetworkAddress.new(self._address.domain, + remote_address.props.port) + + tls_client = Gio.TlsClientConnection.new(self._con, identity) + + if self._address.type == ConnectionType.DIRECT_TLS: + tls_client.set_advertised_protocols(['xmpp-client']) + tls_client.set_rehandshake_mode(Gio.TlsRehandshakeMode.NEVER) + tls_client.set_validation_flags(Gio.TlsCertificateFlags.VALIDATE_ALL) + tls_client.connect('accept-certificate', self._check_certificate) + tls_client.connect('notify::peer-certificate', self._on_certificate_set) + + # This Wraps the Gio.TlsClientConnection and the Gio.Socket together + # so we get back a Gio.SocketConnection + self._con = Gio.TcpWrapperConnection.new(tls_client, + self._con.get_socket()) + + def _read_async(self): + if self._input_closed: + return + + self._con.get_input_stream().read_bytes_async( + READ_BUFFER_SIZE, + GLib.PRIORITY_DEFAULT, + self._read_cancellable, + self._on_read_async_finish, + None) + + def _on_read_async_finish(self, stream, result, _user_data): + try: + data = stream.read_bytes_finish(result) + except GLib.Error as error: + quark = GLib.quark_try_string('g-io-error-quark') + if error.matches(quark, Gio.IOErrorEnum.CANCELLED): + if self._input_closed: + return + + quark = GLib.quark_try_string('g-tls-error-quark') + if error.matches(quark, Gio.TlsError.EOF): + log.info('Incoming stream closed: TLS EOF') + self._finalize('disconnected') + return + + if error.matches(quark, Gio.TlsError.BAD_CERTIFICATE): + log.info('Certificate Error: %s', error) + self._finalize('disconnected') + return + + log.error('Read Error: %s', error) + return + + data = data.get_data() + if not data: + if self._state == TCPState.DISCONNECTING: + log.info('Reveived zero data on _read_async()') + self._finalize('disconnected') + else: + log.warning('Reveived zero data on _read_async()') + return + + self._renew_keepalive_timer() + + self._read_buffer += data + data, self._read_buffer = utf8_decode(self._read_buffer) + + self._log_stanza(data, received=True) + self.notify('data-received', data) + + self._read_async() + + def _write_stanzas(self): + self._write_stanza_buffer = self._write_queue + self._write_queue = deque([]) + data = ''.join(map(str, self._write_stanza_buffer)).encode() + self._write_all_async(data) + + def _write_all_async(self, data): + # We have to pass data to the callback, because GLib takes no + # reference on the passed data and python would gc collect it + # bevor GLib has a chance to write it to the stream + self._con.get_output_stream().write_all_async( + data, + GLib.PRIORITY_DEFAULT, + None, + self._on_write_all_async_finished, + data) + + def _on_write_all_async_finished(self, stream, result, data): + try: + stream.write_all_finish(result) + except GLib.Error as error: + quark = GLib.quark_try_string('g-tls-error-quark') + if error.matches(quark, Gio.TlsError.BAD_CERTIFICATE): + self._write_stanza_buffer = None + return + + log.error('Write Error: %s', error) + return + + self._renew_keepalive_timer() + + data = data.decode() + if data == ' ': + # keepalive whitespace + return + + for stanza in self._write_stanza_buffer: + self._log_stanza(stanza, received=False) + self._write_stanza_buffer = None + + self.notify('data-sent', data) + + if self._output_closed and not self._write_queue: + self._check_for_shutdown() + return + + if self._write_queue: + self._write_stanzas() + + def send(self, stanza, now=False): + if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING): + log.warning('send() not possible in state: %s', self._state) + return + + if now: + self._write_queue.appendleft(stanza) + else: + self._write_queue.append(stanza) + + if not self._con.get_output_stream().has_pending(): + self._write_stanzas() + + def disconnect(self): + self._remove_keepalive_timer() + if self.state == TCPState.CONNECTING: + self.state = TCPState.DISCONNECTING + self._connect_cancellable.cancel() + return + + if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING): + log.warning('Called disconnect on state: %s', self._state) + return + + self.state = TCPState.DISCONNECTING + self._finalize('disconnected') + + def _check_for_shutdown(self): + if self._input_closed and self._output_closed: + self._finalize('disconnected') + + def shutdown_input(self): + self._remove_keepalive_timer() + log.info('Shutdown input') + self._input_closed = True + self._read_cancellable.cancel() + self._check_for_shutdown() + + def shutdown_output(self): + self._remove_keepalive_timer() + self.state = TCPState.DISCONNECTING + log.info('Shutdown output') + self._output_closed = True + + def _finalize(self, signal_name): + self._remove_keepalive_timer() + if self._con is not None: + try: + self._con.get_socket().shutdown(True, True) + except GLib.Error as error: + log.info(error) + self.state = TCPState.DISCONNECTED + self.notify(signal_name) + self.destroy() + + def destroy(self): + super().destroy() + self._con = None + self._client = None diff --git a/nbxmpp/util.py b/nbxmpp/util.py index 47f1d86..f6b2d0f 100644 --- a/nbxmpp/util.py +++ b/nbxmpp/util.py @@ -20,15 +20,30 @@ import base64 import weakref import hashlib import uuid +import binascii +import ipaddress +import os +import re +from collections import defaultdict + from functools import wraps from functools import lru_cache import precis_i18n.codec +from gi.repository import Gio from nbxmpp.protocol import DiscoInfoMalformed from nbxmpp.protocol import isErrorNode from nbxmpp.protocol import NS_DATA from nbxmpp.protocol import NS_HTTPUPLOAD_0 +from nbxmpp.const import GIO_TLS_ERRORS +from nbxmpp.const import StreamState +from nbxmpp.protocol import NS_STREAMS +from nbxmpp.protocol import NS_CLIENT +from nbxmpp.protocol import NS_FRAMING +from nbxmpp.protocol import StanzaMalformed +from nbxmpp.protocol import StreamHeader +from nbxmpp.protocol import WebsocketOpenHeader from nbxmpp.structs import Properties from nbxmpp.structs import IqProperties from nbxmpp.structs import MessageProperties @@ -93,9 +108,9 @@ def call_on_response(cb): if callback_ is not None: attrs['callback'] = weakref.WeakMethod(callback_) - self._client.SendAndCallForResponse(stanza, - getattr(self, cb), - attrs) + self._client.send_stanza(stanza, + callback=getattr(self, cb), + user_data=attrs) return func_wrapper return response_decorator @@ -343,3 +358,138 @@ def get_form(stanza, form_type): if field.value == form_type: return form return None + + +def validate_stream_header(stanza, domain, is_websocket): + attrs = stanza.getAttrs() + if attrs.get('from') != domain: + raise StanzaMalformed('Invalid from attr in stream header') + + if is_websocket: + if attrs.get('xmlns') != NS_FRAMING: + raise StanzaMalformed('Invalid namespace in stream header') + else: + if attrs.get('xmlns:stream') != NS_STREAMS: + raise StanzaMalformed('Invalid stream namespace in stream header') + if attrs.get('xmlns') != NS_CLIENT: + raise StanzaMalformed('Invalid namespace in stream header') + + if attrs.get('version') != '1.0': + raise StanzaMalformed('Invalid stream version in stream header') + stream_id = attrs.get('id') + if stream_id is None: + raise StanzaMalformed('No stream id found in stream header') + return stream_id + + +def get_stream_header(domain, lang, is_websocket): + if is_websocket: + return WebsocketOpenHeader(domain, lang) + header = StreamHeader(domain, lang) + return "<?xml version='1.0'?>%s>" % str(header)[:-3] + + +def get_stanza_id(): + return str(uuid.uuid4()) + + +def utf8_decode(data): + ''' + Decodes utf8 byte string to unicode string + Does handle invalid utf8 sequences by splitting + the invalid sequence at the end + + returns (decoded unicode string, invalid byte sequence) + ''' + try: + return data.decode(), b'' + except UnicodeDecodeError: + for i in range(-1, -4, -1): + char = data[i] + if char & 0xc0 == 0x80: + continue + return data[:i].decode(), data[i:] + raise + + +def get_rand_number(): + return int(binascii.hexlify(os.urandom(6)), 16) + + +def get_invalid_xml_regex(): + # \ufddo -> \ufdef range + c = '\ufdd0' + r = c + while c < '\ufdef': + c = chr(ord(c) + 1) + r += '|' + c + + # \ufffe-\uffff, \u1fffe-\u1ffff, ..., \u10fffe-\u10ffff + c = '\ufffe' + r += '|' + c + r += '|' + chr(ord(c) + 1) + while c < '\U0010fffe': + c = chr(ord(c) + 0x10000) + r += '|' + c + r += '|' + chr(ord(c) + 1) + + return re.compile(r) + + +def get_tls_error_phrase(tls_error): + phrase = GIO_TLS_ERRORS.get(tls_error) + if phrase is None: + return GIO_TLS_ERRORS.get(Gio.TlsCertificateFlags.GENERIC_ERROR) + return phrase + + +def convert_tls_error_flags(flags): + if not flags: + return set() + + # If GLib ever adds more flags GIO_TLS_ERRORS have to + # be extended, otherwise errors go unnoticed + if Gio.TlsCertificateFlags.VALIDATE_ALL != 127: + raise ValueError + + return set(filter(lambda error: error & flags, GIO_TLS_ERRORS.keys())) + + +def get_websocket_close_string(websocket): + data = websocket.get_close_data() + code = websocket.get_close_code() + + if code is None and data is None: + return '' + return ' Data: %s Code: %s' % (data, code) + + +def is_websocket_close(stanza): + return stanza.getName() == 'close' and stanza.getNamespace() == NS_FRAMING + + +def is_websocket_stream_error(stanza): + return stanza.getName() == 'error' and stanza.getNamespace() == NS_STREAMS + + +class Observable: + def __init__(self, log_): + self._log = log_ + self._frozen = False + self._callbacks = defaultdict(list) + + def remove_subscriptions(self): + self._callbacks = defaultdict(list) + + def subscribe(self, signal_name, func): + self._callbacks[signal_name].append(func) + + def notify(self, signal_name, *args, **kwargs): + if self._frozen: + self._frozen = False + return + + self._log.info('Signal: %s', signal_name) + callbacks = self._callbacks.get(signal_name, []) + for func in callbacks: + func(self, signal_name, *args, **kwargs) diff --git a/nbxmpp/websocket.py b/nbxmpp/websocket.py new file mode 100644 index 0000000..8e34ddf --- /dev/null +++ b/nbxmpp/websocket.py @@ -0,0 +1,182 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import logging + +from gi.repository import Soup +from gi.repository import GLib +from gi.repository import Gio + +from nbxmpp.const import TCPState +from nbxmpp.const import ConnectionType +from nbxmpp.util import get_websocket_close_string +from nbxmpp.util import convert_tls_error_flags +from nbxmpp.connection import Connection + +log = logging.getLogger('nbxmpp.websocket') + + +class WebsocketConnection(Connection): + def __init__(self, *args, **kwargs): + Connection.__init__(self, *args, **kwargs) + + self._session = Soup.Session() + self._session.props.ssl_strict = False + + if log.getEffectiveLevel() == logging.INFO: + self._session.add_feature( + Soup.Logger.new(Soup.LoggerLogLevel.BODY, -1)) + + self._websocket = None + self._cancellable = Gio.Cancellable() + + self._input_closed = False + self._output_closed = False + + def connect(self): + log.info('Try to connect to %s', self._address.uri) + + self.state = TCPState.CONNECTING + + message = Soup.Message.new('GET', self._address.uri) + message.connect('starting', self._check_certificate) + message.set_flags(Soup.MessageFlags.NO_REDIRECT) + self._session.websocket_connect_async(message, + None, + ['xmpp'], + self._cancellable, + self._on_connect, + None) + + def _on_connect(self, session, result, _user_data): + # TODO: check if protocol 'xmpp' is set + try: + self._websocket = session.websocket_connect_finish(result) + except GLib.Error as error: + quark = GLib.quark_try_string('g-io-error-quark') + if error.matches(quark, Gio.IOErrorEnum.CANCELLED): + self._finalize('disconnected') + return + + log.error('Connection Error: %s', error) + self._finalize('connection-failed') + return + + self._websocket.set_keepalive_interval(5) + self._websocket.connect('message', self._on_websocket_message) + self._websocket.connect('closed', self._on_websocket_closed) + self._websocket.connect('closing', self._on_websocket_closing) + self._websocket.connect('error', self._on_websocket_error) + self._websocket.connect('pong', self._on_websocket_pong) + + self.state = TCPState.CONNECTED + self.notify('connected') + + def start_tls_negotiation(self): + # Soup.Session does this automatically + raise NotImplementedError + + def _check_certificate(self, message): + https_used, certificate, errors = message.get_https_status() + if not https_used and self._address.type == ConnectionType.PLAIN: + return + + self._peer_certificate = certificate + self._peer_certificate_errors = convert_tls_error_flags(errors) + + self.notify('certificate-set') + + if self._accept_certificate(): + return + + self.notify('bad-certificate') + self._cancellable.cancel() + + def _on_websocket_message(self, _websocket, _type, message): + data = message.get_data().decode() + self._log_stanza(data) + + if self._input_closed: + log.warning('Received data after stream closed') + return + + self.notify('data-received', data) + + @staticmethod + def _on_websocket_pong(_websocket, _message): + log.info('Pong received') + + def _on_websocket_closed(self, websocket): + log.info('Closed %s', get_websocket_close_string(websocket)) + self._finalize('disconnected') + + @staticmethod + def _on_websocket_closing(_websocket): + log.info('Closing') + + def _on_websocket_error(self, websocket, error): + log.error(error) + if self._state not in (TCPState.DISCONNECTED, TCPState.DISCONNECTING): + self._finalize('disconnected') + + def send(self, stanza, now=False): + if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING): + log.warning('send() not possible in state: %s', self._state) + return + + data = str(stanza) + self._websocket.send_text(data) + self._log_stanza(data, received=False) + self.notify('data-sent', data) + + def disconnect(self): + if self._state == TCPState.CONNECTING: + self.state = TCPState.DISCONNECTING + self._cancellable.cancel() + return + + if self._state in (TCPState.DISCONNECTED, TCPState.DISCONNECTING): + log.warning('Called disconnect on state: %s', self._state) + return + + self._websocket.close(Soup.WebsocketCloseCode.NORMAL, None) + self.state = TCPState.DISCONNECTING + + def _check_for_shutdown(self): + if self._input_closed and self._output_closed: + self._websocket.close(Soup.WebsocketCloseCode.NORMAL, None) + + def shutdown_input(self): + log.info('Shutdown input') + self._input_closed = True + self._check_for_shutdown() + + def shutdown_output(self): + self.state = TCPState.DISCONNECTING + log.info('Shutdown output') + self._output_closed = True + + def _finalize(self, signal_name): + self.state = TCPState.DISCONNECTED + self.notify(signal_name) + self.destroy() + + def destroy(self): + super().destroy() + self._session.abort() + self._session = None + self._websocket = None |