Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-digitalocean/certbot_dns_digitalocean/dns_digitalocean.py')
-rw-r--r--certbot-dns-digitalocean/certbot_dns_digitalocean/dns_digitalocean.py168
1 files changed, 0 insertions, 168 deletions
diff --git a/certbot-dns-digitalocean/certbot_dns_digitalocean/dns_digitalocean.py b/certbot-dns-digitalocean/certbot_dns_digitalocean/dns_digitalocean.py
deleted file mode 100644
index 5a4f22327..000000000
--- a/certbot-dns-digitalocean/certbot_dns_digitalocean/dns_digitalocean.py
+++ /dev/null
@@ -1,168 +0,0 @@
-"""DNS Authenticator for DigitalOcean."""
-import logging
-
-import digitalocean
-import zope.interface
-
-from certbot import errors
-from certbot import interfaces
-from certbot.plugins import dns_common
-
-logger = logging.getLogger(__name__)
-
-
-@zope.interface.implementer(interfaces.IAuthenticator)
-@zope.interface.provider(interfaces.IPluginFactory)
-class Authenticator(dns_common.DNSAuthenticator):
- """DNS Authenticator for DigitalOcean
-
- This Authenticator uses the DigitalOcean API to fulfill a dns-01 challenge.
- """
-
- description = 'Obtain certs using a DNS TXT record (if you are using DigitalOcean for DNS).'
-
- def __init__(self, *args, **kwargs):
- super(Authenticator, self).__init__(*args, **kwargs)
- self.credentials = None
-
- @classmethod
- def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
- super(Authenticator, cls).add_parser_arguments(add)
- add('credentials', help='DigitalOcean credentials INI file.')
-
- def more_info(self): # pylint: disable=missing-docstring,no-self-use
- return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
- 'the DigitalOcean API.'
-
- def _setup_credentials(self):
- self.credentials = self._configure_credentials(
- 'credentials',
- 'DigitalOcean credentials INI file',
- {
- 'token': 'API token for DigitalOcean account'
- }
- )
-
- def _perform(self, domain, validation_name, validation):
- self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)
-
- def _cleanup(self, domain, validation_name, validation):
- self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
-
- def _get_digitalocean_client(self):
- return _DigitalOceanClient(self.credentials.conf('token'))
-
-
-class _DigitalOceanClient(object):
- """
- Encapsulates all communication with the DigitalOcean API.
- """
-
- def __init__(self, token):
- self.manager = digitalocean.Manager(token=token)
-
- def add_txt_record(self, domain_name, record_name, record_content):
- """
- Add a TXT record using the supplied information.
-
- :param str domain_name: The domain to use to associate the record with.
- :param str record_name: The record name (typically beginning with '_acme-challenge.').
- :param str record_content: The record content (typically the challenge validation).
- :raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
- API
- """
-
- try:
- domain = self._find_domain(domain_name)
- except digitalocean.Error as e:
- hint = None
-
- if str(e).startswith("Unable to authenticate"):
- hint = 'Did you provide a valid API token?'
-
- logger.debug('Error finding domain using the DigitalOcean API: %s', e)
- raise errors.PluginError('Error finding domain using the DigitalOcean API: {0}{1}'
- .format(e, ' ({0})'.format(hint) if hint else ''))
-
- try:
- result = domain.create_new_domain_record(
- type='TXT',
- name=self._compute_record_name(domain, record_name),
- data=record_content)
-
- record_id = result['domain_record']['id']
-
- logger.debug('Successfully added TXT record with id: %d', record_id)
- except digitalocean.Error as e:
- logger.debug('Error adding TXT record using the DigitalOcean API: %s', e)
- raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
- .format(e))
-
- def del_txt_record(self, domain_name, record_name, record_content):
- """
- Delete a TXT record using the supplied information.
-
- Note that both the record's name and content are used to ensure that similar records
- created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.
-
- Failures are logged, but not raised.
-
- :param str domain_name: The domain to use to associate the record with.
- :param str record_name: The record name (typically beginning with '_acme-challenge.').
- :param str record_content: The record content (typically the challenge validation).
- """
-
- try:
- domain = self._find_domain(domain_name)
- except digitalocean.Error as e:
- logger.debug('Error finding domain using the DigitalOcean API: %s', e)
- return
-
- try:
- domain_records = domain.get_records()
-
- matching_records = [record for record in domain_records
- if record.type == 'TXT'
- and record.name == self._compute_record_name(domain, record_name)
- and record.data == record_content]
- except digitalocean.Error as e:
- logger.debug('Error getting DNS records using the DigitalOcean API: %s', e)
- return
-
- for record in matching_records:
- try:
- logger.debug('Removing TXT record with id: %s', record.id)
- record.destroy()
- except digitalocean.Error as e:
- logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
- record.id, e)
-
- def _find_domain(self, domain_name):
- """
- Find the domain object for a given domain name.
-
- :param str domain_name: The domain name for which to find the corresponding Domain.
- :returns: The Domain, if found.
- :rtype: `~digitalocean.Domain`
- :raises certbot.errors.PluginError: if no matching Domain is found.
- """
-
- domain_name_guesses = dns_common.base_domain_name_guesses(domain_name)
-
- domains = self.manager.get_all_domains()
-
- for guess in domain_name_guesses:
- matches = [domain for domain in domains if domain.name == guess]
-
- if len(matches) > 0:
- domain = matches[0]
- logger.debug('Found base domain for %s using name %s', domain_name, guess)
- return domain
-
- raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
- .format(domain_name, domain_name_guesses))
-
- @staticmethod
- def _compute_record_name(domain, full_record_name):
- # The domain, from DigitalOcean's point of view, is automatically appended.
- return full_record_name.rpartition("." + domain.name)[0]