Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dns_cloudflare.py « _internal « certbot_dns_cloudflare « certbot-dns-cloudflare - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 22124ac04ffdd93cc7dffa7bfde9d33722321068 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""DNS Authenticator for Cloudflare."""
import logging

import CloudFlare
import zope.interface

from acme.magic_typing import Any
from acme.magic_typing import Dict
from acme.magic_typing import List

from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cloudflare dashboard URL (API tokens page); quoted in the credential validation error messages.
ACCOUNT_URL = 'https://dash.cloudflare.com/profile/api-tokens'


@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """Certbot DNS authenticator backed by the Cloudflare API.

    A dns-01 challenge is fulfilled by creating the validation TXT record through the
    Cloudflare API and removing that record again during cleanup.
    """

    description = ('Obtain certificates using a DNS TXT record '
                   '(if you are using Cloudflare for DNS).')
    # TTL handed to the Cloudflare client for every TXT record created by ``_perform``.
    ttl = 120

    def __init__(self, *args, **kwargs):
        """Initialise the base authenticator; credentials are loaded by ``_setup_credentials``."""
        super(Authenticator, self).__init__(*args, **kwargs)
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        """Register the base class arguments plus the ``credentials`` option.

        :param add: Callable used to register an argument (name, then keyword options).
        """
        super(Authenticator, cls).add_parser_arguments(add)
        add('credentials', help='Cloudflare credentials INI file.')

    def more_info(self):  # pylint: disable=missing-docstring,no-self-use
        return ('This plugin configures a DNS TXT record to respond to a dns-01 challenge '
                'using the Cloudflare API.')

    def _validate_credentials(self, credentials):
        """Check that the credentials file holds exactly one usable kind of credential.

        Accepted are either an API token on its own, or an email address together with a
        Global API key.

        :param credentials: Credentials object exposing ``conf(name)`` and ``confobj.filename``.
        :raises certbot.errors.PluginError: if the token is mixed with email/key, if only one
            of email/key is present, or if nothing is configured at all.
        """
        api_token = credentials.conf('api-token')
        account_email = credentials.conf('email')
        global_key = credentials.conf('api-key')

        def reject(template, *extra):
            # The file name is only looked up once a problem has actually been found.
            raise errors.PluginError(template.format(credentials.confobj.filename, *extra))

        if api_token:
            if account_email or global_key:
                reject('{}: dns_cloudflare_email and dns_cloudflare_api_key are '
                       'not needed when using an API Token')
            return

        if not (account_email or global_key):
            reject('{}: Either dns_cloudflare_api_token (recommended), or '
                   'dns_cloudflare_email and dns_cloudflare_api_key are required.'
                   ' (see {})', ACCOUNT_URL)

        # From here on at least one half of the email / Global API key pair is present.
        if not account_email:
            reject('{}: dns_cloudflare_email is required when using a Global '
                   'API Key. (should be email address associated with '
                   'Cloudflare account)')
        if not global_key:
            reject('{}: dns_cloudflare_api_key is required when using a '
                   'Global API Key. (see {})', ACCOUNT_URL)

    def _setup_credentials(self):
        """Load the credentials file and keep the result on ``self.credentials``.

        Delegates to the ``_configure_credentials`` helper (not defined in this class),
        passing ``_validate_credentials`` as the validation callback.
        """
        label = 'Cloudflare credentials INI file'
        self.credentials = self._configure_credentials(
            'credentials', label, None, self._validate_credentials)

    def _perform(self, domain, validation_name, validation):
        """Create the TXT record for the challenge, using the class-level ``ttl``."""
        client = self._get_cloudflare_client()
        client.add_txt_record(domain, validation_name, validation, self.ttl)

    def _cleanup(self, domain, validation_name, validation):
        """Remove the TXT record that was created for the challenge."""
        client = self._get_cloudflare_client()
        client.del_txt_record(domain, validation_name, validation)

    def _get_cloudflare_client(self):
        """Build a ``_CloudflareClient`` from the loaded credentials.

        With an API token configured, the client receives ``None`` as email and the token
        as key; otherwise it receives the configured email and Global API key.
        """
        conf = self.credentials.conf
        if not conf('api-token'):
            return _CloudflareClient(conf('email'), conf('api-key'))
        return _CloudflareClient(None, conf('api-token'))


class _CloudflareClient(object):
    """
    Wrapper around the ``CloudFlare`` library through which this plugin talks to the
    Cloudflare API.
    """

    def __init__(self, email, api_key):
        """
        Create the underlying ``CloudFlare.CloudFlare`` client.

        :param email: First argument for ``CloudFlare.CloudFlare`` (the Authenticator passes
            ``None`` here when an API token is used).
        :param api_key: Second argument for ``CloudFlare.CloudFlare`` (Global API key or
            API token).
        """
        self.cf = CloudFlare.CloudFlare(email, api_key)

    def add_txt_record(self, domain, record_name, record_content, record_ttl):
        """
        Create a TXT record in the zone that belongs to the given domain.

        :param str domain: The domain to use to look up the Cloudflare zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :param int record_ttl: The record TTL (number of seconds that the record may be cached).
        :raises certbot.errors.PluginError: if the zone cannot be determined or the Cloudflare
            API rejects the request
        """

        zone_id = self._find_zone_id(domain)

        payload = {
            'type': 'TXT',
            'name': record_name,
            'content': record_content,
            'ttl': record_ttl,
        }

        logger.debug('Attempting to add record to zone %s: %s', zone_id, payload)
        try:
            self.cf.zones.dns_records.post(zone_id, data=payload)  # zones | pylint: disable=no-member
        except CloudFlare.exceptions.CloudFlareAPIError as api_error:
            # Error code 9109 gets an extra hint about token permissions.
            suffix = ''
            if int(api_error) == 9109:
                suffix = ' ({0})'.format('Does your API token have "Zone:DNS:Edit" permissions?')

            logger.error('Encountered CloudFlareAPIError adding TXT record: %d %s',
                         api_error, api_error)
            raise errors.PluginError('Error communicating with the Cloudflare API: {0}{1}'
                                     .format(api_error, suffix))

        # Looked up again purely so the new record's id can be logged.
        new_record_id = self._find_txt_record_id(zone_id, record_name, record_content)
        logger.debug('Successfully added TXT record with record_id: %s', new_record_id)

    def del_txt_record(self, domain, record_name, record_content):
        """
        Remove the TXT record matching the supplied name and content.

        Both name and content are matched so that similar records created concurrently
        (e.g., by another invocation of this plugin) are left alone.

        Problems are only logged; nothing is raised for a missing zone, a missing record
        or a Cloudflare API error.

        :param str domain: The domain to use to look up the Cloudflare zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        """

        try:
            zone_id = self._find_zone_id(domain)
        except errors.PluginError as lookup_error:
            logger.debug('Encountered error finding zone_id during deletion: %s', lookup_error)
            return

        if not zone_id:
            logger.debug('Zone not found; no cleanup needed.')
            return

        record_id = self._find_txt_record_id(zone_id, record_name, record_content)
        if not record_id:
            logger.debug('TXT record not found; no cleanup needed.')
            return

        try:
            # zones | pylint: disable=no-member
            self.cf.zones.dns_records.delete(zone_id, record_id)
        except CloudFlare.exceptions.CloudFlareAPIError as api_error:
            logger.warning('Encountered CloudFlareAPIError deleting TXT record: %s', api_error)
        else:
            logger.debug('Successfully deleted TXT record.')

    def _find_zone_id(self, domain):
        """
        Resolve the Cloudflare zone_id responsible for a domain.

        Each candidate zone name produced by ``dns_common.base_domain_name_guesses`` is
        queried in turn; the first one that yields a zone wins.

        :param str domain: The domain for which to find the zone_id.
        :returns: The zone_id, if found.
        :rtype: str
        :raises certbot.errors.PluginError: if no zone_id is found, or if the API reports one
            of the recognised credential errors (6003, 9103, 9109).
        """

        zone_name_guesses = dns_common.base_domain_name_guesses(domain)
        zones = []  # type: List[Dict[str, Any]]
        # Details of the most recent API failure; reported if no guess succeeds.
        code = None
        msg = None

        for zone_name in zone_name_guesses:
            query = {'name': zone_name, 'per_page': 1}

            try:
                zones = self.cf.zones.get(params=query)  # zones | pylint: disable=no-member
            except CloudFlare.exceptions.CloudFlareAPIError as api_error:
                code = int(api_error)
                msg = str(api_error)

                if code == 6003:
                    if hasattr(CloudFlare, '__version__'):
                        version_note = (' This certbot is running cloudflare '
                                        + str(CloudFlare.__version__))
                    else:
                        version_note = ''
                    hint = ('Did you copy your entire API token/key? To use Cloudflare tokens, '
                            'you\'ll need the python package cloudflare>=2.3.1.{}'
                            .format(version_note))
                else:
                    # Remaining recognised codes map to fixed hints; anything else has none.
                    hint = {
                        9103: 'Did you enter the correct email address and Global key?',
                        9109: 'Did you enter a valid Cloudflare Token?',
                    }.get(code)

                if hint:
                    raise errors.PluginError(
                        'Error determining zone_id: {0} {1}. Please confirm '
                        'that you have supplied valid Cloudflare API credentials. ({2})'
                        .format(code, msg, hint))

                logger.debug('Unrecognised CloudFlareAPIError while finding zone_id: %d %s. '
                             'Continuing with next zone guess...', api_error, api_error)

            if zones:
                zone_id = zones[0]['id']
                logger.debug('Found zone_id of %s for %s using name %s', zone_id, domain, zone_name)
                return zone_id

        if code is None:
            api_detail = ''
        else:
            api_detail = ' The error from Cloudflare was: {0} {1}'.format(code, msg)

        raise errors.PluginError(
            'Unable to determine zone_id for {0} using zone names: {1}. '
            'Please confirm that the domain name has been entered correctly '
            'and is already associated with the supplied Cloudflare account.{2}'
            .format(domain, zone_name_guesses, api_detail))

    def _find_txt_record_id(self, zone_id, record_name, record_content):
        """
        Look up the record_id of the TXT record with the given name and content.

        A Cloudflare API error during the lookup is logged and treated like "no record".

        :param str zone_id: The zone_id which contains the record.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :returns: The record_id, if found; otherwise ``None``.
        :rtype: str
        """

        query = {
            'type': 'TXT',
            'name': record_name,
            'content': record_content,
            'per_page': 1,
        }
        try:
            # zones | pylint: disable=no-member
            matches = self.cf.zones.dns_records.get(zone_id, params=query)
        except CloudFlare.exceptions.CloudFlareAPIError as api_error:
            logger.debug('Encountered CloudFlareAPIError getting TXT record_id: %s', api_error)
            matches = []

        if not matches:
            logger.debug('Unable to find TXT record.')
            return None

        # Cleanup is returning the system to the state we found it. Should several records
        # match, only the first is reported because only one record was ever added.
        return matches[0]['id']