Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-route53/certbot_dns_route53/dns_route53.py')
-rw-r--r--certbot-dns-route53/certbot_dns_route53/dns_route53.py151
1 files changed, 0 insertions, 151 deletions
diff --git a/certbot-dns-route53/certbot_dns_route53/dns_route53.py b/certbot-dns-route53/certbot_dns_route53/dns_route53.py
deleted file mode 100644
index f71935de2..000000000
--- a/certbot-dns-route53/certbot_dns_route53/dns_route53.py
+++ /dev/null
@@ -1,151 +0,0 @@
-"""Certbot Route53 authenticator plugin."""
-import collections
-import logging
-import time
-
-import boto3
-import zope.interface
-from botocore.exceptions import NoCredentialsError, ClientError
-
-from certbot import errors
-from certbot import interfaces
-from certbot.plugins import dns_common
-
-from acme.magic_typing import DefaultDict, List, Dict # pylint: disable=unused-import, no-name-in-module
-
-logger = logging.getLogger(__name__)
-
-INSTRUCTIONS = (
- "To use certbot-dns-route53, configure credentials as described at "
- "https://boto3.readthedocs.io/en/latest/guide/configuration.html#best-practices-for-configuring-credentials " # pylint: disable=line-too-long
- "and add the necessary permissions for Route53 access.")
-
-@zope.interface.implementer(interfaces.IAuthenticator)
-@zope.interface.provider(interfaces.IPluginFactory)
-class Authenticator(dns_common.DNSAuthenticator):
- """Route53 Authenticator
-
- This authenticator solves a DNS01 challenge by uploading the answer to AWS
- Route53.
- """
-
- description = ("Obtain certificates using a DNS TXT record (if you are using AWS Route53 for "
- "DNS).")
- ttl = 10
-
- def __init__(self, *args, **kwargs):
- super(Authenticator, self).__init__(*args, **kwargs)
- self.r53 = boto3.client("route53")
- self._resource_records = collections.defaultdict(list) # type: DefaultDict[str, List[Dict[str, str]]]
-
- def more_info(self): # pylint: disable=missing-docstring,no-self-use
- return "Solve a DNS01 challenge using AWS Route53"
-
- def _setup_credentials(self):
- pass
-
- def _perform(self, domain, validation_domain_name, validation): # pylint: disable=missing-docstring
- pass
-
- def perform(self, achalls):
- self._attempt_cleanup = True
-
- try:
- change_ids = [
- self._change_txt_record("UPSERT",
- achall.validation_domain_name(achall.domain),
- achall.validation(achall.account_key))
- for achall in achalls
- ]
-
- for change_id in change_ids:
- self._wait_for_change(change_id)
- except (NoCredentialsError, ClientError) as e:
- logger.debug('Encountered error during perform: %s', e, exc_info=True)
- raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
- return [achall.response(achall.account_key) for achall in achalls]
-
- def _cleanup(self, domain, validation_domain_name, validation):
- try:
- self._change_txt_record("DELETE", validation_domain_name, validation)
- except (NoCredentialsError, ClientError) as e:
- logger.debug('Encountered error during cleanup: %s', e, exc_info=True)
-
- def _find_zone_id_for_domain(self, domain):
- """Find the zone id responsible a given FQDN.
-
- That is, the id for the zone whose name is the longest parent of the
- domain.
- """
- paginator = self.r53.get_paginator("list_hosted_zones")
- zones = []
- target_labels = domain.rstrip(".").split(".")
- for page in paginator.paginate():
- for zone in page["HostedZones"]:
- if zone["Config"]["PrivateZone"]:
- continue
-
- candidate_labels = zone["Name"].rstrip(".").split(".")
- if candidate_labels == target_labels[-len(candidate_labels):]:
- zones.append((zone["Name"], zone["Id"]))
-
- if not zones:
- raise errors.PluginError(
- "Unable to find a Route53 hosted zone for {0}".format(domain)
- )
-
- # Order the zones that are suffixes for our desired to domain by
- # length, this puts them in an order like:
- # ["foo.bar.baz.com", "bar.baz.com", "baz.com", "com"]
- # And then we choose the first one, which will be the most specific.
- zones.sort(key=lambda z: len(z[0]), reverse=True)
- return zones[0][1]
-
- def _change_txt_record(self, action, validation_domain_name, validation):
- zone_id = self._find_zone_id_for_domain(validation_domain_name)
-
- rrecords = self._resource_records[validation_domain_name]
- challenge = {"Value": '"{0}"'.format(validation)}
- if action == "DELETE":
- # Remove the record being deleted from the list of tracked records
- rrecords.remove(challenge)
- if rrecords:
- # Need to update instead, as we're not deleting the rrset
- action = "UPSERT"
- else:
- # Create a new list containing the record to use with DELETE
- rrecords = [challenge]
- else:
- rrecords.append(challenge)
-
- response = self.r53.change_resource_record_sets(
- HostedZoneId=zone_id,
- ChangeBatch={
- "Comment": "certbot-dns-route53 certificate validation " + action,
- "Changes": [
- {
- "Action": action,
- "ResourceRecordSet": {
- "Name": validation_domain_name,
- "Type": "TXT",
- "TTL": self.ttl,
- "ResourceRecords": rrecords,
- }
- }
- ]
- }
- )
- return response["ChangeInfo"]["Id"]
-
- def _wait_for_change(self, change_id):
- """Wait for a change to be propagated to all Route53 DNS servers.
- https://docs.aws.amazon.com/Route53/latest/APIReference/API_GetChange.html
- """
- for unused_n in range(0, 120):
- response = self.r53.get_change(Id=change_id)
- if response["ChangeInfo"]["Status"] == "INSYNC":
- return
- time.sleep(5)
- raise errors.PluginError(
- "Timed out waiting for Route53 change. Current status: %s" %
- response["ChangeInfo"]["Status"])