diff options
Diffstat (limited to 'certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/dns_rfc2136.py')
-rw-r--r-- | certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/dns_rfc2136.py | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/dns_rfc2136.py b/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/dns_rfc2136.py new file mode 100644 index 000000000..cb4d5addb --- /dev/null +++ b/certbot-dns-rfc2136/certbot_dns_rfc2136/_internal/dns_rfc2136.py @@ -0,0 +1,226 @@ +"""DNS Authenticator using RFC 2136 Dynamic Updates.""" +import logging + +import dns.flags +import dns.message +import dns.name +import dns.query +import dns.rdataclass +import dns.rdatatype +import dns.tsig +import dns.tsigkeyring +import dns.update +import zope.interface + +from certbot import errors +from certbot import interfaces +from certbot.plugins import dns_common + +logger = logging.getLogger(__name__) + + +@zope.interface.implementer(interfaces.IAuthenticator) +@zope.interface.provider(interfaces.IPluginFactory) +class Authenticator(dns_common.DNSAuthenticator): + """DNS Authenticator using RFC 2136 Dynamic Updates + + This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge. + """ + + ALGORITHMS = { + 'HMAC-MD5': dns.tsig.HMAC_MD5, + 'HMAC-SHA1': dns.tsig.HMAC_SHA1, + 'HMAC-SHA224': dns.tsig.HMAC_SHA224, + 'HMAC-SHA256': dns.tsig.HMAC_SHA256, + 'HMAC-SHA384': dns.tsig.HMAC_SHA384, + 'HMAC-SHA512': dns.tsig.HMAC_SHA512 + } + + PORT = 53 + + description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).' + ttl = 120 + + def __init__(self, *args, **kwargs): + super(Authenticator, self).__init__(*args, **kwargs) + self.credentials = None + + @classmethod + def add_parser_arguments(cls, add): # pylint: disable=arguments-differ + super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60) + add('credentials', help='RFC 2136 credentials INI file.') + + def more_info(self): # pylint: disable=missing-docstring,no-self-use + return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \ + 'RFC 2136 Dynamic Updates.' + + def _validate_algorithm(self, credentials): + algorithm = credentials.conf('algorithm') + if algorithm: + if not self.ALGORITHMS.get(algorithm.upper()): + raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm)) + + def _setup_credentials(self): + self.credentials = self._configure_credentials( + 'credentials', + 'RFC 2136 credentials INI file', + { + 'name': 'TSIG key name', + 'secret': 'TSIG key secret', + 'server': 'The target DNS server' + }, + self._validate_algorithm + ) + + def _perform(self, _domain, validation_name, validation): + self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl) + + def _cleanup(self, _domain, validation_name, validation): + self._get_rfc2136_client().del_txt_record(validation_name, validation) + + def _get_rfc2136_client(self): + return _RFC2136Client(self.credentials.conf('server'), + int(self.credentials.conf('port') or self.PORT), + self.credentials.conf('name'), + self.credentials.conf('secret'), + self.ALGORITHMS.get(self.credentials.conf('algorithm'), + dns.tsig.HMAC_MD5)) + + +class _RFC2136Client(object): + """ + Encapsulates all communication with the target DNS server. + """ + def __init__(self, server, port, key_name, key_secret, key_algorithm): + self.server = server + self.port = port + self.keyring = dns.tsigkeyring.from_text({ + key_name: key_secret + }) + self.algorithm = key_algorithm + + def add_txt_record(self, record_name, record_content, record_ttl): + """ + Add a TXT record using the supplied information. + + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :param int record_ttl: The record TTL (number of seconds that the record may be cached). + :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server + """ + + domain = self._find_domain(record_name) + + n = dns.name.from_text(record_name) + o = dns.name.from_text(domain) + rel = n.relativize(o) + + update = dns.update.Update( + domain, + keyring=self.keyring, + keyalgorithm=self.algorithm) + update.add(rel, record_ttl, dns.rdatatype.TXT, record_content) + + try: + response = dns.query.tcp(update, self.server, port=self.port) + except Exception as e: + raise errors.PluginError('Encountered error adding TXT record: {0}' + .format(e)) + rcode = response.rcode() + + if rcode == dns.rcode.NOERROR: + logger.debug('Successfully added TXT record %s', record_name) + else: + raise errors.PluginError('Received response from server: {0}' + .format(dns.rcode.to_text(rcode))) + + def del_txt_record(self, record_name, record_content): + """ + Delete a TXT record using the supplied information. + + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :param int record_ttl: The record TTL (number of seconds that the record may be cached). + :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server + """ + + domain = self._find_domain(record_name) + + n = dns.name.from_text(record_name) + o = dns.name.from_text(domain) + rel = n.relativize(o) + + update = dns.update.Update( + domain, + keyring=self.keyring, + keyalgorithm=self.algorithm) + update.delete(rel, dns.rdatatype.TXT, record_content) + + try: + response = dns.query.tcp(update, self.server, port=self.port) + except Exception as e: + raise errors.PluginError('Encountered error deleting TXT record: {0}' + .format(e)) + rcode = response.rcode() + + if rcode == dns.rcode.NOERROR: + logger.debug('Successfully deleted TXT record %s', record_name) + else: + raise errors.PluginError('Received response from server: {0}' + .format(dns.rcode.to_text(rcode))) + + def _find_domain(self, record_name): + """ + Find the closest domain with an SOA record for a given domain name. + + :param str record_name: The record name for which to find the closest SOA record. + :returns: The domain, if found. + :rtype: str + :raises certbot.errors.PluginError: if no SOA record can be found. + """ + + domain_name_guesses = dns_common.base_domain_name_guesses(record_name) + + # Loop through until we find an authoritative SOA record + for guess in domain_name_guesses: + if self._query_soa(guess): + return guess + + raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.' + .format(record_name, domain_name_guesses)) + + def _query_soa(self, domain_name): + """ + Query a domain name for an authoritative SOA record. + + :param str domain_name: The domain name to query for an SOA record. + :returns: True if found, False otherwise. + :rtype: bool + :raises certbot.errors.PluginError: if no response is received. + """ + + domain = dns.name.from_text(domain_name) + + request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN) + # Turn off Recursion Desired bit in query + request.flags ^= dns.flags.RD + + try: + try: + response = dns.query.tcp(request, self.server, port=self.port) + except OSError as e: + logger.debug('TCP query failed, fallback to UDP: %s', e) + response = dns.query.udp(request, self.server, port=self.port) + rcode = response.rcode() + + # Authoritative Answer bit should be set + if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer, + domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA): + logger.debug('Received authoritative SOA response for %s', domain_name) + return True + + logger.debug('No authoritative SOA record found for %s', domain_name) + return False + except Exception as e: + raise errors.PluginError('Encountered error when making query: {0}' + .format(e)) |