# Copyright (C) 2019 Philipp Hörist
#
# This file is part of the PGP Gajim Plugin.
#
# PGP Gajim Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# PGP Gajim Plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PGP Gajim Plugin. If not, see <https://www.gnu.org/licenses/>.

import json
from pathlib import Path

from gajim.common import app
from gajim.common import configpaths

# Version number written into every freshly created or migrated store.
CURRENT_STORE_VERSION = 3


class KeyResolveError(Exception):
    """Raised when a key ID does not resolve to exactly one fingerprint."""


class KeyStore:
    """JSON file holding the own key and per-contact key bindings.

    The store is a dict with three entries: ``_version``,
    ``own_key_data`` (``None`` or a ``{'key_id', 'key_user'}`` dict) and
    ``contact_key_data`` (a dict keyed by ``'<account>-<jid>'``).
    """

    def __init__(self, account, own_jid, log, list_keys_func):
        """Load the store from disk, migrating older layouts if needed.

        :param account: account name; used for settings lookups and as
            prefix of the contact dict keys.
        :param own_jid: object exposing a ``bare`` attribute that names
            the per-account data directory.
        :param log: logger-like object (``info``, ``warning``,
            ``critical`` and ``exception`` are called on it).
        :param list_keys_func: callable accepting ``secret=`` and
            ``keys=`` keyword arguments whose result exposes a
            ``fingerprints`` sequence.
        :raises Exception: if the store on disk declares a version newer
            than ``CURRENT_STORE_VERSION``.
        """
        self._list_keys_func = list_keys_func
        self._log = log
        self._account = account

        path = (Path(configpaths.get('PLUGINS_DATA')) /
                'pgplegacy' / own_jid.bare)
        # exist_ok avoids the check-then-create race of exists() + mkdir()
        path.mkdir(parents=True, exist_ok=True)
        self._store_path = path / 'store'

        if not self._store_path.exists():
            # having store v1 or fresh install
            self._store = self._empty_store()
            self._migrate_v1_store()
            self._migrate_v2_store()
            self._save_store()
            return

        # having store v2 or higher
        self._store = self._load_store()
        ver = self._store.get('_version', 2)

        # Only compare numerically when the value is a number; any other
        # type falls through to the "garbled version" branch below
        # instead of raising TypeError.
        if isinstance(ver, (int, float)) and ver > CURRENT_STORE_VERSION:
            raise Exception('Unknown store version! '
                            'Please upgrade pgp plugin.')

        if ver == 2:
            self._migrate_v2_store()
            self._save_store()
        elif ver != CURRENT_STORE_VERSION:
            # garbled version
            self._store = self._empty_store()
            log.warning('Bad pgp key store version. Initializing new.')

    def _load_store(self):
        """Read the store file and return its content as a dict.

        Falls back to an empty store when the file cannot be parsed or
        when its top-level JSON value is not an object.
        """
        with self._store_path.open('r') as file:
            try:
                store = json.load(file)
            except Exception:
                self._log.exception('Could not load config')
                return self._empty_store()

        if not isinstance(store, dict):
            # Valid JSON, but not the mapping the rest of the class needs.
            self._log.warning('Bad pgp key store content. Initializing new.')
            return self._empty_store()
        return store

    @staticmethod
    def _empty_store():
        """Return a new store dict at the current version with no keys."""
        return {
            '_version': CURRENT_STORE_VERSION,
            'own_key_data': None,
            'contact_key_data': {},
        }

    def _migrate_v1_store(self):
        """Import key bindings kept in the account settings (store v1).

        ``attached_gpg_keys`` is read as a whitespace separated sequence
        of ``jid key_id`` pairs; ``keyid`` / ``keyname`` describe the own
        key. Nothing is written to disk here.
        """
        attached_keys = app.settings.get_account_setting(
            self._account, 'attached_gpg_keys')
        own_key_id = app.settings.get_account_setting(self._account, 'keyid')

        if not attached_keys and not own_key_id:
            # Nothing stored in the old settings (e.g. fresh install).
            return

        if attached_keys:
            tokens = attached_keys.split()
            # Pair up consecutive tokens; a dangling odd token is ignored.
            for jid, key_id in zip(tokens[0::2], tokens[1::2]):
                self._set_contact_key_data_nosync(jid, (key_id, ''))

        # The own key is migrated even when no contact keys were attached.
        if own_key_id:
            own_key_user = app.settings.get_account_setting(
                self._account, 'keyname')
            self._set_own_key_data_nosync((own_key_id, own_key_user))

        if attached_keys:
            app.settings.set_account_setting(
                self._account, 'attached_gpg_keys', '')
        self._log.info('Migration from store v1 was successful')

    def _migrate_v2_store(self):
        """Replace stored key IDs by fingerprints and bump the version.

        Bindings whose key ID cannot be resolved to exactly one
        fingerprint are dropped. Nothing is written to disk here.
        """
        own_key_data = self.get_own_key_data()
        if own_key_data is not None:
            own_key_id = own_key_data['key_id']
            own_key_user = own_key_data['key_user']
            try:
                own_key_fp = self._resolve_short_id(own_key_id,
                                                    has_secret=True)
            except KeyResolveError:
                self._set_own_key_data_nosync(None)
            else:
                self._set_own_key_data_nosync((own_key_fp, own_key_user))

        contacts = self._store['contact_key_data']
        prune_list = []
        for dict_key, key_data in contacts.items():
            if key_data is None:
                # Binding was cleared via the setter; nothing to resolve.
                continue
            try:
                key_data['key_id'] = self._resolve_short_id(
                    key_data['key_id'])
            except KeyResolveError:
                prune_list.append(dict_key)

        # Delete after the loop: the dict must not change size while
        # it is being iterated.
        for dict_key in prune_list:
            del contacts[dict_key]

        self._store['_version'] = CURRENT_STORE_VERSION
        self._log.info('Migration from store v2 was successful')

    def _save_store(self):
        """Serialize the in-memory store as JSON to the store file."""
        with self._store_path.open('w') as file:
            json.dump(self._store, file)

    def _get_dict_key(self, jid):
        """Return the ``contact_key_data`` key for ``jid``."""
        return '%s-%s' % (self._account, jid)

    def _resolve_short_id(self, short_id, has_secret=False):
        """Return the single fingerprint matching ``short_id``.

        :param short_id: key ID passed to the key listing function.
        :param has_secret: forwarded as ``secret=`` to the key listing
            function.
        :raises KeyResolveError: if zero or more than one fingerprint
            matches.
        """
        candidates = self._list_keys_func(
            secret=has_secret, keys=(short_id,)).fingerprints

        if len(candidates) == 1:
            return candidates[0]

        if len(candidates) > 1:
            self._log.critical('Key collision during migration. '
                               'Key ID is %s. Removing binding...',
                               repr(short_id))
        else:
            self._log.warning('Key %s was not found during migration. '
                              'Removing binding...',
                              repr(short_id))
        raise KeyResolveError

    def set_own_key_data(self, key_data):
        """Set the own key binding and write the store to disk.

        :param key_data: ``None`` to clear, otherwise an indexable
            ``(key_id, key_user)`` pair.
        """
        self._set_own_key_data_nosync(key_data)
        self._save_store()

    def _set_own_key_data_nosync(self, key_data):
        """Set the own key binding in memory only."""
        if key_data is None:
            self._store['own_key_data'] = None
        else:
            self._store['own_key_data'] = {
                'key_id': key_data[0],
                'key_user': key_data[1]
            }

    def get_own_key_data(self):
        """Return the own key dict, or ``None`` if no own key is set."""
        return self._store['own_key_data']

    def get_contact_key_data(self, jid):
        """Return the key dict bound to ``jid``, or ``None`` if absent."""
        key_ids = self._store['contact_key_data']
        dict_key = self._get_dict_key(jid)
        return key_ids.get(dict_key)

    def set_contact_key_data(self, jid, key_data):
        """Set the key binding for ``jid`` and write the store to disk.

        :param jid: contact identifier used to build the dict key.
        :param key_data: ``None`` to clear, otherwise an indexable
            ``(key_id, key_user)`` pair.
        """
        self._set_contact_key_data_nosync(jid, key_data)
        self._save_store()

    def _set_contact_key_data_nosync(self, jid, key_data):
        """Set the key binding for ``jid`` in memory only."""
        key_ids = self._store['contact_key_data']
        dict_key = self._get_dict_key(jid)
        if key_data is None:
            key_ids[dict_key] = None
        else:
            key_ids[dict_key] = {
                'key_id': key_data[0],
                'key_user': key_data[1]
            }