diff options
Diffstat (limited to 'certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py')
-rw-r--r-- | certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py b/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py new file mode 100644 index 000000000..637558304 --- /dev/null +++ b/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py @@ -0,0 +1,154 @@ +"""Certbot Route53 authenticator plugin.""" +import collections +import logging +import time + +import boto3 +from botocore.exceptions import ClientError +from botocore.exceptions import NoCredentialsError +import zope.interface + +from acme.magic_typing import DefaultDict # pylint: disable=unused-import, no-name-in-module +from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module +from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module +from certbot import errors +from certbot import interfaces +from certbot.plugins import dns_common + +logger = logging.getLogger(__name__) + +INSTRUCTIONS = ( + "To use certbot-dns-route53, configure credentials as described at " + "https://boto3.readthedocs.io/en/latest/guide/configuration.html#best-practices-for-configuring-credentials " # pylint: disable=line-too-long + "and add the necessary permissions for Route53 access.") + + +@zope.interface.implementer(interfaces.IAuthenticator) +@zope.interface.provider(interfaces.IPluginFactory) +class Authenticator(dns_common.DNSAuthenticator): + """Route53 Authenticator + + This authenticator solves a DNS01 challenge by uploading the answer to AWS + Route53. + """ + + description = ("Obtain certificates using a DNS TXT record (if you are using AWS Route53 for " + "DNS).") + ttl = 10 + + def __init__(self, *args, **kwargs): + super(Authenticator, self).__init__(*args, **kwargs) + self.r53 = boto3.client("route53") + self._resource_records = collections.defaultdict(list) # type: DefaultDict[str, List[Dict[str, str]]] + + def more_info(self): # pylint: disable=missing-docstring,no-self-use + return "Solve a DNS01 challenge using AWS Route53" + + def _setup_credentials(self): + pass + + def _perform(self, domain, validation_name, validation): # pylint: disable=missing-docstring + pass + + def perform(self, achalls): + self._attempt_cleanup = True + + try: + change_ids = [ + self._change_txt_record("UPSERT", + achall.validation_domain_name(achall.domain), + achall.validation(achall.account_key)) + for achall in achalls + ] + + for change_id in change_ids: + self._wait_for_change(change_id) + except (NoCredentialsError, ClientError) as e: + logger.debug('Encountered error during perform: %s', e, exc_info=True) + raise errors.PluginError("\n".join([str(e), INSTRUCTIONS])) + return [achall.response(achall.account_key) for achall in achalls] + + def _cleanup(self, domain, validation_name, validation): + try: + self._change_txt_record("DELETE", validation_name, validation) + except (NoCredentialsError, ClientError) as e: + logger.debug('Encountered error during cleanup: %s', e, exc_info=True) + + def _find_zone_id_for_domain(self, domain): + """Find the zone id responsible a given FQDN. + + That is, the id for the zone whose name is the longest parent of the + domain. + """ + paginator = self.r53.get_paginator("list_hosted_zones") + zones = [] + target_labels = domain.rstrip(".").split(".") + for page in paginator.paginate(): + for zone in page["HostedZones"]: + if zone["Config"]["PrivateZone"]: + continue + + candidate_labels = zone["Name"].rstrip(".").split(".") + if candidate_labels == target_labels[-len(candidate_labels):]: + zones.append((zone["Name"], zone["Id"])) + + if not zones: + raise errors.PluginError( + "Unable to find a Route53 hosted zone for {0}".format(domain) + ) + + # Order the zones that are suffixes for our desired to domain by + # length, this puts them in an order like: + # ["foo.bar.baz.com", "bar.baz.com", "baz.com", "com"] + # And then we choose the first one, which will be the most specific. + zones.sort(key=lambda z: len(z[0]), reverse=True) + return zones[0][1] + + def _change_txt_record(self, action, validation_domain_name, validation): + zone_id = self._find_zone_id_for_domain(validation_domain_name) + + rrecords = self._resource_records[validation_domain_name] + challenge = {"Value": '"{0}"'.format(validation)} + if action == "DELETE": + # Remove the record being deleted from the list of tracked records + rrecords.remove(challenge) + if rrecords: + # Need to update instead, as we're not deleting the rrset + action = "UPSERT" + else: + # Create a new list containing the record to use with DELETE + rrecords = [challenge] + else: + rrecords.append(challenge) + + response = self.r53.change_resource_record_sets( + HostedZoneId=zone_id, + ChangeBatch={ + "Comment": "certbot-dns-route53 certificate validation " + action, + "Changes": [ + { + "Action": action, + "ResourceRecordSet": { + "Name": validation_domain_name, + "Type": "TXT", + "TTL": self.ttl, + "ResourceRecords": rrecords, + } + } + ] + } + ) + return response["ChangeInfo"]["Id"] + + def _wait_for_change(self, change_id): + """Wait for a change to be propagated to all Route53 DNS servers. + https://docs.aws.amazon.com/Route53/latest/APIReference/API_GetChange.html + """ + for unused_n in range(0, 120): + response = self.r53.get_change(Id=change_id) + if response["ChangeInfo"]["Status"] == "INSYNC": + return + time.sleep(5) + raise errors.PluginError( + "Timed out waiting for Route53 change. Current status: %s" % + response["ChangeInfo"]["Status"]) |