Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-digitalocean/certbot_dns_digitalocean/_internal/dns_digitalocean.py')
-rw-r--r--certbot-dns-digitalocean/certbot_dns_digitalocean/_internal/dns_digitalocean.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/certbot-dns-digitalocean/certbot_dns_digitalocean/_internal/dns_digitalocean.py b/certbot-dns-digitalocean/certbot_dns_digitalocean/_internal/dns_digitalocean.py
new file mode 100644
index 000000000..7f3abbe31
--- /dev/null
+++ b/certbot-dns-digitalocean/certbot_dns_digitalocean/_internal/dns_digitalocean.py
@@ -0,0 +1,168 @@
+"""DNS Authenticator for DigitalOcean."""
+import logging
+
+import digitalocean
+import zope.interface
+
+from certbot import errors
+from certbot import interfaces
+from certbot.plugins import dns_common
+
+logger = logging.getLogger(__name__)
+
+
+@zope.interface.implementer(interfaces.IAuthenticator)
+@zope.interface.provider(interfaces.IPluginFactory)
+class Authenticator(dns_common.DNSAuthenticator):
+ """DNS Authenticator for DigitalOcean
+
+ This Authenticator uses the DigitalOcean API to fulfill a dns-01 challenge.
+ """
+
+ description = 'Obtain certs using a DNS TXT record (if you are using DigitalOcean for DNS).'
+
+ def __init__(self, *args, **kwargs):
+ super(Authenticator, self).__init__(*args, **kwargs)
+ self.credentials = None
+
+ @classmethod
+ def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
+ super(Authenticator, cls).add_parser_arguments(add)
+ add('credentials', help='DigitalOcean credentials INI file.')
+
+ def more_info(self): # pylint: disable=missing-docstring,no-self-use
+ return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
+ 'the DigitalOcean API.'
+
+ def _setup_credentials(self):
+ self.credentials = self._configure_credentials(
+ 'credentials',
+ 'DigitalOcean credentials INI file',
+ {
+ 'token': 'API token for DigitalOcean account'
+ }
+ )
+
+ def _perform(self, domain, validation_name, validation):
+ self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)
+
+ def _cleanup(self, domain, validation_name, validation):
+ self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
+
+ def _get_digitalocean_client(self):
+ return _DigitalOceanClient(self.credentials.conf('token'))
+
+
+class _DigitalOceanClient(object):
+ """
+ Encapsulates all communication with the DigitalOcean API.
+ """
+
+ def __init__(self, token):
+ self.manager = digitalocean.Manager(token=token)
+
+ def add_txt_record(self, domain_name, record_name, record_content):
+ """
+ Add a TXT record using the supplied information.
+
+ :param str domain_name: The domain to use to associate the record with.
+ :param str record_name: The record name (typically beginning with '_acme-challenge.').
+ :param str record_content: The record content (typically the challenge validation).
+ :raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
+ API
+ """
+
+ try:
+ domain = self._find_domain(domain_name)
+ except digitalocean.Error as e:
+ hint = None
+
+ if str(e).startswith("Unable to authenticate"):
+ hint = 'Did you provide a valid API token?'
+
+ logger.debug('Error finding domain using the DigitalOcean API: %s', e)
+ raise errors.PluginError('Error finding domain using the DigitalOcean API: {0}{1}'
+ .format(e, ' ({0})'.format(hint) if hint else ''))
+
+ try:
+ result = domain.create_new_domain_record(
+ type='TXT',
+ name=self._compute_record_name(domain, record_name),
+ data=record_content)
+
+ record_id = result['domain_record']['id']
+
+ logger.debug('Successfully added TXT record with id: %d', record_id)
+ except digitalocean.Error as e:
+ logger.debug('Error adding TXT record using the DigitalOcean API: %s', e)
+ raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
+ .format(e))
+
+ def del_txt_record(self, domain_name, record_name, record_content):
+ """
+ Delete a TXT record using the supplied information.
+
+ Note that both the record's name and content are used to ensure that similar records
+ created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.
+
+ Failures are logged, but not raised.
+
+ :param str domain_name: The domain to use to associate the record with.
+ :param str record_name: The record name (typically beginning with '_acme-challenge.').
+ :param str record_content: The record content (typically the challenge validation).
+ """
+
+ try:
+ domain = self._find_domain(domain_name)
+ except digitalocean.Error as e:
+ logger.debug('Error finding domain using the DigitalOcean API: %s', e)
+ return
+
+ try:
+ domain_records = domain.get_records()
+
+ matching_records = [record for record in domain_records
+ if record.type == 'TXT'
+ and record.name == self._compute_record_name(domain, record_name)
+ and record.data == record_content]
+ except digitalocean.Error as e:
+ logger.debug('Error getting DNS records using the DigitalOcean API: %s', e)
+ return
+
+ for record in matching_records:
+ try:
+ logger.debug('Removing TXT record with id: %s', record.id)
+ record.destroy()
+ except digitalocean.Error as e:
+ logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
+ record.id, e)
+
+ def _find_domain(self, domain_name):
+ """
+ Find the domain object for a given domain name.
+
+ :param str domain_name: The domain name for which to find the corresponding Domain.
+ :returns: The Domain, if found.
+ :rtype: `~digitalocean.Domain`
+ :raises certbot.errors.PluginError: if no matching Domain is found.
+ """
+
+ domain_name_guesses = dns_common.base_domain_name_guesses(domain_name)
+
+ domains = self.manager.get_all_domains()
+
+ for guess in domain_name_guesses:
+ matches = [domain for domain in domains if domain.name == guess]
+
+ if matches:
+ domain = matches[0]
+ logger.debug('Found base domain for %s using name %s', domain_name, guess)
+ return domain
+
+ raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
+ .format(domain_name, domain_name_guesses))
+
+ @staticmethod
+ def _compute_record_name(domain, full_record_name):
+ # The domain, from DigitalOcean's point of view, is automatically appended.
+ return full_record_name.rpartition("." + domain.name)[0]