diff --git a/M2Crypto/DSA.py b/M2Crypto/DSA.py index 57d123b..325e418 100644 --- a/M2Crypto/DSA.py +++ b/M2Crypto/DSA.py @@ -396,6 +396,29 @@ def load_key_bio(bio, callback=util.passphrase_callback): raise DSAError('problem loading DSA key pair') return DSA(dsa, 1) +def pub_key_from_params(p, q, g, pub): + """ + Factory function that instantiates a DSA_pub object using + the parameters and public key specified. + + @type p: str + @param p: value of p, a "byte string" + @type q: str + @param q: value of q, a "byte string" + @type g: str + @param g: value of g, a "byte string" + @type pub: str + @param pub: value of the public key, a "byte string" + @rtype: DSA_pub + @return: instance of DSA_pub. + """ + dsa = m2.dsa_new() + m2.dsa_set_p(dsa, p) + m2.dsa_set_q(dsa, q) + m2.dsa_set_g(dsa, g) + m2.dsa_set_pub(dsa, pub) + return DSA_pub(dsa, 1) + def load_pub_key(file, callback=util.passphrase_callback): """ diff --git a/M2Crypto/EC.py b/M2Crypto/EC.py index a4a9faf..800a705 100644 --- a/M2Crypto/EC.py +++ b/M2Crypto/EC.py @@ -254,6 +254,13 @@ class EC_pub(EC): self.der = m2.ec_key_get_public_der(self.ec) return self.der + def get_key(self): + """ + Returns the public key as a byte string. + """ + assert self.check_key(), 'key is not initialised' + return m2.ec_key_get_public_key(self.ec) + save_key = EC.save_pub_key save_key_bio = EC.save_pub_key_bio @@ -333,3 +340,9 @@ def pub_key_from_der(der): Create EC_pub from DER. """ return EC_pub(m2.ec_key_from_pubkey_der(der), 1) + +def pub_key_from_params(curve, bytes): + """ + Create EC_pub from curve name and octet string. + """ + return EC_pub(m2.ec_key_from_pubkey_params(curve, bytes), 1) diff --git a/M2Crypto/EVP.py b/M2Crypto/EVP.py index 12618a2..28303bd 100644 --- a/M2Crypto/EVP.py +++ b/M2Crypto/EVP.py @@ -40,8 +40,13 @@ class MessageDigest: def __init__(self, algo): md = getattr(m2, algo, None) if md is None: - raise ValueError('unknown algorithm', algo) - self.md = md() + # if the digest algorithm isn't found as an attribute of the m2 + # module, try to look up the digest using get_digestbyname() + self.md = m2.get_digestbyname(algo) + if self.md is None: + raise ValueError('unknown algorithm', algo) + else: + self.md = md() self.ctx = m2.md_ctx_new() m2.digest_init(self.ctx, self.md) @@ -389,6 +394,25 @@ def load_key_bio(bio, callback=util.passphrase_callback): raise EVPError(Err.get_error()) return PKey(cptr, 1) +def load_key_bio_pubkey(bio, callback=util.passphrase_callback): + """ + Load an M2Crypto.EVP.PKey from a public key as a M2Crypto.BIO object. + + @type bio: M2Crypto.BIO + @param bio: M2Crypto.BIO object containing the key in PEM format. + + @type callback: Python callable + @param callback: A Python callable object that is invoked + to acquire a passphrase with which to protect the key. + + @rtype: M2Crypto.EVP.PKey + @return: M2Crypto.EVP.PKey object. + """ + cptr = m2.pkey_read_pem_pubkey(bio._ptr(), callback) + if cptr is None: + raise EVPError(Err.get_error()) + return PKey(cptr, 1) + def load_key_string(string, callback=util.passphrase_callback): """ Load an M2Crypto.EVP.PKey from a string. @@ -405,3 +429,20 @@ def load_key_string(string, callback=util.passphrase_callback): """ bio = BIO.MemoryBuffer(string) return load_key_bio(bio, callback) + +def load_key_string_pubkey(string, callback=util.passphrase_callback): + """ + Load an M2Crypto.EVP.PKey from a public key as a string. + + @type string: string + @param string: String containing the key in PEM format. + + @type callback: Python callable + @param callback: A Python callable object that is invoked + to acquire a passphrase with which to protect the key. + + @rtype: M2Crypto.EVP.PKey + @return: M2Crypto.EVP.PKey object. + """ + bio = BIO.MemoryBuffer(string) + return load_key_bio_pubkey(bio, callback) diff --git a/SWIG/_dsa.i b/SWIG/_dsa.i index a35dd88..a6da42d 100644 --- a/SWIG/_dsa.i +++ b/SWIG/_dsa.i @@ -153,6 +153,25 @@ PyObject *dsa_set_g(DSA *dsa, PyObject *value) { Py_INCREF(Py_None); return Py_None; } + +PyObject *dsa_set_pub(DSA *dsa, PyObject *value) { + BIGNUM *bn; + const void *vbuf; + int vlen; + + if (m2_PyObject_AsReadBufferInt(value, &vbuf, &vlen) == -1) + return NULL; + + if (!(bn = BN_mpi2bn((unsigned char *)vbuf, vlen, NULL))) { + PyErr_SetString(_dsa_err, ERR_reason_error_string(ERR_get_error())); + return NULL; + } + if (dsa->pub_key) + BN_free(dsa->pub_key); + dsa->pub_key = bn; + Py_INCREF(Py_None); + return Py_None; +} %} %inline %{ diff --git a/SWIG/_ec.i b/SWIG/_ec.i index f0e52bd..9065c10 100644 --- a/SWIG/_ec.i +++ b/SWIG/_ec.i @@ -189,6 +189,43 @@ PyObject *ec_key_get_public_der(EC_KEY *key) { return pyo; } + +PyObject *ec_key_get_public_key(EC_KEY *key) { + + unsigned char *src=NULL; + void *dst=NULL; + int src_len=0; + Py_ssize_t dst_len=0; + PyObject *pyo=NULL; + int ret=0; + + /* Convert to binary */ + src_len = i2o_ECPublicKey(key, &src); + if (src_len < 0) + { + PyErr_SetString(_ec_err, ERR_reason_error_string(ERR_get_error())); + return NULL; + } + + /* Create a PyBuffer containing a copy of the binary, + * to simplify memory deallocation + */ + pyo = PyBuffer_New( src_len ); + ret = PyObject_AsWriteBuffer( pyo, &dst, &dst_len ); + assert( src_len == dst_len ); + if (ret < 0) + { + Py_DECREF(pyo); + OPENSSL_free(src); + PyErr_SetString(_ec_err, "cannot get write buffer"); + return NULL; + } + memcpy( dst, src, src_len ); + OPENSSL_free(src); + + return pyo; +} + %} %threadallow ec_key_read_pubkey; @@ -404,6 +441,32 @@ EC_KEY* ec_key_from_pubkey_der(PyObject *pubkey) { return keypair; } +EC_KEY* ec_key_from_pubkey_params(int nid, PyObject *pubkey) { + const void *keypairbuf; + Py_ssize_t keypairbuflen; + const unsigned char *tempBuf; + EC_KEY *keypair; + + if (PyObject_AsReadBuffer(pubkey, &keypairbuf, &keypairbuflen) == -1) + { + return NULL; + } + + keypair = ec_key_new_by_curve_name(nid); + if (!keypair) { + PyErr_SetString(_ec_err, ERR_reason_error_string(ERR_get_error())); + return NULL; + } + + tempBuf = (const unsigned char *)keypairbuf; + if ((o2i_ECPublicKey( &keypair, &tempBuf, keypairbuflen)) == 0) + { + PyErr_SetString(_ec_err, ERR_reason_error_string(ERR_get_error())); + return NULL; + } + return keypair; +} + // According to [SEC2] the degree of the group is defined as EC key length int ec_key_keylen(EC_KEY *key) { diff --git a/SWIG/_evp.i b/SWIG/_evp.i index 85382db..033897b 100644 --- a/SWIG/_evp.i +++ b/SWIG/_evp.i @@ -49,6 +49,9 @@ extern const EVP_MD *EVP_sha512(void); %rename(digest_init) EVP_DigestInit; extern int EVP_DigestInit(EVP_MD_CTX *, const EVP_MD *); +%rename(get_digestbyname) EVP_get_digestbyname; +extern EVP_MD *EVP_get_digestbyname(const char * name); + %rename(des_ecb) EVP_des_ecb; extern const EVP_CIPHER *EVP_des_ecb(void); %rename(des_ede_ecb) EVP_des_ede; @@ -519,6 +522,17 @@ EVP_PKEY *pkey_read_pem(BIO *f, PyObject *pyfunc) { return pk; } +EVP_PKEY *pkey_read_pem_pubkey(BIO *f, PyObject *pyfunc) { + EVP_PKEY *pk; + + Py_INCREF(pyfunc); + Py_BEGIN_ALLOW_THREADS + pk = PEM_read_bio_PUBKEY(f, NULL, passphrase_callback, (void *)pyfunc); + Py_END_ALLOW_THREADS + Py_DECREF(pyfunc); + return pk; +} + int pkey_assign_rsa(EVP_PKEY *pkey, RSA *rsa) { return EVP_PKEY_assign_RSA(pkey, rsa); } diff --git a/tests/test_dsa.py b/tests/test_dsa.py index 27d1f61..c224a53 100644 --- a/tests/test_dsa.py +++ b/tests/test_dsa.py @@ -99,6 +99,19 @@ class DSATestCase(unittest.TestCase): r, s = dsa2.sign(self.data) assert dsa2.verify(self.data, r, s) + def test_pub_key_from_params(self): + dsa = DSA.gen_params(1024, self.callback) + dsa.gen_key() + assert len(dsa) == 1024 + p = dsa.p + q = dsa.q + g = dsa.g + pub = dsa.pub + dsa2 = DSA.pub_key_from_params(p,q,g,pub) + assert dsa2.check_key() + r,s = dsa.sign(self.data) + assert dsa2.verify(self.data, r, s) + def suite(): return unittest.makeSuite(DSATestCase) diff --git a/tests/test_ecdsa.py b/tests/test_ecdsa.py index d6c75d1..d28be96 100644 --- a/tests/test_ecdsa.py +++ b/tests/test_ecdsa.py @@ -70,6 +70,16 @@ class ECDSATestCase(unittest.TestCase): ec = EC.gen_params(EC.NID_sect233k1) self.assertEqual(len(ec), 233) + def test_pub_key_from_params(self): + curve = EC.NID_X9_62_prime256v1 + ec = EC.gen_params(curve) + ec.gen_key() + ec_pub = ec.pub() + k = ec_pub.get_key() + ec2 = EC.pub_key_from_params(curve, k) + assert ec2.check_key() + r, s = ec.sign_dsa(self.data) + assert ec2.verify_dsa(self.data, r, s) def suite(): return unittest.makeSuite(ECDSATestCase) diff --git a/tests/test_evp.py b/tests/test_evp.py index 8cf7d12..bddec84 100644 --- a/tests/test_evp.py +++ b/tests/test_evp.py @@ -58,6 +58,9 @@ class EVPTestCase(unittest.TestCase): # A quick but not thorough sanity check self.assertEqual(len(der_blob), 160) + def test_get_digestbyname(self): + self.assertEqual(m2.get_digestbyname('sha513'), None) + self.assertNotEqual(m2.get_digestbyname('sha1'), None) def test_MessageDigest(self): with self.assertRaises(ValueError): @@ -66,6 +69,19 @@ class EVPTestCase(unittest.TestCase): self.assertEqual(md.update('Hello'), 1) self.assertEqual(util.octx_to_num(md.final()), 1415821221623963719413415453263690387336440359920) + # temporarily remove sha1 from m2 + old_sha1 = m2.sha1 + del m2.sha1 + + # now run the same test again, relying on EVP.MessageDigest() to call + # get_digestbyname() under the hood + md = EVP.MessageDigest('sha1') + self.assertEqual(md.update('Hello'), 1) + self.assertEqual(util.octx_to_num(md.final()), 1415821221623963719413415453263690387336440359920) + + # put sha1 back in place + m2.sha1 = old_sha1 + def test_as_der_capture_key(self): """ Test DER encoding the PKey instance after assigning @@ -140,6 +156,26 @@ class EVPTestCase(unittest.TestCase): rsa3 = RSA.gen_key(1024, 3, callback=self._gen_callback) self.assertNotEqual(rsa.sign(digest), rsa3.sign(digest)) + def test_load_key_string_pubkey(self): + """ + Testing creating a PKey instance from PEM string. + """ + rsa = RSA.gen_key(1024, 3, callback=self._gen_callback) + self.assertIsInstance(rsa, RSA.RSA) + + rsa_pem = BIO.MemoryBuffer() + rsa.save_pub_key_bio(rsa_pem) + pkey = EVP.load_key_string_pubkey(rsa_pem.read()) + rsa2 = pkey.get_rsa() + self.assertIsInstance(rsa2, RSA.RSA_pub) + self.assertEqual(rsa.e, rsa2.e) + self.assertEqual(rsa.n, rsa2.n) + pem = rsa.as_pem(callback=self._pass_callback) + pem2 = rsa2.as_pem() + assert pem + assert pem2 + self.assertNotEqual(pem, pem2) + def test_get_rsa_fail(self): """ Testing trying to retrieve the RSA key from the PKey instance