diff options
author | lovetox <philipp@hoerist.com> | 2022-04-18 15:04:43 +0300 |
---|---|---|
committer | lovetox <philipp@hoerist.com> | 2022-04-18 15:05:10 +0300 |
commit | 572301fe895377061410483f3680799afe422993 (patch) | |
tree | 070810d76df1a30787976d57ca7361f8b151897d | |
parent | be9afe87193c10dd5578dcdf05a194203655b497 (diff) |
[openpgp] Be more resistant against invalid keys
-rw-r--r-- | openpgp/backend/gpgme.py | 27 | ||||
-rw-r--r-- | openpgp/backend/pygpg.py | 118 | ||||
-rw-r--r-- | openpgp/backend/util.py | 13 |
3 files changed, 99 insertions, 59 deletions
diff --git a/openpgp/backend/gpgme.py b/openpgp/backend/gpgme.py index c75cc06..54b8619 100644 --- a/openpgp/backend/gpgme.py +++ b/openpgp/backend/gpgme.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>. +from typing import Optional import logging @@ -22,6 +23,7 @@ from nbxmpp.protocol import JID import gpg from gpg.results import ImportResult +from openpgp.backend.util import parse_uid from openpgp.modules.util import DecryptionFailed log = logging.getLogger('gajim.p.openpgp.gpgme') @@ -39,10 +41,17 @@ class KeyringItem: except Exception: return False - def _get_uid(self): + def is_valid(self, jid: JID) -> bool: + if not self.is_xmpp_key: + return False + return jid == self.jid + + def _get_uid(self) -> Optional[str]: for uid in self._key.uids: - if uid.uid.startswith('xmpp:'): - return uid.uid + try: + return parse_uid(uid.uid) + except Exception: + pass @property def fingerprint(self): @@ -56,7 +65,7 @@ class KeyringItem: @property def jid(self): if self._uid is not None: - return JID.from_string(self._uid[5:]) + return JID.from_string(self._uid) def __hash__(self): return hash(self.fingerprint) @@ -118,6 +127,7 @@ class GPGME: keyring_item = KeyringItem(key) if not keyring_item.is_xmpp_key: log.warning('Key not suited for xmpp: %s', key.fpr) + self.delete_key(keyring_item.fingerprint) continue keys.append(keyring_item) @@ -191,10 +201,17 @@ class GPGME: fingerprint = result.imports[0].fpr key = self.get_key(fingerprint) + item = KeyringItem(key) + if not item.is_valid(jid): + log.warning('Invalid key found') + log.warning(key) + self.delete_key(item.fingerprint) + return - return KeyringItem(key) + return item def delete_key(self, fingerprint): + log.info('Delete Key: %s', fingerprint) key = self.get_key(fingerprint) with gpg.Context(**self._context_args) as context: context.op_delete(key, True) diff --git a/openpgp/backend/pygpg.py b/openpgp/backend/pygpg.py index e6850c1..009114f 100644 --- a/openpgp/backend/pygpg.py +++ b/openpgp/backend/pygpg.py @@ -14,21 +14,68 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>. -import os +from typing import Optional + import logging -from collections import namedtuple import gnupg +from nbxmpp.protocol import JID +from openpgp.backend.util import parse_uid from openpgp.modules.util import DecryptionFailed + log = logging.getLogger('gajim.p.openpgp.pygnupg') if log.getEffectiveLevel() == logging.DEBUG: log = logging.getLogger('gnupg') log.addHandler(logging.StreamHandler()) log.setLevel(logging.DEBUG) -KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint') + +class KeyringItem: + def __init__(self, key): + self._key = key + self._uid = self._get_uid() + + @property + def is_xmpp_key(self) -> bool: + try: + return self.jid is not None + except Exception: + return False + + def is_valid(self, jid: JID) -> bool: + if not self.is_xmpp_key: + return False + return jid == self.jid + + @property + def keyid(self) -> str: + return self._key['keyid'] + + def _get_uid(self) -> Optional[str]: + for uid in self._key['uids']: + try: + return parse_uid(uid) + except Exception: + pass + + @property + def fingerprint(self): + return self._key['fingerprint'] + + @property + def uid(self): + if self._uid is not None: + return self._uid + + @property + def jid(self): + if self._uid is not None: + return JID.from_string(self._uid) + + def __hash__(self): + return hash(self.fingerprint) class PythonGnuPG(gnupg.GPG): @@ -93,22 +140,15 @@ class PythonGnuPG(gnupg.GPG): result = super().list_keys(secret=secret) keys = [] for key in result: - item = self._make_keyring_item(key) - if item is None: + item = KeyringItem(key) + if not item.is_xmpp_key: + log.warning('Invalid key found, deleting key') + log.warning(key) + self.delete_key(item.fingerprint) continue - keys.append(self._make_keyring_item(key)) + keys.append(item) return keys - @staticmethod - def _make_keyring_item(key): - userid = key['uids'][0] - if not userid.startswith('xmpp:'): - log.warning('Incorrect userid: %s found for key, ' - 'key will be ignored', userid) - return - jid = userid[5:] - return KeyringItem(jid, key['keyid'], key['fingerprint']) - def import_key(self, data, jid): log.info('Import key from %s', jid) result = super().import_keys(data) @@ -117,45 +157,15 @@ class PythonGnuPG(gnupg.GPG): log.error(result) return - if not self.validate_key(data, str(jid)): - return None key = self.get_key(result.results[0]['fingerprint']) - return self._make_keyring_item(key[0]) - - def validate_key(self, public_key, jid): - import tempfile - temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey') - with open(temppath, 'wb') as tempfile: - tempfile.write(public_key) - - result = self.scan_keys(temppath) - if result: - key_found = False - for uid in result.uids: - if uid.startswith('xmpp:'): - if uid[5:] == jid: - key_found = True - else: - log.warning('Found wrong userid in key: %s != %s', - uid[5:], jid) - log.debug(result) - os.remove(temppath) - return False - - if not key_found: - log.warning('No valid userid found in key') - log.debug(result) - os.remove(temppath) - return False - - log.info('Key validation succesful') - os.remove(temppath) - return True - - log.warning('Invalid key data: %s') - log.debug(result) - os.remove(temppath) - return False + item = KeyringItem(key[0]) + if not item.is_valid(jid): + log.warning('Invalid key found, deleting key') + log.warning(key) + self.delete_key(item.fingerprint) + return + + return item def get_own_key_details(self): result = super().list_keys(secret=True) diff --git a/openpgp/backend/util.py b/openpgp/backend/util.py new file mode 100644 index 0000000..e4cc329 --- /dev/null +++ b/openpgp/backend/util.py @@ -0,0 +1,13 @@ + +from __future__ import annotations + + +def parse_uid(uid: str, compat=False) -> str: + if uid.startswith('xmpp:'): + return uid[5:] + + # Compat with uids of form "Name <xmpp:my@jid.com>" + if compat and '<xmpp:' in uid and uid.endswith('>'): + return uid[:-1].split('<xmpp:', maxsplit=1)[1] + + raise ValueError('Uknown UID format: %s' % uid) |