Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim-plugins.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlovetox <philipp@hoerist.com>2022-04-18 15:04:43 +0300
committerlovetox <philipp@hoerist.com>2022-04-18 15:05:10 +0300
commit572301fe895377061410483f3680799afe422993 (patch)
tree070810d76df1a30787976d57ca7361f8b151897d
parentbe9afe87193c10dd5578dcdf05a194203655b497 (diff)
[openpgp] Be more resistant against invalid keys
-rw-r--r--openpgp/backend/gpgme.py27
-rw-r--r--openpgp/backend/pygpg.py118
-rw-r--r--openpgp/backend/util.py13
3 files changed, 99 insertions, 59 deletions
diff --git a/openpgp/backend/gpgme.py b/openpgp/backend/gpgme.py
index c75cc06..54b8619 100644
--- a/openpgp/backend/gpgme.py
+++ b/openpgp/backend/gpgme.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
+from typing import Optional
import logging
@@ -22,6 +23,7 @@ from nbxmpp.protocol import JID
import gpg
from gpg.results import ImportResult
+from openpgp.backend.util import parse_uid
from openpgp.modules.util import DecryptionFailed
log = logging.getLogger('gajim.p.openpgp.gpgme')
@@ -39,10 +41,17 @@ class KeyringItem:
except Exception:
return False
- def _get_uid(self):
+ def is_valid(self, jid: JID) -> bool:
+ if not self.is_xmpp_key:
+ return False
+ return jid == self.jid
+
+ def _get_uid(self) -> Optional[str]:
for uid in self._key.uids:
- if uid.uid.startswith('xmpp:'):
- return uid.uid
+ try:
+ return parse_uid(uid.uid)
+ except Exception:
+ pass
@property
def fingerprint(self):
@@ -56,7 +65,7 @@ class KeyringItem:
@property
def jid(self):
if self._uid is not None:
- return JID.from_string(self._uid[5:])
+ return JID.from_string(self._uid)
def __hash__(self):
return hash(self.fingerprint)
@@ -118,6 +127,7 @@ class GPGME:
keyring_item = KeyringItem(key)
if not keyring_item.is_xmpp_key:
log.warning('Key not suited for xmpp: %s', key.fpr)
+ self.delete_key(keyring_item.fingerprint)
continue
keys.append(keyring_item)
@@ -191,10 +201,17 @@ class GPGME:
fingerprint = result.imports[0].fpr
key = self.get_key(fingerprint)
+ item = KeyringItem(key)
+ if not item.is_valid(jid):
+ log.warning('Invalid key found')
+ log.warning(key)
+ self.delete_key(item.fingerprint)
+ return
- return KeyringItem(key)
+ return item
def delete_key(self, fingerprint):
+ log.info('Delete Key: %s', fingerprint)
key = self.get_key(fingerprint)
with gpg.Context(**self._context_args) as context:
context.op_delete(key, True)
diff --git a/openpgp/backend/pygpg.py b/openpgp/backend/pygpg.py
index e6850c1..009114f 100644
--- a/openpgp/backend/pygpg.py
+++ b/openpgp/backend/pygpg.py
@@ -14,21 +14,68 @@
# You should have received a copy of the GNU General Public License
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
-import os
+from typing import Optional
+
import logging
-from collections import namedtuple
import gnupg
+from nbxmpp.protocol import JID
+from openpgp.backend.util import parse_uid
from openpgp.modules.util import DecryptionFailed
+
log = logging.getLogger('gajim.p.openpgp.pygnupg')
if log.getEffectiveLevel() == logging.DEBUG:
log = logging.getLogger('gnupg')
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)
-KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint')
+
+class KeyringItem:
+ def __init__(self, key):
+ self._key = key
+ self._uid = self._get_uid()
+
+ @property
+ def is_xmpp_key(self) -> bool:
+ try:
+ return self.jid is not None
+ except Exception:
+ return False
+
+ def is_valid(self, jid: JID) -> bool:
+ if not self.is_xmpp_key:
+ return False
+ return jid == self.jid
+
+ @property
+ def keyid(self) -> str:
+ return self._key['keyid']
+
+ def _get_uid(self) -> Optional[str]:
+ for uid in self._key['uids']:
+ try:
+ return parse_uid(uid)
+ except Exception:
+ pass
+
+ @property
+ def fingerprint(self):
+ return self._key['fingerprint']
+
+ @property
+ def uid(self):
+ if self._uid is not None:
+ return self._uid
+
+ @property
+ def jid(self):
+ if self._uid is not None:
+ return JID.from_string(self._uid)
+
+ def __hash__(self):
+ return hash(self.fingerprint)
class PythonGnuPG(gnupg.GPG):
@@ -93,22 +140,15 @@ class PythonGnuPG(gnupg.GPG):
result = super().list_keys(secret=secret)
keys = []
for key in result:
- item = self._make_keyring_item(key)
- if item is None:
+ item = KeyringItem(key)
+ if not item.is_xmpp_key:
+ log.warning('Invalid key found, deleting key')
+ log.warning(key)
+ self.delete_key(item.fingerprint)
continue
- keys.append(self._make_keyring_item(key))
+ keys.append(item)
return keys
- @staticmethod
- def _make_keyring_item(key):
- userid = key['uids'][0]
- if not userid.startswith('xmpp:'):
- log.warning('Incorrect userid: %s found for key, '
- 'key will be ignored', userid)
- return
- jid = userid[5:]
- return KeyringItem(jid, key['keyid'], key['fingerprint'])
-
def import_key(self, data, jid):
log.info('Import key from %s', jid)
result = super().import_keys(data)
@@ -117,45 +157,15 @@ class PythonGnuPG(gnupg.GPG):
log.error(result)
return
- if not self.validate_key(data, str(jid)):
- return None
key = self.get_key(result.results[0]['fingerprint'])
- return self._make_keyring_item(key[0])
-
- def validate_key(self, public_key, jid):
- import tempfile
- temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey')
- with open(temppath, 'wb') as tempfile:
- tempfile.write(public_key)
-
- result = self.scan_keys(temppath)
- if result:
- key_found = False
- for uid in result.uids:
- if uid.startswith('xmpp:'):
- if uid[5:] == jid:
- key_found = True
- else:
- log.warning('Found wrong userid in key: %s != %s',
- uid[5:], jid)
- log.debug(result)
- os.remove(temppath)
- return False
-
- if not key_found:
- log.warning('No valid userid found in key')
- log.debug(result)
- os.remove(temppath)
- return False
-
- log.info('Key validation succesful')
- os.remove(temppath)
- return True
-
- log.warning('Invalid key data: %s')
- log.debug(result)
- os.remove(temppath)
- return False
+ item = KeyringItem(key[0])
+ if not item.is_valid(jid):
+ log.warning('Invalid key found, deleting key')
+ log.warning(key)
+ self.delete_key(item.fingerprint)
+ return
+
+ return item
def get_own_key_details(self):
result = super().list_keys(secret=True)
diff --git a/openpgp/backend/util.py b/openpgp/backend/util.py
new file mode 100644
index 0000000..e4cc329
--- /dev/null
+++ b/openpgp/backend/util.py
@@ -0,0 +1,13 @@
+
+from __future__ import annotations
+
+
+def parse_uid(uid: str, compat=False) -> str:
+ if uid.startswith('xmpp:'):
+ return uid[5:]
+
+ # Compat with uids of form "Name <xmpp:my@jid.com>"
+ if compat and '<xmpp:' in uid and uid.endswith('>'):
+ return uid[:-1].split('<xmpp:', maxsplit=1)[1]
+
+ raise ValueError('Uknown UID format: %s' % uid)