Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dns_digitalocean.py « certbot_dns_digitalocean « certbot-dns-digitalocean - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 5a4f22327f7feef1b5864fcfbe2461e3010a9828 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""DNS Authenticator for DigitalOcean."""
import logging

import digitalocean
import zope.interface

from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common

logger = logging.getLogger(__name__)


@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """DNS Authenticator for DigitalOcean

    This Authenticator uses the DigitalOcean API to fulfill a dns-01 challenge.
    """

    description = 'Obtain certs using a DNS TXT record (if you are using DigitalOcean for DNS).'

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        super(Authenticator, cls).add_parser_arguments(add)
        add('credentials', help='DigitalOcean credentials INI file.')

    def more_info(self):  # pylint: disable=missing-docstring,no-self-use
        return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
               'the DigitalOcean API.'

    def _setup_credentials(self):
        self.credentials = self._configure_credentials(
            'credentials',
            'DigitalOcean credentials INI file',
            {
                'token': 'API token for DigitalOcean account'
            }
        )

    def _perform(self, domain, validation_name, validation):
        self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)

    def _cleanup(self, domain, validation_name, validation):
        self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)

    def _get_digitalocean_client(self):
        return _DigitalOceanClient(self.credentials.conf('token'))


class _DigitalOceanClient(object):
    """
    Encapsulates all communication with the DigitalOcean API.
    """

    def __init__(self, token):
        self.manager = digitalocean.Manager(token=token)

    def add_txt_record(self, domain_name, record_name, record_content):
        """
        Add a TXT record using the supplied information.

        :param str domain_name: The domain to use to associate the record with.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
                                            API
        """

        try:
            domain = self._find_domain(domain_name)
        except digitalocean.Error as e:
            hint = None

            if str(e).startswith("Unable to authenticate"):
                hint = 'Did you provide a valid API token?'

            logger.debug('Error finding domain using the DigitalOcean API: %s', e)
            raise errors.PluginError('Error finding domain using the DigitalOcean API: {0}{1}'
                                     .format(e, ' ({0})'.format(hint) if hint else ''))

        try:
            result = domain.create_new_domain_record(
                type='TXT',
                name=self._compute_record_name(domain, record_name),
                data=record_content)

            record_id = result['domain_record']['id']

            logger.debug('Successfully added TXT record with id: %d', record_id)
        except digitalocean.Error as e:
            logger.debug('Error adding TXT record using the DigitalOcean API: %s', e)
            raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
                                     .format(e))

    def del_txt_record(self, domain_name, record_name, record_content):
        """
        Delete a TXT record using the supplied information.

        Note that both the record's name and content are used to ensure that similar records
        created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.

        Failures are logged, but not raised.

        :param str domain_name: The domain to use to associate the record with.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        """

        try:
            domain = self._find_domain(domain_name)
        except digitalocean.Error as e:
            logger.debug('Error finding domain using the DigitalOcean API: %s', e)
            return

        try:
            domain_records = domain.get_records()

            matching_records = [record for record in domain_records
                                if record.type == 'TXT'
                                and record.name == self._compute_record_name(domain, record_name)
                                and record.data == record_content]
        except digitalocean.Error as e:
            logger.debug('Error getting DNS records using the DigitalOcean API: %s', e)
            return

        for record in matching_records:
            try:
                logger.debug('Removing TXT record with id: %s', record.id)
                record.destroy()
            except digitalocean.Error as e:
                logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
                            record.id, e)

    def _find_domain(self, domain_name):
        """
        Find the domain object for a given domain name.

        :param str domain_name: The domain name for which to find the corresponding Domain.
        :returns: The Domain, if found.
        :rtype: `~digitalocean.Domain`
        :raises certbot.errors.PluginError: if no matching Domain is found.
        """

        domain_name_guesses = dns_common.base_domain_name_guesses(domain_name)

        domains = self.manager.get_all_domains()

        for guess in domain_name_guesses:
            matches = [domain for domain in domains if domain.name == guess]

            if len(matches) > 0:
                domain = matches[0]
                logger.debug('Found base domain for %s using name %s', domain_name, guess)
                return domain

        raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
                                 .format(domain_name, domain_name_guesses))

    @staticmethod
    def _compute_record_name(domain, full_record_name):
        # The domain, from DigitalOcean's point of view, is automatically appended.
        return full_record_name.rpartition("." + domain.name)[0]