# Copyright (C) 2019 Philipp Hörist # # This file is part of nbxmpp. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; If not, see . import logging import re import time from xml.parsers.expat import ExpatError from gi.repository import GLib from nbxmpp.simplexml import NodeBuilder from nbxmpp.simplexml import Node from nbxmpp.protocol import NS_STREAMS from nbxmpp.protocol import NS_CLIENT from nbxmpp.protocol import NS_XMPP_STREAMS from nbxmpp.protocol import NodeProcessed from nbxmpp.protocol import InvalidFrom from nbxmpp.protocol import InvalidJid from nbxmpp.protocol import InvalidStanza from nbxmpp.protocol import Iq from nbxmpp.protocol import Presence from nbxmpp.protocol import Message from nbxmpp.protocol import Protocol from nbxmpp.protocol import Error from nbxmpp.protocol import StreamErrorNode from nbxmpp.protocol import ERR_FEATURE_NOT_IMPLEMENTED from nbxmpp.modules.eme import EME from nbxmpp.modules.http_auth import HTTPAuth from nbxmpp.modules.presence import BasePresence from nbxmpp.modules.message import BaseMessage from nbxmpp.modules.iq import BaseIq from nbxmpp.modules.nickname import Nickname from nbxmpp.modules.delay import Delay from nbxmpp.modules.muc import MUC from nbxmpp.modules.idle import Idle from nbxmpp.modules.pgplegacy import PGPLegacy from nbxmpp.modules.vcard_avatar import VCardAvatar from nbxmpp.modules.captcha import Captcha from nbxmpp.modules.entity_caps import EntityCaps from nbxmpp.modules.blocking import Blocking from nbxmpp.modules.pubsub import PubSub from nbxmpp.modules.activity import Activity from nbxmpp.modules.tune import Tune from nbxmpp.modules.mood import Mood from nbxmpp.modules.location import Location from nbxmpp.modules.user_avatar import UserAvatar from nbxmpp.modules.bookmarks import Bookmarks from nbxmpp.modules.openpgp import OpenPGP from nbxmpp.modules.omemo import OMEMO from nbxmpp.modules.annotations import Annotations from nbxmpp.modules.muclumbus import Muclumbus from nbxmpp.modules.software_version import SoftwareVersion from nbxmpp.modules.adhoc import AdHoc from nbxmpp.modules.ibb import IBB from nbxmpp.modules.discovery import Discovery from nbxmpp.modules.chat_markers import ChatMarkers from nbxmpp.modules.receipts import Receipts from nbxmpp.modules.oob import OOB from nbxmpp.modules.correction import Correction from nbxmpp.modules.attention import Attention from nbxmpp.modules.security_labels import SecurityLabels from nbxmpp.modules.chatstates import Chatstates from nbxmpp.modules.register import Register from nbxmpp.modules.http_upload import HTTPUpload from nbxmpp.modules.misc import unwrap_carbon from nbxmpp.modules.misc import unwrap_mam from nbxmpp.structs import StanzaTimeoutError from nbxmpp.util import get_properties_struct from nbxmpp.util import get_invalid_xml_regex from nbxmpp.util import is_websocket_close from nbxmpp.util import is_websocket_stream_error from nbxmpp.util import Observable log = logging.getLogger('nbxmpp.dispatcher') class StanzaDispatcher(Observable): """ Dispatches stanzas to handlers Signals: before-dispatch parsing-error stream-end """ def __init__(self, client): Observable.__init__(self, log) self._client = client self._modules = {} self._parser = None self._websocket_stream_error = None self._handlers = {} self._id_callbacks = {} self._dispatch_callback = None self._timeout_id = None self._stanza_types = { 'iq': Iq, 'message': Message, 'presence': Presence, 'error': StreamErrorNode, } self.invalid_chars_re = get_invalid_xml_regex() self._register_namespace('unknown') self._register_namespace(NS_STREAMS) self._register_namespace(NS_CLIENT) self._register_protocol('iq', Iq) self._register_protocol('presence', Presence) self._register_protocol('message', Message) self._register_modules() def set_dispatch_callback(self, callback): log.info('Set dispatch callback: %s', callback) self._dispatch_callback = callback def get_module(self, name): return self._modules[name] def _register_modules(self): self._modules['BasePresence'] = BasePresence(self._client) self._modules['BaseMessage'] = BaseMessage(self._client) self._modules['BaseIq'] = BaseIq(self._client) self._modules['EME'] = EME(self._client) self._modules['HTTPAuth'] = HTTPAuth(self._client) self._modules['Nickname'] = Nickname(self._client) self._modules['MUC'] = MUC(self._client) self._modules['Delay'] = Delay(self._client) self._modules['Captcha'] = Captcha(self._client) self._modules['Idle'] = Idle(self._client) self._modules['PGPLegacy'] = PGPLegacy(self._client) self._modules['VCardAvatar'] = VCardAvatar(self._client) self._modules['EntityCaps'] = EntityCaps(self._client) self._modules['Blocking'] = Blocking(self._client) self._modules['PubSub'] = PubSub(self._client) self._modules['Mood'] = Mood(self._client) self._modules['Activity'] = Activity(self._client) self._modules['Tune'] = Tune(self._client) self._modules['Location'] = Location(self._client) self._modules['UserAvatar'] = UserAvatar(self._client) self._modules['Bookmarks'] = Bookmarks(self._client) self._modules['OpenPGP'] = OpenPGP(self._client) self._modules['OMEMO'] = OMEMO(self._client) self._modules['Annotations'] = Annotations(self._client) self._modules['Muclumbus'] = Muclumbus(self._client) self._modules['SoftwareVersion'] = SoftwareVersion(self._client) self._modules['AdHoc'] = AdHoc(self._client) self._modules['IBB'] = IBB(self._client) self._modules['Discovery'] = Discovery(self._client) self._modules['ChatMarkers'] = ChatMarkers(self._client) self._modules['Receipts'] = Receipts(self._client) self._modules['OOB'] = OOB(self._client) self._modules['Correction'] = Correction(self._client) self._modules['Attention'] = Attention(self._client) self._modules['SecurityLabels'] = SecurityLabels(self._client) self._modules['Chatstates'] = Chatstates(self._client) self._modules['Register'] = Register(self._client) self._modules['HTTPUpload'] = HTTPUpload(self._client) for instance in self._modules.values(): for handler in instance.handlers: self.register_handler(*handler) def reset_parser(self): self._remove_timeout_source() if self._parser is not None: self._parser.dispatch = None self._parser.destroy() self._parser = None self._id_callbacks.clear() self._parser = NodeBuilder(dispatch_depth=2, finished=False) self._parser.dispatch = self.dispatch def replace_non_character(self, data): return re.sub(self.invalid_chars_re, '\ufffd', data) def process_data(self, data): # Parse incoming data data = self.replace_non_character(data) if self._client.is_websocket: stanza = Node(node=data) if is_websocket_stream_error(stanza): for tag in stanza.getChildren(): name = tag.getName() if name != 'text' and tag.getNamespace() == NS_XMPP_STREAMS: self._websocket_stream_error = name elif is_websocket_close(stanza): log.info('Stream received') self.notify('stream-end', self._websocket_stream_error) return self.dispatch(stanza) return try: self._parser.Parse(data) except (ExpatError, ValueError) as error: log.error('XML parsing error: %s', error) self.notify('parsing-error', error) return # end stream:stream tag received if self._parser.has_received_endtag(): log.info('End of stream: %s', self._parser.streamError) self.notify('stream-end', self._parser.streamError) return def _register_namespace(self, xmlns): """ Setup handler structure for namespace """ log.debug('Register namespace "%s"', xmlns) self._handlers[xmlns] = {} self._register_protocol('error', Protocol, xmlns=xmlns) self._register_protocol('unknown', Protocol, xmlns=xmlns) self._register_protocol('default', Protocol, xmlns=xmlns) def _register_protocol(self, tag_name, protocol, xmlns=None): """ Register protocol for top level tag names """ if xmlns is None: xmlns = NS_CLIENT log.debug('Register protocol "%s (%s)" as %s', tag_name, xmlns, protocol) self._handlers[xmlns][tag_name] = {'type': protocol, 'default': []} def register_handler(self, name, handler, typ='', ns='', xmlns=None, priority=50): """ Register handler :param name: name of top level tag, example: iq, message, presence :param handler: callback :param typ: value of stanza's "type" attribute. If not specified any value will match :param ns: Namespace of child that stanza must contain :param xmlns: XML namespace, only needed if not jabber:client :param priority: The priority of the handler, higher get called later """ if not xmlns: xmlns = NS_CLIENT if not typ and not ns: typ = 'default' log.debug('Register handler %s for "%s" type->%s ns->%s(%s) priority->%s', handler, name, typ, ns, xmlns, priority) if xmlns not in self._handlers: self._register_namespace(xmlns) if name not in self._handlers[xmlns]: self._register_protocol(name, Protocol, xmlns) specific = typ + ns if specific not in self._handlers[xmlns][name]: self._handlers[xmlns][name][specific] = [] self._handlers[xmlns][name][specific].append( {'func': handler, 'priority': priority, 'specific': specific}) def unregister_handler(self, name, handler, typ='', ns='', xmlns=None): """ Unregister handler """ if not xmlns: xmlns = NS_CLIENT if not typ and not ns: typ = 'default' specific = typ + ns try: self._handlers[xmlns][name][specific] except KeyError: return for handler_dict in self._handlers[xmlns][name][specific]: if handler_dict['func'] != handler: return try: self._handlers[xmlns][name][specific].remove(handler_dict) except ValueError: log.warning('Unregister failed: %s for "%s" type->%s ns->%s(%s)', handler, name, typ, ns, xmlns) else: log.debug('Unregister handler %s for "%s" type->%s ns->%s(%s)', handler, name, typ, ns, xmlns) def _default_handler(self, stanza): """ Return stanza back to the sender with error """ if stanza.getType() in ('get', 'set'): self._client.send_stanza(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED)) def dispatch(self, stanza): self.notify('before-dispatch', stanza) if self._dispatch_callback is not None: name = stanza.getName() protocol_class = self._stanza_types.get(name) if protocol_class is not None: stanza = protocol_class(node=stanza) self._dispatch_callback(stanza) return # Count stanza self._client._smacks.count_incoming(stanza.getName()) name = stanza.getName() xmlns = stanza.getNamespace() if xmlns not in self._handlers: log.warning('Unknown namespace: %s', xmlns) xmlns = 'unknown' if name not in self._handlers[xmlns]: log.warning('Unknown stanza: %s', stanza) name = 'unknown' # Convert simplexml to Protocol object try: stanza = self._handlers[xmlns][name]['type'](node=stanza) except InvalidJid: log.warning('Invalid JID, ignoring stanza') log.warning(stanza) return own_jid = self._client.get_bound_jid() properties = get_properties_struct(name) if name == 'iq': if stanza.getFrom() is None and own_jid is not None: stanza.setFrom(own_jid.getBare()) if name == 'message': # https://tools.ietf.org/html/rfc6120#section-8.1.1.1 # If the stanza does not include a 'to' address then the client MUST # treat it as if the 'to' address were included with a value of the # client's full JID. to = stanza.getTo() if to is None: stanza.setTo(own_jid) elif not to.bareMatch(own_jid): log.warning('Message addressed to someone else: %s', stanza) return if stanza.getFrom() is None: stanza.setFrom(own_jid.getBare()) # Unwrap carbon try: stanza, properties.carbon = unwrap_carbon(stanza, own_jid) except (InvalidFrom, InvalidJid) as exc: log.warning(exc) log.warning(stanza) return except NodeProcessed as exc: log.info(exc) return # Unwrap mam try: stanza, properties.mam = unwrap_mam(stanza, own_jid) except (InvalidStanza, InvalidJid) as exc: log.warning(exc) log.warning(stanza) return typ = stanza.getType() if name == 'message' and not typ: typ = 'normal' elif not typ: typ = '' stanza.props = stanza.getProperties() log.debug('type: %s, properties: %s', typ, stanza.props) # Process callbacks _id = stanza.getID() func, _timeout, user_data = self._id_callbacks.pop( _id, (None, None, {})) if user_data is None: user_data = {} if func is not None: try: func(self._client, stanza, **user_data) except Exception: log.exception('Error while handling stanza') return # Gather specifics depending on stanza properties specifics = ['default'] if typ and typ in self._handlers[xmlns][name]: specifics.append(typ) for prop in stanza.props: if prop in self._handlers[xmlns][name]: specifics.append(prop) if typ and typ + prop in self._handlers[xmlns][name]: specifics.append(typ + prop) # Create the handler chain chain = [] chain += self._handlers[xmlns]['default']['default'] for specific in specifics: chain += self._handlers[xmlns][name][specific] # Sort chain with priority chain.sort(key=lambda x: x['priority']) for handler in chain: log.info('Call handler: %s', handler['func'].__qualname__) try: handler['func'](self._client, stanza, properties) except NodeProcessed: return except Exception: log.exception('Handler exception:') return # Stanza was not processed call default handler self._default_handler(stanza) def add_callback_for_id(self, id_, func, timeout, user_data): if timeout is not None and self._timeout_id is None: log.info('Add timeout source') self._timeout_id = GLib.timeout_add_seconds( 1, self._timeout_check) timeout = time.monotonic() + timeout self._id_callbacks[id_] = (func, timeout, user_data) def _timeout_check(self): log.info('Run timeout check') if not self._id_callbacks: log.info('Remove timeout source, no callbacks scheduled') self._timeout_id = None return False for id_ in list(self._id_callbacks.keys()): func, timeout, user_data = self._id_callbacks.get(id_) if timeout is None: continue if user_data is None: user_data = {} if timeout < time.monotonic(): self._id_callbacks.pop(id_) func(self._client, StanzaTimeoutError(id_), **user_data) return True def _remove_timeout_source(self): if self._timeout_id is not None: GLib.source_remove(self._timeout_id) self._timeout_id = None def cleanup(self): self._client = None self._modules = {} self._parser = None self._id_callbacks.clear() self._dispatch_callback = None self._handlers.clear() self._remove_timeout_source() self.remove_subscriptions()