"""Tests for acme.messages.""" import unittest from unittest import mock import josepy as jose from acme import challenges import test_util CERT = test_util.load_comparable_cert('cert.der') CSR = test_util.load_comparable_csr('csr.der') KEY = test_util.load_rsa_private_key('rsa512_key.pem') class ErrorTest(unittest.TestCase): """Tests for acme.messages.Error.""" def setUp(self): from acme.messages import Error, ERROR_PREFIX self.error = Error.with_code('malformed', detail='foo', title='title') self.jobj = { 'detail': 'foo', 'title': 'some title', 'type': ERROR_PREFIX + 'malformed', } self.error_custom = Error(typ='custom', detail='bar') self.empty_error = Error() def test_default_typ(self): from acme.messages import Error self.assertEqual(Error().typ, 'about:blank') def test_from_json_empty(self): from acme.messages import Error self.assertEqual(Error(), Error.from_json('{}')) def test_from_json_hashable(self): from acme.messages import Error hash(Error.from_json(self.error.to_json())) def test_description(self): self.assertEqual('The request message was malformed', self.error.description) self.assertTrue(self.error_custom.description is None) def test_code(self): from acme.messages import Error self.assertEqual('malformed', self.error.code) self.assertEqual(None, self.error_custom.code) self.assertEqual(None, Error().code) def test_is_acme_error(self): from acme.messages import is_acme_error, Error self.assertTrue(is_acme_error(self.error)) self.assertFalse(is_acme_error(self.error_custom)) self.assertFalse(is_acme_error(Error())) self.assertFalse(is_acme_error(self.empty_error)) self.assertFalse(is_acme_error("must pet all the {dogs|rabbits}")) def test_unicode_error(self): from acme.messages import Error, is_acme_error arabic_error = Error.with_code( 'malformed', detail=u'\u0639\u062f\u0627\u0644\u0629', title='title') self.assertTrue(is_acme_error(arabic_error)) def test_with_code(self): from acme.messages import Error, is_acme_error self.assertTrue(is_acme_error(Error.with_code('badCSR'))) self.assertRaises(ValueError, Error.with_code, 'not an ACME error code') def test_str(self): self.assertEqual( str(self.error), u"{0.typ} :: {0.description} :: {0.detail} :: {0.title}" .format(self.error)) class ConstantTest(unittest.TestCase): """Tests for acme.messages._Constant.""" def setUp(self): from acme.messages import _Constant class MockConstant(_Constant): # pylint: disable=missing-docstring POSSIBLE_NAMES = {} # type: Dict self.MockConstant = MockConstant # pylint: disable=invalid-name self.const_a = MockConstant('a') self.const_b = MockConstant('b') def test_to_partial_json(self): self.assertEqual('a', self.const_a.to_partial_json()) self.assertEqual('b', self.const_b.to_partial_json()) def test_from_json(self): self.assertEqual(self.const_a, self.MockConstant.from_json('a')) self.assertRaises( jose.DeserializationError, self.MockConstant.from_json, 'c') def test_from_json_hashable(self): hash(self.MockConstant.from_json('a')) def test_repr(self): self.assertEqual('MockConstant(a)', repr(self.const_a)) self.assertEqual('MockConstant(b)', repr(self.const_b)) def test_equality(self): const_a_prime = self.MockConstant('a') self.assertNotEqual(self.const_a, self.const_b) self.assertEqual(self.const_a, const_a_prime) self.assertNotEqual(self.const_a, self.const_b) self.assertEqual(self.const_a, const_a_prime) class DirectoryTest(unittest.TestCase): """Tests for acme.messages.Directory.""" def setUp(self): from acme.messages import Directory self.dir = Directory({ 'new-reg': 'reg', mock.MagicMock(resource_type='new-cert'): 'cert', 'meta': Directory.Meta( terms_of_service='https://example.com/acme/terms', website='https://www.example.com/', caa_identities=['example.com'], ), }) def test_init_wrong_key_value_success(self): # pylint: disable=no-self-use from acme.messages import Directory Directory({'foo': 'bar'}) def test_getitem(self): self.assertEqual('reg', self.dir['new-reg']) from acme.messages import NewRegistration self.assertEqual('reg', self.dir[NewRegistration]) self.assertEqual('reg', self.dir[NewRegistration()]) def test_getitem_fails_with_key_error(self): self.assertRaises(KeyError, self.dir.__getitem__, 'foo') def test_getattr(self): self.assertEqual('reg', self.dir.new_reg) def test_getattr_fails_with_attribute_error(self): self.assertRaises(AttributeError, self.dir.__getattr__, 'foo') def test_to_json(self): self.assertEqual(self.dir.to_json(), { 'new-reg': 'reg', 'new-cert': 'cert', 'meta': { 'terms-of-service': 'https://example.com/acme/terms', 'website': 'https://www.example.com/', 'caaIdentities': ['example.com'], }, }) def test_from_json_deserialization_unknown_key_success(self): # pylint: disable=no-self-use from acme.messages import Directory Directory.from_json({'foo': 'bar'}) def test_iter_meta(self): result = False for k in self.dir.meta: if k == 'terms_of_service': result = self.dir.meta[k] == 'https://example.com/acme/terms' self.assertTrue(result) class ExternalAccountBindingTest(unittest.TestCase): def setUp(self): from acme.messages import Directory self.key = jose.jwk.JWKRSA(key=KEY.public_key()) self.kid = "kid-for-testing" self.hmac_key = "hmac-key-for-testing" self.dir = Directory({ 'newAccount': 'http://url/acme/new-account', }) def test_from_data(self): from acme.messages import ExternalAccountBinding eab = ExternalAccountBinding.from_data(self.key, self.kid, self.hmac_key, self.dir) self.assertEqual(len(eab), 3) self.assertEqual(sorted(eab.keys()), sorted(['protected', 'payload', 'signature'])) class RegistrationTest(unittest.TestCase): """Tests for acme.messages.Registration.""" def setUp(self): key = jose.jwk.JWKRSA(key=KEY.public_key()) contact = ( 'mailto:admin@foo.com', 'tel:1234', ) agreement = 'https://letsencrypt.org/terms' from acme.messages import Registration self.reg = Registration(key=key, contact=contact, agreement=agreement) self.reg_none = Registration() self.jobj_to = { 'contact': contact, 'agreement': agreement, 'key': key, } self.jobj_from = self.jobj_to.copy() self.jobj_from['key'] = key.to_json() def test_from_data(self): from acme.messages import Registration reg = Registration.from_data(phone='1234', email='admin@foo.com') self.assertEqual(reg.contact, ( 'tel:1234', 'mailto:admin@foo.com', )) def test_new_registration_from_data_with_eab(self): from acme.messages import NewRegistration, ExternalAccountBinding, Directory key = jose.jwk.JWKRSA(key=KEY.public_key()) kid = "kid-for-testing" hmac_key = "hmac-key-for-testing" directory = Directory({ 'newAccount': 'http://url/acme/new-account', }) eab = ExternalAccountBinding.from_data(key, kid, hmac_key, directory) reg = NewRegistration.from_data(email='admin@foo.com', external_account_binding=eab) self.assertEqual(reg.contact, ( 'mailto:admin@foo.com', )) self.assertEqual(sorted(reg.external_account_binding.keys()), sorted(['protected', 'payload', 'signature'])) def test_phones(self): self.assertEqual(('1234',), self.reg.phones) def test_emails(self): self.assertEqual(('admin@foo.com',), self.reg.emails) def test_to_partial_json(self): self.assertEqual(self.jobj_to, self.reg.to_partial_json()) def test_from_json(self): from acme.messages import Registration self.assertEqual(self.reg, Registration.from_json(self.jobj_from)) def test_from_json_hashable(self): from acme.messages import Registration hash(Registration.from_json(self.jobj_from)) def test_default_not_transmitted(self): from acme.messages import NewRegistration empty_new_reg = NewRegistration() new_reg_with_contact = NewRegistration(contact=()) self.assertEqual(empty_new_reg.contact, ()) self.assertEqual(new_reg_with_contact.contact, ()) self.assertTrue('contact' not in empty_new_reg.to_partial_json()) self.assertTrue('contact' not in empty_new_reg.fields_to_partial_json()) self.assertTrue('contact' in new_reg_with_contact.to_partial_json()) self.assertTrue('contact' in new_reg_with_contact.fields_to_partial_json()) class UpdateRegistrationTest(unittest.TestCase): """Tests for acme.messages.UpdateRegistration.""" def test_empty(self): from acme.messages import UpdateRegistration jstring = '{"resource": "reg"}' self.assertEqual(jstring, UpdateRegistration().json_dumps()) self.assertEqual( UpdateRegistration(), UpdateRegistration.json_loads(jstring)) class RegistrationResourceTest(unittest.TestCase): """Tests for acme.messages.RegistrationResource.""" def setUp(self): from acme.messages import RegistrationResource self.regr = RegistrationResource( body=mock.sentinel.body, uri=mock.sentinel.uri, terms_of_service=mock.sentinel.terms_of_service) def test_to_partial_json(self): self.assertEqual(self.regr.to_json(), { 'body': mock.sentinel.body, 'uri': mock.sentinel.uri, 'terms_of_service': mock.sentinel.terms_of_service, }) class ChallengeResourceTest(unittest.TestCase): """Tests for acme.messages.ChallengeResource.""" def test_uri(self): from acme.messages import ChallengeResource self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock( uri='http://challb'), authzr_uri='http://authz').uri) class ChallengeBodyTest(unittest.TestCase): """Tests for acme.messages.ChallengeBody.""" def setUp(self): self.chall = challenges.DNS(token=jose.b64decode( 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')) from acme.messages import ChallengeBody from acme.messages import Error from acme.messages import STATUS_INVALID self.status = STATUS_INVALID error = Error.with_code('serverInternal', detail='Unable to communicate with DNS server') self.challb = ChallengeBody( uri='http://challb', chall=self.chall, status=self.status, error=error) self.jobj_to = { 'uri': 'http://challb', 'status': self.status, 'type': 'dns', 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA', 'error': error, } self.jobj_from = self.jobj_to.copy() self.jobj_from['status'] = 'invalid' self.jobj_from['error'] = { 'type': 'urn:ietf:params:acme:error:serverInternal', 'detail': 'Unable to communicate with DNS server', } def test_encode(self): self.assertEqual(self.challb.encode('uri'), self.challb.uri) def test_to_partial_json(self): self.assertEqual(self.jobj_to, self.challb.to_partial_json()) def test_from_json(self): from acme.messages import ChallengeBody self.assertEqual(self.challb, ChallengeBody.from_json(self.jobj_from)) def test_from_json_hashable(self): from acme.messages import ChallengeBody hash(ChallengeBody.from_json(self.jobj_from)) def test_proxy(self): self.assertEqual(jose.b64decode( 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'), self.challb.token) class AuthorizationTest(unittest.TestCase): """Tests for acme.messages.Authorization.""" def setUp(self): from acme.messages import ChallengeBody from acme.messages import STATUS_VALID self.challbs = ( ChallengeBody( uri='http://challb1', status=STATUS_VALID, chall=challenges.HTTP01(token=b'IlirfxKKXAsHtmzK29Pj8A')), ChallengeBody(uri='http://challb2', status=STATUS_VALID, chall=challenges.DNS( token=b'DGyRejmCefe7v4NfDGDKfA')), ) combinations = ((0,), (1,)) from acme.messages import Authorization from acme.messages import Identifier from acme.messages import IDENTIFIER_FQDN identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com') self.authz = Authorization( identifier=identifier, combinations=combinations, challenges=self.challbs) self.jobj_from = { 'identifier': identifier.to_json(), 'challenges': [challb.to_json() for challb in self.challbs], 'combinations': combinations, } def test_from_json(self): from acme.messages import Authorization Authorization.from_json(self.jobj_from) def test_from_json_hashable(self): from acme.messages import Authorization hash(Authorization.from_json(self.jobj_from)) def test_resolved_combinations(self): self.assertEqual(self.authz.resolved_combinations, ( (self.challbs[0],), (self.challbs[1],), )) class AuthorizationResourceTest(unittest.TestCase): """Tests for acme.messages.AuthorizationResource.""" def test_json_de_serializable(self): from acme.messages import AuthorizationResource authzr = AuthorizationResource( uri=mock.sentinel.uri, body=mock.sentinel.body) self.assertTrue(isinstance(authzr, jose.JSONDeSerializable)) class CertificateRequestTest(unittest.TestCase): """Tests for acme.messages.CertificateRequest.""" def setUp(self): from acme.messages import CertificateRequest self.req = CertificateRequest(csr=CSR) def test_json_de_serializable(self): self.assertTrue(isinstance(self.req, jose.JSONDeSerializable)) from acme.messages import CertificateRequest self.assertEqual( self.req, CertificateRequest.from_json(self.req.to_json())) class CertificateResourceTest(unittest.TestCase): """Tests for acme.messages.CertificateResourceTest.""" def setUp(self): from acme.messages import CertificateResource self.certr = CertificateResource( body=CERT, uri=mock.sentinel.uri, authzrs=(), cert_chain_uri=mock.sentinel.cert_chain_uri) def test_json_de_serializable(self): self.assertTrue(isinstance(self.certr, jose.JSONDeSerializable)) from acme.messages import CertificateResource self.assertEqual( self.certr, CertificateResource.from_json(self.certr.to_json())) class RevocationTest(unittest.TestCase): """Tests for acme.messages.RevocationTest.""" def setUp(self): from acme.messages import Revocation self.rev = Revocation(certificate=CERT) def test_from_json_hashable(self): from acme.messages import Revocation hash(Revocation.from_json(self.rev.to_json())) class OrderResourceTest(unittest.TestCase): """Tests for acme.messages.OrderResource.""" def setUp(self): from acme.messages import OrderResource self.regr = OrderResource( body=mock.sentinel.body, uri=mock.sentinel.uri) def test_to_partial_json(self): self.assertEqual(self.regr.to_json(), { 'body': mock.sentinel.body, 'uri': mock.sentinel.uri, 'authorizations': None, }) class NewOrderTest(unittest.TestCase): """Tests for acme.messages.NewOrder.""" def setUp(self): from acme.messages import NewOrder self.reg = NewOrder( identifiers=mock.sentinel.identifiers) def test_to_partial_json(self): self.assertEqual(self.reg.to_json(), { 'identifiers': mock.sentinel.identifiers, }) class JWSPayloadRFC8555Compliant(unittest.TestCase): """Test for RFC8555 compliance of JWS generated from resources/challenges""" def test_message_payload(self): from acme.messages import NewAuthorization new_order = NewAuthorization() new_order.le_acme_version = 2 jobj = new_order.json_dumps(indent=2).encode() # RFC8555 states that JWS bodies must not have a resource field. self.assertEqual(jobj, b'{}') if __name__ == '__main__': unittest.main() # pragma: no cover