Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim-plugins.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'openpgp/backend')
-rw-r--r--openpgp/backend/__init__.py0
-rw-r--r--openpgp/backend/gpgme.py196
-rw-r--r--openpgp/backend/pygpg.py183
-rw-r--r--openpgp/backend/sql.py102
4 files changed, 481 insertions, 0 deletions
diff --git a/openpgp/backend/__init__.py b/openpgp/backend/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/openpgp/backend/__init__.py
diff --git a/openpgp/backend/gpgme.py b/openpgp/backend/gpgme.py
new file mode 100644
index 0000000..238d256
--- /dev/null
+++ b/openpgp/backend/gpgme.py
@@ -0,0 +1,196 @@
+# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of Gajim.
+#
+# Gajim is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; version 3 only.
+#
+# Gajim is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
+
+# XEP-0373: OpenPGP for XMPP
+
+
+import io
+from collections import namedtuple
+import logging
+
+import gpg
+
+from gajim.common import app
+
+KeyringItem = namedtuple('KeyringItem',
+ 'type keyid userid fingerprint')
+
+log = logging.getLogger('gajim.plugin_system.openpgp.pgpme')
+
+
+class PGPContext():
+ def __init__(self, jid, gnuhome):
+ self.context = gpg.Context(home_dir=str(gnuhome))
+ # self.create_new_key()
+ # self.get_key_by_name()
+ # self.get_key_by_fingerprint()
+ self.export_public_key()
+
+ def create_new_key(self):
+ parms = """<GnupgKeyParms format="internal">
+ Key-Type: RSA
+ Key-Length: 2048
+ Subkey-Type: RSA
+ Subkey-Length: 2048
+ Name-Real: Joe Tester
+ Name-Comment: with stupid passphrase
+ Name-Email: test@example.org
+ Passphrase: Crypt0R0cks
+ Expire-Date: 2020-12-31
+ </GnupgKeyParms>
+ """
+
+ with self.context as c:
+ c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, None, app.gajimpaths['MY_DATA'])
+ c.set_progress_cb(gpg.callbacks.progress_stdout)
+ c.op_genkey(parms, None, None)
+ print("Generated key with fingerprint {0}.".format(
+ c.op_genkey_result().fpr))
+
+ def get_all_keys(self):
+ c = gpg.Context()
+ for key in c.keylist():
+ user = key.uids[0]
+ print("Keys for %s (%s):" % (user.name, user.email))
+ for subkey in key.subkeys:
+ features = []
+ if subkey.can_authenticate:
+ features.append('auth')
+ if subkey.can_certify:
+ features.append('cert')
+ if subkey.can_encrypt:
+ features.append('encrypt')
+ if subkey.can_sign:
+ features.append('sign')
+ print(' %s %s' % (subkey.fpr, ','.join(features)))
+
+ def get_key_by_name(self):
+ c = gpg.Context()
+ for key in c.keylist('john'):
+ print(key.subkeys[0].fpr)
+
+ def get_key_by_fingerprint(self):
+ c = gpg.Context()
+ fingerprint = 'key fingerprint to search for'
+ try:
+ key = c.get_key(fingerprint)
+ print('%s (%s)' % (key.uids[0].name, key.uids[0].email))
+ except gpg.errors.KeyNotFound:
+ print("No key for fingerprint '%s'." % fingerprint)
+
+ def get_secret_key(self):
+ '''
+ Key(can_authenticate=1,
+ can_certify=1,
+ can_encrypt=1,
+ can_sign=1,
+ chain_id=None,
+ disabled=0,
+ expired=0,
+ fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
+ invalid=0,
+ is_qualified=0,
+ issuer_name=None,
+ issuer_serial=None,
+ keylist_mode=1,
+ last_update=0,
+ origin=0,
+ owner_trust=5,
+ protocol=0,
+ revoked=0,
+ secret=1,
+ subkeys=[
+ SubKey(can_authenticate=1,
+ can_certify=1,
+ can_encrypt=1,
+ can_sign=1,
+ card_number=None
+ curve=None,
+ disabled=0,
+ expired=0,
+ expires=0,
+ fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
+ invalid=0,
+ is_cardkey=0,
+ is_de_vs=1,
+ is_qualified=0,
+ keygrip='15BECB77301E4810ABB9CA6A9391158E575DABEC',
+ keyid='A4DBDD1BA55FE3CE',
+ length=2048,
+ pubkey_algo=1,
+ revoked=0,
+ secret=1,
+ timestamp=1525006759)],
+ uids=[
+ UID(address=None,
+ comment='',
+ email='',
+ invalid=0,
+ last_update=0,
+ name='xmpp:philw@jabber.at',
+ origin=0,
+ revoked=0,
+ signatures=[],
+ tofu=[],
+ uid='xmpp:philw@jabber.at',
+ validity=5)])
+ '''
+ for key in self.context.keylist(secret=True):
+ break
+ return key.fpr, key.fpr[-16:]
+
+ def get_keys(self, secret=False):
+ keys = []
+ for key in self.context.keylist():
+ for uid in key.uids:
+ if uid.uid.startswith('xmpp:'):
+ keys.append((key, uid.uid[5:]))
+ break
+ return keys
+
+ def export_public_key(self):
+ # print(dir(self.context))
+ result = self.context.key_export_minimal()
+ print(result)
+
+ def encrypt_decrypt_files(self):
+ c = gpg.Context()
+ recipient = c.get_key("fingerprint of recipient's key")
+
+ # Encrypt
+ with open('foo.txt', 'r') as input_file:
+ with open('foo.txt.gpg', 'wb') as output_file:
+ c.encrypt([recipient], 0, input_file, output_file)
+
+ # Decrypt
+ with open('foo.txt.gpg', 'rb') as input_file:
+ with open('foo2.txt', 'w') as output_file:
+ c.decrypt(input_file, output_file)
+
+ def encrypt(self):
+ c = gpg.Context()
+ recipient = c.get_key("fingerprint of recipient's key")
+
+ plaintext_string = u'plain text data'
+ plaintext_bytes = io.BytesIO(plaintext_string.encode('utf8'))
+ encrypted_bytes = io.BytesIO()
+ c.encrypt([recipient], 0, plaintext_bytes, encrypted_bytes)
+
+ def decrypt(self):
+ c = gpg.Context()
+ decrypted_bytes = io.BytesIO()
+ c.decrypt(encrypted_bytes, decrypted_bytes)
+ decrypted_string = decrypted_bytes.getvalue().decode('utf8')
diff --git a/openpgp/backend/pygpg.py b/openpgp/backend/pygpg.py
new file mode 100644
index 0000000..5b94e0e
--- /dev/null
+++ b/openpgp/backend/pygpg.py
@@ -0,0 +1,183 @@
+# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of Gajim.
+#
+# Gajim is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; version 3 only.
+#
+# Gajim is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
+
+# XEP-0373: OpenPGP for XMPP
+
+import os
+import logging
+from collections import namedtuple
+
+import gnupg
+
+from gajim.common import app
+
+from openpgp.modules.util import DecryptionFailed
+
+log = logging.getLogger('gajim.plugin_system.openpgp.pygnupg')
+# gnupg.logger = log
+
+KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint')
+
+
+class PGPContext(gnupg.GPG):
+ def __init__(self, jid, gnupghome):
+ gnupg.GPG.__init__(
+ self, gpgbinary=app.get_gpg_binary(), gnupghome=str(gnupghome))
+
+ self._passphrase = 'gajimopenpgppassphrase'
+ self._jid = jid
+ self._own_fingerprint = None
+
+ def _get_key_params(self, jid, passphrase):
+ '''
+ Generate --gen-key input
+ '''
+
+ params = {
+ 'Key-Type': 'RSA',
+ 'Key-Length': 2048,
+ 'Name-Real': 'xmpp:%s' % jid,
+ 'Passphrase': passphrase,
+ }
+
+ out = "Key-Type: %s\n" % params.pop('Key-Type')
+ for key, val in list(params.items()):
+ out += "%s: %s\n" % (key, val)
+ out += "%commit\n"
+ return out
+
+ def generate_key(self):
+ super().gen_key(self._get_key_params(self._jid, self._passphrase))
+
+ def encrypt(self, payload, keys):
+ recipients = [key.fingerprint for key in keys]
+ log.info('encrypt to:')
+ for fingerprint in recipients:
+ log.info(fingerprint)
+
+ result = super().encrypt(str(payload).encode('utf8'),
+ recipients,
+ sign=self._own_fingerprint,
+ always_trust=True,
+ passphrase=self._passphrase)
+
+ if result.ok:
+ error = ''
+ else:
+ error = result.status
+
+ return str(result), error
+
+ def decrypt(self, payload):
+ result = super().decrypt(payload,
+ always_trust=True,
+ passphrase=self._passphrase)
+ if not result.ok:
+ raise DecryptionFailed(result.status)
+
+ return result.data.decode('utf8')
+
+ def get_key(self, fingerprint):
+ return super().list_keys(keys=[fingerprint])
+
+ def get_keys(self, secret=False):
+ result = super().list_keys(secret=secret)
+ keys = []
+ for key in result:
+ item = self._make_keyring_item(key)
+ if item is None:
+ continue
+ keys.append(self._make_keyring_item(key))
+ return keys
+
+ @staticmethod
+ def _make_keyring_item(key):
+ userid = key['uids'][0]
+ if not userid.startswith('xmpp:'):
+ log.warning('Incorrect userid: %s found for key, '
+ 'key will be ignored', userid)
+ return
+ jid = userid[5:]
+ return KeyringItem(jid, key['keyid'], key['fingerprint'])
+
+ def import_key(self, data, jid):
+ log.info('Import key from %s', jid)
+ result = super().import_keys(data)
+ if not result:
+ log.error('Could not import key')
+ log.error(result.results[0])
+ return
+
+ if not self.validate_key(data, jid):
+ return None
+ key = self.get_key(result.results[0]['fingerprint'])
+ return self._make_keyring_item(key[0])
+
+ def validate_key(self, public_key, jid):
+ import tempfile
+ temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey')
+ with open(temppath, 'wb') as tempfile:
+ tempfile.write(public_key)
+
+ result = self.scan_keys(temppath)
+ if result:
+ for uid in result.uids:
+ if uid.startswith('xmpp:'):
+ if uid[5:] == jid:
+ key_found = True
+ else:
+ log.warning('Found wrong userid in key: %s != %s',
+ uid[5:], jid)
+ log.debug(result)
+ os.remove(temppath)
+ return False
+
+ if not key_found:
+ log.warning('No valid userid found in key')
+ log.debug(result)
+ os.remove(temppath)
+ return False
+
+ log.info('Key validation succesful')
+ os.remove(temppath)
+ return True
+
+ log.warning('Invalid key data: %s')
+ log.debug(result)
+ os.remove(temppath)
+ return False
+
+ def get_own_key_details(self):
+ result = super().list_keys(secret=True)
+ if not result:
+ return None, None
+
+ if len(result) > 1:
+ log.error('More than one secret key found')
+ return None, None
+
+ self._own_fingerprint = result[0]['fingerprint']
+ return self._own_fingerprint, int(result[0]['date'])
+
+ def export_key(self, fingerprint):
+ key = super().export_keys(
+ fingerprint, secret=False, armor=False, minimal=False,
+ passphrase=self._passphrase)
+ return key
+
+ def delete_key(self, fingerprint):
+ log.info('Delete Key: %s', fingerprint)
+ super().delete_keys(fingerprint, passphrase=self._passphrase)
diff --git a/openpgp/backend/sql.py b/openpgp/backend/sql.py
new file mode 100644
index 0000000..564b504
--- /dev/null
+++ b/openpgp/backend/sql.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of Gajim.
+#
+# Gajim is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; version 3 only.
+#
+# Gajim is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
+
+# XEP-0373: OpenPGP for XMPP
+
+import sqlite3
+import logging
+from collections import namedtuple
+
+log = logging.getLogger('gajim.plugin_system.openpgp.sql')
+
+TABLE_LAYOUT = '''
+ CREATE TABLE contacts (
+ jid TEXT,
+ fingerprint TEXT,
+ active BOOLEAN,
+ trust INTEGER,
+ timestamp INTEGER,
+ comment TEXT
+ );
+ CREATE UNIQUE INDEX jid_fingerprint ON contacts (jid, fingerprint);'''
+
+
+class Storage:
+ def __init__(self, folder_path):
+ self._con = sqlite3.connect(folder_path / 'contacts.db',
+ detect_types=sqlite3.PARSE_DECLTYPES)
+ self._con.row_factory = self._namedtuple_factory
+ self._create_database()
+ self._migrate_database()
+ self._con.execute("PRAGMA synchronous=FULL;")
+ self._con.commit()
+
+ @staticmethod
+ def _namedtuple_factory(cursor, row):
+ fields = [col[0] for col in cursor.description]
+ Row = namedtuple("Row", fields)
+ named_row = Row(*row)
+ return named_row
+
+ def _user_version(self):
+ return self._con.execute('PRAGMA user_version').fetchone()[0]
+
+ def _create_database(self):
+ if not self._user_version():
+ log.info('Create contacts.db')
+ self._execute_query(TABLE_LAYOUT)
+
+ def _execute_query(self, query):
+ transaction = """
+ BEGIN TRANSACTION;
+ %s
+ PRAGMA user_version=1;
+ END TRANSACTION;
+ """ % (query)
+ self._con.executescript(transaction)
+
+ def _migrate_database(self):
+ pass
+
+ def load_contacts(self):
+ sql = 'SELECT * from contacts'
+ rows = self._con.execute(sql).fetchall()
+ if rows is not None:
+ return rows
+
+ def save_contact(self, db_values):
+ sql = '''REPLACE INTO
+ contacts(jid, fingerprint, active, trust, timestamp, comment)
+ VALUES(?, ?, ?, ?, ?, ?)'''
+ for values in db_values:
+ log.info('Store key: %s', values)
+ self._con.execute(sql, values)
+ self._con.commit()
+
+ def set_trust(self, jid, fingerprint, trust):
+ sql = 'UPDATE contacts SET trust = ? WHERE jid = ? AND fingerprint = ?'
+ log.info('Set Trust: %s %s %s', trust, jid, fingerprint)
+ self._con.execute(sql, (trust, jid, fingerprint))
+ self._con.commit()
+
+ def delete_key(self, jid, fingerprint):
+ sql = 'DELETE from contacts WHERE jid = ? AND fingerprint = ?'
+ log.info('Delete Key: %s %s', jid, fingerprint)
+ self._con.execute(sql, (jid, fingerprint))
+ self._con.commit()
+
+ def cleanup(self):
+ self._con.close()