diff options
author | James Balazs <43336188+JamesBalazs@users.noreply.github.com> | 2022-04-06 19:34:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-06 19:34:26 +0300 |
commit | 0fb5094250ea3f03d8fe81e430888a7b6b44b8b2 (patch) | |
tree | fb24e9ea05cfdb0ae947ca5014f021d170f117e7 /acme | |
parent | 87216372dd52277b10fa040591ec6ddfa17825e5 (diff) |
Add subproblems to errors (#7046) (#9258)
* Add subproblems to errors (#7046)
* Fix can't assign attribute
* Tidy up string representations of errors and add decoders for subproblems / identifiers
* Add missing attributes to docstring
* Move change to 1.27.0 in changelog
Diffstat (limited to 'acme')
-rw-r--r-- | acme/acme/messages.py | 118 | ||||
-rw-r--r-- | acme/tests/messages_test.py | 19 |
2 files changed, 85 insertions, 52 deletions
diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 286178d63..9b9ef5de2 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -79,6 +79,55 @@ def is_acme_error(err: BaseException) -> bool: return False +class _Constant(jose.JSONDeSerializable, Hashable): + """ACME constant.""" + __slots__ = ('name',) + POSSIBLE_NAMES: Dict[str, '_Constant'] = NotImplemented + + def __init__(self, name: str) -> None: + super().__init__() + self.POSSIBLE_NAMES[name] = self # pylint: disable=unsupported-assignment-operation + self.name = name + + def to_partial_json(self) -> str: + return self.name + + @classmethod + def from_json(cls, jobj: str) -> '_Constant': + if jobj not in cls.POSSIBLE_NAMES: # pylint: disable=unsupported-membership-test + raise jose.DeserializationError(f'{cls.__name__} not recognized') + return cls.POSSIBLE_NAMES[jobj] + + def __repr__(self) -> str: + return f'{self.__class__.__name__}({self.name})' + + def __eq__(self, other: Any) -> bool: + return isinstance(other, type(self)) and other.name == self.name + + def __hash__(self) -> int: + return hash((self.__class__, self.name)) + + +class IdentifierType(_Constant): + """ACME identifier type.""" + POSSIBLE_NAMES: Dict[str, _Constant] = {} + + +IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder +IDENTIFIER_IP = IdentifierType('ip') # IdentifierIP in pebble - not in Boulder yet + + +class Identifier(jose.JSONObjectWithFields): + """ACME identifier. + + :ivar IdentifierType typ: + :ivar str value: + + """ + typ: IdentifierType = jose.field('type', decoder=IdentifierType.from_json) + value: str = jose.field('value') + + class Error(jose.JSONObjectWithFields, errors.Error): """ACME error. @@ -87,11 +136,23 @@ class Error(jose.JSONObjectWithFields, errors.Error): :ivar str typ: :ivar str title: :ivar str detail: + :ivar Identifier identifier: + :ivar tuple subproblems: An array of ACME Errors which may be present when the CA + returns multiple errors related to the same request, `tuple` of `Error`. """ typ: str = jose.field('type', omitempty=True, default='about:blank') title: str = jose.field('title', omitempty=True) detail: str = jose.field('detail', omitempty=True) + identifier: Optional['Identifier'] = jose.field( + 'identifier', decoder=Identifier.from_json, omitempty=True) + subproblems: Optional[Tuple['Error', ...]] = jose.field('subproblems', omitempty=True) + + # Mypy does not understand the josepy magic happening here, and falsely claims + # that subproblems is redefined. Let's ignore the type check here. + @subproblems.decoder # type: ignore + def subproblems(value: List[Dict[str, Any]]) -> Tuple['Error', ...]: # type: ignore[misc] # pylint: disable=no-self-argument,missing-function-docstring + return tuple(Error.from_json(subproblem) for subproblem in value) @classmethod def with_code(cls, code: str, **kwargs: Any) -> 'Error': @@ -135,39 +196,16 @@ class Error(jose.JSONObjectWithFields, errors.Error): return None def __str__(self) -> str: - return b' :: '.join( + result = b' :: '.join( part.encode('ascii', 'backslashreplace') for part in (self.typ, self.description, self.detail, self.title) if part is not None).decode() - - -class _Constant(jose.JSONDeSerializable, Hashable): - """ACME constant.""" - __slots__ = ('name',) - POSSIBLE_NAMES: Dict[str, '_Constant'] = NotImplemented - - def __init__(self, name: str) -> None: - super().__init__() - self.POSSIBLE_NAMES[name] = self # pylint: disable=unsupported-assignment-operation - self.name = name - - def to_partial_json(self) -> str: - return self.name - - @classmethod - def from_json(cls, jobj: str) -> '_Constant': - if jobj not in cls.POSSIBLE_NAMES: # pylint: disable=unsupported-membership-test - raise jose.DeserializationError(f'{cls.__name__} not recognized') - return cls.POSSIBLE_NAMES[jobj] - - def __repr__(self) -> str: - return f'{self.__class__.__name__}({self.name})' - - def __eq__(self, other: Any) -> bool: - return isinstance(other, type(self)) and other.name == self.name - - def __hash__(self) -> int: - return hash((self.__class__, self.name)) + if self.identifier: + result = f'Problem for {self.identifier.value}: ' + result # pylint: disable=no-member + if self.subproblems and len(self.subproblems) > 0: + for subproblem in self.subproblems: + result += f'\n{subproblem}' + return result class Status(_Constant): @@ -185,26 +223,6 @@ STATUS_READY = Status('ready') STATUS_DEACTIVATED = Status('deactivated') -class IdentifierType(_Constant): - """ACME identifier type.""" - POSSIBLE_NAMES: Dict[str, _Constant] = {} - - -IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder -IDENTIFIER_IP = IdentifierType('ip') # IdentifierIP in pebble - not in Boulder yet - - -class Identifier(jose.JSONObjectWithFields): - """ACME identifier. - - :ivar IdentifierType typ: - :ivar str value: - - """ - typ: IdentifierType = jose.field('type', decoder=IdentifierType.from_json) - value: str = jose.field('value') - - class HasResourceType(Protocol): """ Represents a class with a resource_type class parameter of type string. diff --git a/acme/tests/messages_test.py b/acme/tests/messages_test.py index 3f0f29215..cf7e7629a 100644 --- a/acme/tests/messages_test.py +++ b/acme/tests/messages_test.py @@ -17,7 +17,7 @@ class ErrorTest(unittest.TestCase): """Tests for acme.messages.Error.""" def setUp(self): - from acme.messages import Error, ERROR_PREFIX + from acme.messages import Error, ERROR_PREFIX, Identifier, IDENTIFIER_FQDN self.error = Error.with_code('malformed', detail='foo', title='title') self.jobj = { 'detail': 'foo', @@ -25,6 +25,9 @@ class ErrorTest(unittest.TestCase): 'type': ERROR_PREFIX + 'malformed', } self.error_custom = Error(typ='custom', detail='bar') + self.identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com') + self.subproblem = Error.with_code('caa', detail='bar', title='title', identifier=self.identifier) + self.error_with_subproblems = Error.with_code('malformed', detail='foo', title='title', subproblems=[self.subproblem]) self.empty_error = Error() def test_default_typ(self): @@ -39,6 +42,14 @@ class ErrorTest(unittest.TestCase): from acme.messages import Error hash(Error.from_json(self.error.to_json())) + def test_from_json_with_subproblems(self): + from acme.messages import Error + + parsed_error = Error.from_json(self.error_with_subproblems.to_json()) + + self.assertEqual(1, len(parsed_error.subproblems)) + self.assertEqual(self.subproblem, parsed_error.subproblems[0]) + def test_description(self): self.assertEqual('The request message was malformed', self.error.description) self.assertIsNone(self.error_custom.description) @@ -73,7 +84,11 @@ class ErrorTest(unittest.TestCase): str(self.error), u"{0.typ} :: {0.description} :: {0.detail} :: {0.title}" .format(self.error)) - + self.assertEqual( + str(self.error_with_subproblems), + (u"{0.typ} :: {0.description} :: {0.detail} :: {0.title}\n"+ + u"Problem for {1.identifier.value}: {1.typ} :: {1.description} :: {1.detail} :: {1.title}").format( + self.error_with_subproblems, self.subproblem)) class ConstantTest(unittest.TestCase): """Tests for acme.messages._Constant.""" |