Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-rfc2136/certbot_dns_rfc2136/dns_rfc2136.py')
-rw-r--r--certbot-dns-rfc2136/certbot_dns_rfc2136/dns_rfc2136.py223
1 files changed, 0 insertions, 223 deletions
diff --git a/certbot-dns-rfc2136/certbot_dns_rfc2136/dns_rfc2136.py b/certbot-dns-rfc2136/certbot_dns_rfc2136/dns_rfc2136.py
deleted file mode 100644
index b8c01cdd3..000000000
--- a/certbot-dns-rfc2136/certbot_dns_rfc2136/dns_rfc2136.py
+++ /dev/null
@@ -1,223 +0,0 @@
-"""DNS Authenticator using RFC 2136 Dynamic Updates."""
-import logging
-
-import dns.flags
-import dns.message
-import dns.name
-import dns.query
-import dns.rdataclass
-import dns.rdatatype
-import dns.tsig
-import dns.tsigkeyring
-import dns.update
-import zope.interface
-
-from certbot import errors
-from certbot import interfaces
-from certbot.plugins import dns_common
-
-logger = logging.getLogger(__name__)
-
-
-@zope.interface.implementer(interfaces.IAuthenticator)
-@zope.interface.provider(interfaces.IPluginFactory)
-class Authenticator(dns_common.DNSAuthenticator):
- """DNS Authenticator using RFC 2136 Dynamic Updates
-
- This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge.
- """
-
- ALGORITHMS = {
- 'HMAC-MD5': dns.tsig.HMAC_MD5,
- 'HMAC-SHA1': dns.tsig.HMAC_SHA1,
- 'HMAC-SHA224': dns.tsig.HMAC_SHA224,
- 'HMAC-SHA256': dns.tsig.HMAC_SHA256,
- 'HMAC-SHA384': dns.tsig.HMAC_SHA384,
- 'HMAC-SHA512': dns.tsig.HMAC_SHA512
- }
-
- PORT = 53
-
- description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).'
- ttl = 120
-
- def __init__(self, *args, **kwargs):
- super(Authenticator, self).__init__(*args, **kwargs)
- self.credentials = None
-
- @classmethod
- def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
- super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60)
- add('credentials', help='RFC 2136 credentials INI file.')
-
- def more_info(self): # pylint: disable=missing-docstring,no-self-use
- return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
- 'RFC 2136 Dynamic Updates.'
-
- def _validate_algorithm(self, credentials):
- algorithm = credentials.conf('algorithm')
- if algorithm:
- if not self.ALGORITHMS.get(algorithm):
- raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
-
- def _setup_credentials(self):
- self.credentials = self._configure_credentials(
- 'credentials',
- 'RFC 2136 credentials INI file',
- {
- 'name': 'TSIG key name',
- 'secret': 'TSIG key secret',
- 'server': 'The target DNS server'
- },
- self._validate_algorithm
- )
-
- def _perform(self, _domain, validation_name, validation):
- self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl)
-
- def _cleanup(self, _domain, validation_name, validation):
- self._get_rfc2136_client().del_txt_record(validation_name, validation)
-
- def _get_rfc2136_client(self):
- return _RFC2136Client(self.credentials.conf('server'),
- int(self.credentials.conf('port') or self.PORT),
- self.credentials.conf('name'),
- self.credentials.conf('secret'),
- self.ALGORITHMS.get(self.credentials.conf('algorithm'),
- dns.tsig.HMAC_MD5))
-
-
-class _RFC2136Client(object):
- """
- Encapsulates all communication with the target DNS server.
- """
- def __init__(self, server, port, key_name, key_secret, key_algorithm):
- self.server = server
- self.port = port
- self.keyring = dns.tsigkeyring.from_text({
- key_name: key_secret
- })
- self.algorithm = key_algorithm
-
- def add_txt_record(self, record_name, record_content, record_ttl):
- """
- Add a TXT record using the supplied information.
-
- :param str record_name: The record name (typically beginning with '_acme-challenge.').
- :param str record_content: The record content (typically the challenge validation).
- :param int record_ttl: The record TTL (number of seconds that the record may be cached).
- :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
- """
-
- domain = self._find_domain(record_name)
-
- n = dns.name.from_text(record_name)
- o = dns.name.from_text(domain)
- rel = n.relativize(o)
-
- update = dns.update.Update(
- domain,
- keyring=self.keyring,
- keyalgorithm=self.algorithm)
- update.add(rel, record_ttl, dns.rdatatype.TXT, record_content)
-
- try:
- response = dns.query.tcp(update, self.server, port=self.port)
- except Exception as e:
- raise errors.PluginError('Encountered error adding TXT record: {0}'
- .format(e))
- rcode = response.rcode()
-
- if rcode == dns.rcode.NOERROR:
- logger.debug('Successfully added TXT record')
- else:
- raise errors.PluginError('Received response from server: {0}'
- .format(dns.rcode.to_text(rcode)))
-
- def del_txt_record(self, record_name, record_content):
- """
- Delete a TXT record using the supplied information.
-
- :param str record_name: The record name (typically beginning with '_acme-challenge.').
- :param str record_content: The record content (typically the challenge validation).
- :param int record_ttl: The record TTL (number of seconds that the record may be cached).
- :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
- """
-
- domain = self._find_domain(record_name)
-
- n = dns.name.from_text(record_name)
- o = dns.name.from_text(domain)
- rel = n.relativize(o)
-
- update = dns.update.Update(
- domain,
- keyring=self.keyring,
- keyalgorithm=self.algorithm)
- update.delete(rel, dns.rdatatype.TXT, record_content)
-
- try:
- response = dns.query.tcp(update, self.server, port=self.port)
- except Exception as e:
- raise errors.PluginError('Encountered error deleting TXT record: {0}'
- .format(e))
- rcode = response.rcode()
-
- if rcode == dns.rcode.NOERROR:
- logger.debug('Successfully deleted TXT record')
- else:
- raise errors.PluginError('Received response from server: {0}'
- .format(dns.rcode.to_text(rcode)))
-
- def _find_domain(self, record_name):
- """
- Find the closest domain with an SOA record for a given domain name.
-
- :param str record_name: The record name for which to find the closest SOA record.
- :returns: The domain, if found.
- :rtype: str
- :raises certbot.errors.PluginError: if no SOA record can be found.
- """
-
- domain_name_guesses = dns_common.base_domain_name_guesses(record_name)
-
- # Loop through until we find an authoritative SOA record
- for guess in domain_name_guesses:
- if self._query_soa(guess):
- return guess
-
- raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
- .format(record_name, domain_name_guesses))
-
- def _query_soa(self, domain_name):
- """
- Query a domain name for an authoritative SOA record.
-
- :param str domain_name: The domain name to query for an SOA record.
- :returns: True if found, False otherwise.
- :rtype: bool
- :raises certbot.errors.PluginError: if no response is received.
- """
-
- domain = dns.name.from_text(domain_name)
-
- request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN)
- # Turn off Recursion Desired bit in query
- request.flags ^= dns.flags.RD
-
- try:
- response = dns.query.udp(request, self.server, port=self.port)
- rcode = response.rcode()
-
- # Authoritative Answer bit should be set
- if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer,
- domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA):
- logger.debug('Received authoritative SOA response for %s', domain_name)
- return True
-
- logger.debug('No authoritative SOA record found for %s', domain_name)
- return False
- except Exception as e:
- raise errors.PluginError('Encountered error when making query: {0}'
- .format(e))
-