Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py')
-rw-r--r--certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py b/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py
new file mode 100644
index 000000000..637558304
--- /dev/null
+++ b/certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py
@@ -0,0 +1,154 @@
+"""Certbot Route53 authenticator plugin."""
+import collections
+import logging
+import time
+
+import boto3
+from botocore.exceptions import ClientError
+from botocore.exceptions import NoCredentialsError
+import zope.interface
+
+from acme.magic_typing import DefaultDict # pylint: disable=unused-import, no-name-in-module
+from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
+from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
+from certbot import errors
+from certbot import interfaces
+from certbot.plugins import dns_common
+
+logger = logging.getLogger(__name__)
+
+INSTRUCTIONS = (
+ "To use certbot-dns-route53, configure credentials as described at "
+ "https://boto3.readthedocs.io/en/latest/guide/configuration.html#best-practices-for-configuring-credentials " # pylint: disable=line-too-long
+ "and add the necessary permissions for Route53 access.")
+
+
+@zope.interface.implementer(interfaces.IAuthenticator)
+@zope.interface.provider(interfaces.IPluginFactory)
+class Authenticator(dns_common.DNSAuthenticator):
+ """Route53 Authenticator
+
+ This authenticator solves a DNS01 challenge by uploading the answer to AWS
+ Route53.
+ """
+
+ description = ("Obtain certificates using a DNS TXT record (if you are using AWS Route53 for "
+ "DNS).")
+ ttl = 10
+
+ def __init__(self, *args, **kwargs):
+ super(Authenticator, self).__init__(*args, **kwargs)
+ self.r53 = boto3.client("route53")
+ self._resource_records = collections.defaultdict(list) # type: DefaultDict[str, List[Dict[str, str]]]
+
+ def more_info(self): # pylint: disable=missing-docstring,no-self-use
+ return "Solve a DNS01 challenge using AWS Route53"
+
+ def _setup_credentials(self):
+ pass
+
+ def _perform(self, domain, validation_name, validation): # pylint: disable=missing-docstring
+ pass
+
+ def perform(self, achalls):
+ self._attempt_cleanup = True
+
+ try:
+ change_ids = [
+ self._change_txt_record("UPSERT",
+ achall.validation_domain_name(achall.domain),
+ achall.validation(achall.account_key))
+ for achall in achalls
+ ]
+
+ for change_id in change_ids:
+ self._wait_for_change(change_id)
+ except (NoCredentialsError, ClientError) as e:
+ logger.debug('Encountered error during perform: %s', e, exc_info=True)
+ raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
+ return [achall.response(achall.account_key) for achall in achalls]
+
+ def _cleanup(self, domain, validation_name, validation):
+ try:
+ self._change_txt_record("DELETE", validation_name, validation)
+ except (NoCredentialsError, ClientError) as e:
+ logger.debug('Encountered error during cleanup: %s', e, exc_info=True)
+
+ def _find_zone_id_for_domain(self, domain):
+ """Find the zone id responsible a given FQDN.
+
+ That is, the id for the zone whose name is the longest parent of the
+ domain.
+ """
+ paginator = self.r53.get_paginator("list_hosted_zones")
+ zones = []
+ target_labels = domain.rstrip(".").split(".")
+ for page in paginator.paginate():
+ for zone in page["HostedZones"]:
+ if zone["Config"]["PrivateZone"]:
+ continue
+
+ candidate_labels = zone["Name"].rstrip(".").split(".")
+ if candidate_labels == target_labels[-len(candidate_labels):]:
+ zones.append((zone["Name"], zone["Id"]))
+
+ if not zones:
+ raise errors.PluginError(
+ "Unable to find a Route53 hosted zone for {0}".format(domain)
+ )
+
+ # Order the zones that are suffixes for our desired to domain by
+ # length, this puts them in an order like:
+ # ["foo.bar.baz.com", "bar.baz.com", "baz.com", "com"]
+ # And then we choose the first one, which will be the most specific.
+ zones.sort(key=lambda z: len(z[0]), reverse=True)
+ return zones[0][1]
+
+ def _change_txt_record(self, action, validation_domain_name, validation):
+ zone_id = self._find_zone_id_for_domain(validation_domain_name)
+
+ rrecords = self._resource_records[validation_domain_name]
+ challenge = {"Value": '"{0}"'.format(validation)}
+ if action == "DELETE":
+ # Remove the record being deleted from the list of tracked records
+ rrecords.remove(challenge)
+ if rrecords:
+ # Need to update instead, as we're not deleting the rrset
+ action = "UPSERT"
+ else:
+ # Create a new list containing the record to use with DELETE
+ rrecords = [challenge]
+ else:
+ rrecords.append(challenge)
+
+ response = self.r53.change_resource_record_sets(
+ HostedZoneId=zone_id,
+ ChangeBatch={
+ "Comment": "certbot-dns-route53 certificate validation " + action,
+ "Changes": [
+ {
+ "Action": action,
+ "ResourceRecordSet": {
+ "Name": validation_domain_name,
+ "Type": "TXT",
+ "TTL": self.ttl,
+ "ResourceRecords": rrecords,
+ }
+ }
+ ]
+ }
+ )
+ return response["ChangeInfo"]["Id"]
+
+ def _wait_for_change(self, change_id):
+ """Wait for a change to be propagated to all Route53 DNS servers.
+ https://docs.aws.amazon.com/Route53/latest/APIReference/API_GetChange.html
+ """
+ for unused_n in range(0, 120):
+ response = self.r53.get_change(Id=change_id)
+ if response["ChangeInfo"]["Status"] == "INSYNC":
+ return
+ time.sleep(5)
+ raise errors.PluginError(
+ "Timed out waiting for Route53 change. Current status: %s" %
+ response["ChangeInfo"]["Status"])