Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim-plugins.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'omemo/modules/omemo.py')
-rw-r--r--omemo/modules/omemo.py514
1 files changed, 0 insertions, 514 deletions
diff --git a/omemo/modules/omemo.py b/omemo/modules/omemo.py
deleted file mode 100644
index 548d4af..0000000
--- a/omemo/modules/omemo.py
+++ /dev/null
@@ -1,514 +0,0 @@
-# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
-#
-# This file is part of OMEMO Gajim Plugin.
-#
-# OMEMO Gajim Plugin is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published
-# by the Free Software Foundation; version 3 only.
-#
-# OMEMO Gajim Plugin is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with OMEMO Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
-
-# XEP-0384: OMEMO Encryption
-
-import time
-from pathlib import Path
-
-from nbxmpp.namespaces import Namespace
-from nbxmpp.protocol import NodeProcessed
-from nbxmpp.errors import StanzaError
-from nbxmpp.const import PresenceType
-from nbxmpp.const import Affiliation
-from nbxmpp.structs import StanzaHandler
-from nbxmpp.modules.omemo import create_omemo_message
-from nbxmpp.modules.omemo import get_key_transport_message
-from nbxmpp.modules.util import is_error
-
-from gajim.common import app
-from gajim.common import configpaths
-from gajim.common.events import MessageNotSent
-from gajim.common.const import EncryptionData
-from gajim.common.const import Trust as GajimTrust
-from gajim.common.modules.base import BaseModule
-from gajim.common.modules.util import event_node
-from gajim.common.modules.util import as_task
-
-from gajim.plugins.plugins_i18n import _
-
-from omemo.backend.state import OmemoState
-from omemo.backend.state import KeyExchangeMessage
-from omemo.backend.state import SelfMessage
-from omemo.backend.state import MessageNotForDevice
-from omemo.backend.state import DecryptionFailed
-from omemo.backend.state import DuplicateMessage
-from omemo.backend.util import Trust
-from omemo.modules.events import OMEMONewFingerprint
-from omemo.modules.util import prepare_stanza
-
-
-ALLOWED_TAGS = [
- ('request', Namespace.RECEIPTS),
- ('active', Namespace.CHATSTATES),
- ('gone', Namespace.CHATSTATES),
- ('inactive', Namespace.CHATSTATES),
- ('paused', Namespace.CHATSTATES),
- ('composing', Namespace.CHATSTATES),
- ('markable', Namespace.CHATMARKERS),
- ('no-store', Namespace.HINTS),
- ('store', Namespace.HINTS),
- ('no-copy', Namespace.HINTS),
- ('no-permanent-store', Namespace.HINTS),
- ('replace', Namespace.CORRECT),
- ('thread', None),
- ('origin-id', Namespace.SID),
-]
-
-ENCRYPTION_NAME = 'OMEMO'
-
-# Module name
-name = 'OMEMO'
-zeroconf = False
-
-
-class OMEMO(BaseModule):
-
- _nbxmpp_extends = 'OMEMO'
- _nbxmpp_methods = [
- 'set_devicelist',
- 'request_devicelist',
- 'set_bundle',
- 'request_bundle',
- ]
-
- def __init__(self, client):
- BaseModule.__init__(self, client, plugin=True)
-
- self.handlers = [
- StanzaHandler(name='message',
- callback=self._message_received,
- ns=Namespace.OMEMO_TEMP,
- priority=9),
- StanzaHandler(name='presence',
- callback=self._on_muc_user_presence,
- ns=Namespace.MUC_USER,
- priority=48),
- ]
-
- self._register_pubsub_handler(self._devicelist_notification_received)
-
- self.available = True
-
- self._own_jid = self._client.get_own_jid().bare
- self._backend = self._get_backend()
-
- self._omemo_groupchats = set()
- self._muc_temp_store = {}
- self._query_for_bundles = []
- self._device_bundle_querys = []
- self._query_for_devicelists = []
-
- def get_own_jid(self, stripped=False):
- if stripped:
- return self._client.get_own_jid().bare
- return self._client.get_own_jid()
-
- @property
- def backend(self):
- return self._backend
-
- def _get_backend(self):
- data_dir = Path(configpaths.get('MY_DATA'))
- db_path = data_dir / f'omemo_{self._own_jid}.db'
- return OmemoState(self._own_jid, db_path, self._account, self)
-
- def is_omemo_groupchat(self, room_jid):
- return room_jid in self._omemo_groupchats
-
- def on_signed_in(self):
- self._log.info('Announce Support after Sign In')
- self._query_for_bundles = []
- self.set_bundle()
- self.request_devicelist()
-
- def activate(self):
- """ Method called when the Plugin is activated in the PluginManager
- """
- self._client.get_module('Caps').update_caps()
-
- if app.account_is_connected(self._account):
- self._log.info('Announce Support after Plugin Activation')
- self._query_for_bundles = []
- self.set_bundle()
- self.request_devicelist()
-
- def deactivate(self):
- """ Method called when the Plugin is deactivated in the PluginManager
- """
- self._query_for_bundles = []
-
- def encrypt_message(self, conn, event, callback, groupchat):
- if not event.message:
- callback(event)
- return
-
- omemo_message = self.backend.encrypt(event.jid, event.message)
- if omemo_message is None:
- app.ged.raise_event(
- MessageNotSent(client=conn,
- jid=event.jid,
- message=event.message,
- error=_('Encryption error'),
- time=time.time()))
- return
-
- create_omemo_message(event.stanza, omemo_message,
- node_whitelist=ALLOWED_TAGS)
-
- if groupchat:
- self._muc_temp_store[omemo_message.payload] = event.message
- else:
- event.xhtml = None
- event.encrypted = ENCRYPTION_NAME
- event.additional_data['encrypted'] = {
- 'name': ENCRYPTION_NAME,
- 'trust': GajimTrust[Trust.VERIFIED.name]}
-
- self._debug_print_stanza(event.stanza)
- callback(event)
-
- def _send_key_transport_message(self, typ, jid, devices):
- omemo_message = self.backend.encrypt_key_transport(jid, devices)
- if omemo_message is None:
- self._log.warning('Key transport message to %s (%s) failed',
- jid, devices)
- return
-
- transport_message = get_key_transport_message(typ, jid, omemo_message)
- self._log.info('Send key transport message %s (%s)', jid, devices)
- self._client.connection.send(transport_message)
-
- def _message_received(self, _con, stanza, properties):
- if not properties.is_omemo:
- return
-
- if properties.is_carbon_message and properties.carbon.is_sent:
- from_jid = self._own_jid
-
- elif properties.is_mam_message:
- from_jid = self._process_mam_message(properties)
-
- elif properties.from_muc:
- from_jid = self._process_muc_message(properties)
-
- else:
- from_jid = properties.jid.bare
-
- if from_jid is None:
- return
-
- self._log.info('Message received from: %s', from_jid)
-
- try:
- plaintext, fingerprint, trust = self.backend.decrypt_message(
- properties.omemo, from_jid)
- except (KeyExchangeMessage, DuplicateMessage):
- raise NodeProcessed
-
- except SelfMessage:
- if not properties.from_muc:
- raise NodeProcessed
-
- if properties.omemo.payload not in self._muc_temp_store:
- self._log.warning("Can't decrypt own GroupChat Message")
- return
-
- plaintext = self._muc_temp_store[properties.omemo.payload]
- fingerprint = self.backend.own_fingerprint
- trust = Trust.VERIFIED
- del self._muc_temp_store[properties.omemo.payload]
-
- except DecryptionFailed:
- return
-
- except MessageNotForDevice:
- if properties.omemo.payload is None:
- # Key Transport message for another device
- return
-
- plaintext = _('This message was encrypted with OMEMO, '
- 'but not for your device.')
- # Neither trust nor fingerprint can be verified if we didn't
- # successfully decrypt the message
- trust = Trust.UNTRUSTED
- fingerprint = None
-
- prepare_stanza(stanza, plaintext)
- self._debug_print_stanza(stanza)
- properties.encrypted = EncryptionData({'name': ENCRYPTION_NAME,
- 'fingerprint': fingerprint,
- 'trust': GajimTrust[trust.name]})
-
- def _process_muc_message(self, properties):
- resource = properties.jid.resource
- if properties.muc_ofrom is not None:
- # History Message from MUC
- return properties.muc_ofrom.bare
-
- contact = self._client.get_module('Contacts').get_contact(
- properties.jid)
- if contact.real_jid is not None:
- return contact.real_jid.bare
-
- self._log.info('Groupchat: Last resort trying to find SID in DB')
- from_jid = self.backend.storage.getJidFromDevice(properties.omemo.sid)
- if not from_jid:
- self._log.error("Can't decrypt GroupChat Message from %s", resource)
- return
- return from_jid
-
- def _process_mam_message(self, properties):
- self._log.info('Message received, archive: %s', properties.mam.archive)
- if properties.from_muc:
- self._log.info('MUC MAM Message received')
- if properties.muc_user is None or properties.muc_user.jid is None:
- self._log.warning('Received MAM Message which can '
- 'not be mapped to a real jid')
- return
- return properties.muc_user.jid.bare
- return properties.from_.bare
-
- def _on_muc_user_presence(self, _con, _stanza, properties):
- if properties.type == PresenceType.ERROR:
- return
-
- if properties.is_muc_destroyed:
- return
-
- room = properties.jid.bare
-
- if properties.muc_user is None or properties.muc_user.jid is None:
- # No real jid found
- return
-
- jid = properties.muc_user.jid.bare
- if properties.muc_user.affiliation in (Affiliation.OUTCAST,
- Affiliation.NONE):
- self.backend.remove_muc_member(room, jid)
- else:
- self.backend.add_muc_member(room, jid)
-
- if self.is_omemo_groupchat(room):
- if not self.is_contact_in_roster(jid):
- # Query Devicelists from JIDs not in our Roster
- self._log.info('%s not in Roster, query devicelist...', jid)
- self.request_devicelist(jid)
-
- def get_affiliation_list(self, room_jid):
- for affiliation in ('owner', 'admin', 'member'):
- self._nbxmpp('MUC').get_affiliation(
- room_jid,
- affiliation,
- callback=self._on_affiliations_received,
- user_data=room_jid)
-
- def _on_affiliations_received(self, task):
- room_jid = task.get_user_data()
- try:
- result = task.finish()
- except StanzaError as error:
- self._log.info('Affiliation request failed: %s', error)
- return
-
- for user_jid in result.users:
- jid = str(user_jid)
- self.backend.add_muc_member(room_jid, jid)
-
- if not self.is_contact_in_roster(jid):
- # Query Devicelists from JIDs not in our Roster
- self._log.info('%s not in Roster, query devicelist...', jid)
- self.request_devicelist(jid)
-
- def is_contact_in_roster(self, jid):
- if jid == self._own_jid:
- return True
-
- roster_item = self._client.get_module('Roster').get_item(jid)
- if roster_item is None:
- return False
-
- contact = self._client.get_module('Contacts').get_contact(jid)
- return contact.subscription == 'both'
-
- def on_muc_disco_update(self, event):
- self._check_if_omemo_capable(event.jid)
-
- def on_room_joined(self, contact):
- jid = str(contact.jid)
- self._check_if_omemo_capable(jid)
- if self.is_omemo_groupchat(jid):
- self.get_affiliation_list(jid)
-
- def _check_if_omemo_capable(self, jid):
- disco_info = app.storage.cache.get_last_disco_info(jid)
- if disco_info.muc_is_members_only and disco_info.muc_is_nonanonymous:
- self._log.info('OMEMO room discovered: %s', jid)
- self._omemo_groupchats.add(jid)
- else:
- self._log.info('OMEMO room removed due to config change: %s', jid)
- self._omemo_groupchats.discard(jid)
-
- def _check_for_missing_sessions(self, jid):
- devices_without_session = self.backend.devices_without_sessions(jid)
- for device_id in devices_without_session:
- if device_id in self._device_bundle_querys:
- continue
- self._device_bundle_querys.append(device_id)
- self.request_bundle(jid, device_id)
-
- def are_keys_missing(self, contact_jid):
- """ Checks if devicekeys are missing and queries the
- bundles
-
- Parameters
- ----------
- contact_jid : str
- bare jid of the contact
-
- Returns
- -------
- bool
- Returns True if there are no trusted Fingerprints
- """
-
- # Fetch Bundles of own other Devices
- if self._own_jid not in self._query_for_bundles:
-
- devices_without_session = self.backend \
- .devices_without_sessions(self._own_jid)
-
- self._query_for_bundles.append(self._own_jid)
-
- if devices_without_session:
- for device_id in devices_without_session:
- self.request_bundle(self._own_jid, device_id)
-
- # Fetch Bundles of contacts devices
- if contact_jid not in self._query_for_bundles:
-
- devices_without_session = self.backend \
- .devices_without_sessions(contact_jid)
-
- self._query_for_bundles.append(contact_jid)
-
- if devices_without_session:
- for device_id in devices_without_session:
- self.request_bundle(contact_jid, device_id)
-
- if self.backend.has_trusted_keys(contact_jid):
- return False
- return True
-
- def set_bundle(self):
- self._nbxmpp('OMEMO').set_bundle(self.backend.bundle,
- self.backend.own_device)
-
- @as_task
- def request_bundle(self, jid, device_id):
- _task = yield
-
- self._log.info('Fetch device bundle %s %s', device_id, jid)
-
- bundle = yield self._nbxmpp('OMEMO').request_bundle(
- jid,
- device_id)
-
- if is_error(bundle) or bundle is None:
- self._log.info('Bundle request failed: %s %s: %s',
- jid, device_id, bundle)
- return
-
- self.backend.build_session(jid, device_id, bundle)
- self._log.info('Session created for: %s', jid)
- # TODO: In MUC we should send a groupchat message
- self._send_key_transport_message('chat', jid, [device_id])
-
- # Trigger dialog to trust new Fingerprints if
- # the Chat Window is Open
-
- # TODO: This does not work anymore
- # ctrl = app.window.get_control(self._account, jid)
- # if ctrl:
- # app.ged.raise_event(OMEMONewFingerprint(chat_control=ctrl))
-
- def set_devicelist(self, devicelist=None):
- devicelist_ = set([self.backend.own_device])
- if devicelist is not None:
- devicelist_.update(devicelist)
- self._log.info('Publishing own devicelist: %s', devicelist_)
- self._nbxmpp('OMEMO').set_devicelist(devicelist_)
-
- def clear_devicelist(self):
- self.backend.update_devicelist(self._own_jid, [self.backend.own_device])
- self.set_devicelist()
-
- @as_task
- def request_devicelist(self, jid=None):
- _task = yield
-
- if jid is None:
- jid = self._own_jid
-
- if jid in self._query_for_devicelists:
- return
-
- self._query_for_devicelists.append(jid)
-
- devicelist = yield self._nbxmpp('OMEMO').request_devicelist(jid=jid)
- if is_error(devicelist) or devicelist is None:
- self._log.info('Devicelist request failed: %s %s', jid, devicelist)
- devicelist = []
-
- self._process_devicelist_update(jid, devicelist)
-
- @event_node(Namespace.OMEMO_TEMP_DL)
- def _devicelist_notification_received(self, _con, _stanza, properties):
- if properties.pubsub_event.retracted:
- return
-
- devicelist = properties.pubsub_event.data or []
-
- self._process_devicelist_update(str(properties.jid), devicelist)
-
- def _process_devicelist_update(self, jid, devicelist):
- own_devices = jid is None or self._client.get_own_jid().bare_match(jid)
- if own_devices:
- jid = self._own_jid
-
- self._log.info('Received device list for %s: %s', jid, devicelist)
- # Pass a copy, we need the full list for potential set_devicelist()
- self.backend.update_devicelist(jid, list(devicelist))
-
- if jid in self._query_for_bundles:
- self._query_for_bundles.remove(jid)
-
- if own_devices:
- if not self.backend.is_own_device_published:
- # Our own device_id is not in the list, it could be
- # overwritten by some other client
- self.set_devicelist(devicelist)
-
- self._check_for_missing_sessions(jid)
-
- def _debug_print_stanza(self, stanza):
- stanzastr = '\n' + stanza.__str__(fancy=True)
- stanzastr = stanzastr[0:-1]
- self._log.debug(stanzastr)
-
-
-def get_instance(*args, **kwargs):
- return OMEMO(*args, **kwargs), 'OMEMO'