Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/acme
diff options
context:
space:
mode:
Diffstat (limited to 'acme')
-rw-r--r--acme/acme/challenges.py3
-rw-r--r--acme/acme/client.py3
-rw-r--r--acme/acme/messages.py13
-rw-r--r--acme/acme/mixins.py65
-rw-r--r--acme/tests/challenges_test.py13
-rw-r--r--acme/tests/client_test.py3
-rw-r--r--acme/tests/messages_test.py14
7 files changed, 106 insertions, 8 deletions
diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py
index 0b112be00..b9c6b7eb2 100644
--- a/acme/acme/challenges.py
+++ b/acme/acme/challenges.py
@@ -16,6 +16,7 @@ from OpenSSL import crypto
from acme import crypto_util
from acme import errors
from acme import fields
+from acme.mixins import ResourceMixin, TypeMixin
logger = logging.getLogger(__name__)
@@ -34,7 +35,7 @@ class Challenge(jose.TypedJSONObjectWithFields):
return UnrecognizedChallenge.from_json(jobj)
-class ChallengeResponse(jose.TypedJSONObjectWithFields):
+class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge response."""
TYPES = {} # type: dict
diff --git a/acme/acme/client.py b/acme/acme/client.py
index cecb727c7..cbe543f91 100644
--- a/acme/acme/client.py
+++ b/acme/acme/client.py
@@ -25,6 +25,7 @@ from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Set
from acme.magic_typing import Text
+from acme.mixins import VersionedLEACMEMixin
logger = logging.getLogger(__name__)
@@ -987,6 +988,8 @@ class ClientNetwork(object):
:rtype: `josepy.JWS`
"""
+ if isinstance(obj, VersionedLEACMEMixin):
+ obj.le_acme_version = acme_version
jobj = obj.json_dumps(indent=2).encode() if obj else b''
logger.debug('JWS payload:\n%s', jobj)
kwargs = {
diff --git a/acme/acme/messages.py b/acme/acme/messages.py
index f8f4bfbe7..90059a6fb 100644
--- a/acme/acme/messages.py
+++ b/acme/acme/messages.py
@@ -9,6 +9,7 @@ from acme import errors
from acme import fields
from acme import jws
from acme import util
+from acme.mixins import ResourceMixin
try:
from collections.abc import Hashable
@@ -356,13 +357,13 @@ class Registration(ResourceBody):
@Directory.register
-class NewRegistration(Registration):
+class NewRegistration(ResourceMixin, Registration):
"""New registration."""
resource_type = 'new-reg'
resource = fields.Resource(resource_type)
-class UpdateRegistration(Registration):
+class UpdateRegistration(ResourceMixin, Registration):
"""Update registration."""
resource_type = 'reg'
resource = fields.Resource(resource_type)
@@ -498,13 +499,13 @@ class Authorization(ResourceBody):
@Directory.register
-class NewAuthorization(Authorization):
+class NewAuthorization(ResourceMixin, Authorization):
"""New authorization."""
resource_type = 'new-authz'
resource = fields.Resource(resource_type)
-class UpdateAuthorization(Authorization):
+class UpdateAuthorization(ResourceMixin, Authorization):
"""Update authorization."""
resource_type = 'authz'
resource = fields.Resource(resource_type)
@@ -522,7 +523,7 @@ class AuthorizationResource(ResourceWithURI):
@Directory.register
-class CertificateRequest(jose.JSONObjectWithFields):
+class CertificateRequest(ResourceMixin, jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar josepy.util.ComparableX509 csr:
@@ -548,7 +549,7 @@ class CertificateResource(ResourceWithURI):
@Directory.register
-class Revocation(jose.JSONObjectWithFields):
+class Revocation(ResourceMixin, jose.JSONObjectWithFields):
"""Revocation message.
:ivar .ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in
diff --git a/acme/acme/mixins.py b/acme/acme/mixins.py
new file mode 100644
index 000000000..1cd050ccc
--- /dev/null
+++ b/acme/acme/mixins.py
@@ -0,0 +1,65 @@
+"""Useful mixins for Challenge and Resource objects"""
+
+
+class VersionedLEACMEMixin(object):
+ """This mixin stores the version of Let's Encrypt's endpoint being used."""
+ @property
+ def le_acme_version(self):
+ """Define the version of ACME protocol to use"""
+ return getattr(self, '_le_acme_version', 1)
+
+ @le_acme_version.setter
+ def le_acme_version(self, version):
+ # We need to use object.__setattr__ to not depend on the specific implementation of
+ # __setattr__ in current class (eg. jose.TypedJSONObjectWithFields raises AttributeError
+ # for any attempt to set an attribute to make objects immutable).
+ object.__setattr__(self, '_le_acme_version', version)
+
+ def __setattr__(self, key, value):
+ if key == 'le_acme_version':
+ # Required for @property to operate properly. See comment above.
+ object.__setattr__(self, key, value)
+ else:
+ super(VersionedLEACMEMixin, self).__setattr__(key, value) # pragma: no cover
+
+
+class ResourceMixin(VersionedLEACMEMixin):
+ """
+ This mixin generates a RFC8555 compliant JWS payload
+ by removing the `resource` field if needed (eg. ACME v2 protocol).
+ """
+ def to_partial_json(self):
+ """See josepy.JSONDeserializable.to_partial_json()"""
+ return _safe_jobj_compliance(super(ResourceMixin, self),
+ 'to_partial_json', 'resource')
+
+ def fields_to_partial_json(self):
+ """See josepy.JSONObjectWithFields.fields_to_partial_json()"""
+ return _safe_jobj_compliance(super(ResourceMixin, self),
+ 'fields_to_partial_json', 'resource')
+
+
+class TypeMixin(VersionedLEACMEMixin):
+ """
+ This mixin allows generation of a RFC8555 compliant JWS payload
+ by removing the `type` field if needed (eg. ACME v2 protocol).
+ """
+ def to_partial_json(self):
+ """See josepy.JSONDeserializable.to_partial_json()"""
+ return _safe_jobj_compliance(super(TypeMixin, self),
+ 'to_partial_json', 'type')
+
+ def fields_to_partial_json(self):
+ """See josepy.JSONObjectWithFields.fields_to_partial_json()"""
+ return _safe_jobj_compliance(super(TypeMixin, self),
+ 'fields_to_partial_json', 'type')
+
+
+def _safe_jobj_compliance(instance, jobj_method, uncompliant_field):
+ if hasattr(instance, jobj_method):
+ jobj = getattr(instance, jobj_method)()
+ if instance.le_acme_version == 2:
+ jobj.pop(uncompliant_field, None)
+ return jobj
+
+ raise AttributeError('Method {0}() is not implemented.'.format(jobj_method)) # pragma: no cover
diff --git a/acme/tests/challenges_test.py b/acme/tests/challenges_test.py
index 2b44d677d..433c7bb82 100644
--- a/acme/tests/challenges_test.py
+++ b/acme/tests/challenges_test.py
@@ -478,5 +478,18 @@ class DNSResponseTest(unittest.TestCase):
self.msg.check_validation(self.chall, KEY.public_key()))
+class JWSPayloadRFC8555Compliant(unittest.TestCase):
+ """Test for RFC8555 compliance of JWS generated from resources/challenges"""
+ def test_challenge_payload(self):
+ from acme.challenges import HTTP01Response
+
+ challenge_body = HTTP01Response()
+ challenge_body.le_acme_version = 2
+
+ jobj = challenge_body.json_dumps(indent=2).encode()
+ # RFC8555 states that challenge responses must have an empty payload.
+ self.assertEqual(jobj, b'{}')
+
+
if __name__ == '__main__':
unittest.main() # pragma: no cover
diff --git a/acme/tests/client_test.py b/acme/tests/client_test.py
index a4966140f..1e132d79f 100644
--- a/acme/tests/client_test.py
+++ b/acme/tests/client_test.py
@@ -16,6 +16,7 @@ from acme import errors
from acme import jws as acme_jws
from acme import messages
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
+from acme.mixins import VersionedLEACMEMixin
import messages_test
import test_util
@@ -886,7 +887,7 @@ class ClientV2Test(ClientTestBase):
self.client.net.get.assert_not_called()
-class MockJSONDeSerializable(jose.JSONDeSerializable):
+class MockJSONDeSerializable(VersionedLEACMEMixin, jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
diff --git a/acme/tests/messages_test.py b/acme/tests/messages_test.py
index b9b70266b..d53fb764c 100644
--- a/acme/tests/messages_test.py
+++ b/acme/tests/messages_test.py
@@ -453,6 +453,7 @@ class OrderResourceTest(unittest.TestCase):
'authorizations': None,
})
+
class NewOrderTest(unittest.TestCase):
"""Tests for acme.messages.NewOrder."""
@@ -467,5 +468,18 @@ class NewOrderTest(unittest.TestCase):
})
+class JWSPayloadRFC8555Compliant(unittest.TestCase):
+ """Test for RFC8555 compliance of JWS generated from resources/challenges"""
+ def test_message_payload(self):
+ from acme.messages import NewAuthorization
+
+ new_order = NewAuthorization()
+ new_order.le_acme_version = 2
+
+ jobj = new_order.json_dumps(indent=2).encode()
+ # RFC8555 states that JWS bodies must not have a resource field.
+ self.assertEqual(jobj, b'{}')
+
+
if __name__ == '__main__':
unittest.main() # pragma: no cover