Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dns_common.py « plugins « certbot - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 931778b0707178139a02e2f44cfe18c70d0ebcdf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
"""Common code for DNS Authenticator Plugins."""

import abc
import logging
from time import sleep

import configobj
import zope.interface

from acme import challenges

from certbot import errors
from certbot import interfaces
from certbot.compat import filesystem
from certbot.compat import os
from certbot.display import ops
from certbot.display import util as display_util
from certbot.plugins import common

logger = logging.getLogger(__name__)


@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class DNSAuthenticator(common.Plugin):
    """Shared base class for authenticators that answer dns-01 challenges.

    `perform` and `cleanup` drive the provider-specific hooks `_setup_credentials`,
    `_perform` and `_cleanup`, which subclasses are expected to override. The remaining
    helpers prompt the user for configuration values that have not been supplied.
    """

    def __init__(self, config, name):
        super(DNSAuthenticator, self).__init__(config, name)

        # Set to True by `perform` once `_setup_credentials` has returned; until then
        # `cleanup` does nothing.
        self._attempt_cleanup = False

    @classmethod
    def add_parser_arguments(cls, add, default_propagation_seconds=10):  # pylint: disable=arguments-differ
        """Register the `propagation-seconds` option through the supplied `add` callable.

        :param callable add: Callable used to declare a plugin option.
        :param int default_propagation_seconds: Default value for the option.
        """
        propagation_help = ('The number of seconds to wait for DNS to propagate before asking '
                            'the ACME server to verify the DNS record.')
        add('propagation-seconds',
            type=int,
            default=default_propagation_seconds,
            help=propagation_help)

    def get_chall_pref(self, unused_domain):  # pylint: disable=missing-docstring,no-self-use
        """Return the supported challenge types: only `challenges.DNS01`."""
        return [challenges.DNS01]

    def prepare(self): # pylint: disable=missing-docstring
        """Nothing to prepare at this level."""

    def perform(self, achalls): # pylint: disable=missing-docstring
        """Create a validation record for each challenge, then wait for propagation.

        :param achalls: The annotated challenges to satisfy.
        :returns: One challenge response per entry of `achalls`, in the same order.
        :rtype: list
        """
        self._setup_credentials()

        # Credentials are in place, so from here on `cleanup` is allowed to run.
        self._attempt_cleanup = True

        responses = []
        for challenge in achalls:
            domain = challenge.domain
            record_name = challenge.validation_domain_name(domain)
            record_content = challenge.validation(challenge.account_key)

            self._perform(domain, record_name, record_content)
            responses.append(challenge.response(challenge.account_key))

        # Checking locally whether the update is visible is not reliable: this machine might
        # see the new record before the ACME server does. Instead we simply sleep for the
        # configured amount of time, which is believed to be long enough.
        logger.info("Waiting %d seconds for DNS changes to propagate",
                    self.conf('propagation-seconds'))
        sleep(self.conf('propagation-seconds'))

        return responses

    def cleanup(self, achalls):  # pylint: disable=missing-docstring
        """Remove the validation record of each challenge, if `perform` got far enough.

        :param achalls: The annotated challenges whose records should be removed.
        """
        if not self._attempt_cleanup:
            return

        for challenge in achalls:
            domain = challenge.domain
            record_name = challenge.validation_domain_name(domain)
            record_content = challenge.validation(challenge.account_key)

            self._cleanup(domain, record_name, record_content)

    @abc.abstractmethod
    def _setup_credentials(self):  # pragma: no cover
        """
        Establish credentials, prompting if necessary.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _perform(self, domain, validation_name, validation):  # pragma: no cover
        """
        Performs a dns-01 challenge by creating a DNS TXT record.

        :param str domain: The domain being validated.
        :param str validation_name: The validation record domain name.
        :param str validation: The validation record content.
        :raises errors.PluginError: If the challenge cannot be performed
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _cleanup(self, domain, validation_name, validation):  # pragma: no cover
        """
        Deletes the DNS TXT record which would have been created by `_perform`.

        Fails gracefully if no such record exists.

        :param str domain: The domain being validated.
        :param str validation_name: The validation record domain name.
        :param str validation: The validation record content.
        """
        raise NotImplementedError()

    def _configure(self, key, label):
        """
        Make sure a configuration value is set, asking the user for it when it is not.

        The answer is stored on `self.config` under `self.dest(key)`.

        :param str key: The configuration key.
        :param str label: The user-friendly label for this piece of information.
        """
        if self.conf(key):
            return

        answer = self._prompt_for_data(label)
        setattr(self.config, self.dest(key), answer)

    def _configure_file(self, key, label, validator=None):
        """
        Make sure a path-valued configuration value is set, asking the user when it is not.

        A path obtained from the prompt is user-expanded and made absolute before it is
        stored on `self.config`; an already configured value is left untouched.

        :param str key: The configuration key.
        :param str label: The user-friendly label for this piece of information.
        :param callable validator: Optional extra check, forwarded to `_prompt_for_file`.
        """
        if self.conf(key):
            return

        chosen_path = self._prompt_for_file(label, validator)
        setattr(self.config, self.dest(key), os.path.abspath(os.path.expanduser(chosen_path)))

    def _configure_credentials(self, key, label, required_variables=None, validator=None):
        """
        As `_configure_file`, but for a credential configuration file.

        If necessary, prompts the user and stores the result. The file named by the
        resulting configuration value is then loaded and checked once more before it is
        returned.

        :param str key: The configuration key.
        :param str label: The user-friendly label for this piece of information.
        :param dict required_variables: Map of variable which must be present to error to display.
        :param callable validator: A method which will be called to validate the
            `CredentialsConfiguration` resulting from the supplied input after it has been validated
            to contain the `required_variables`. Should throw a `~certbot.errors.PluginError` to
            indicate any issue.
        :returns: The loaded and validated credentials.
        :rtype: CredentialsConfiguration
        """

        def _load_checked(filename):
            # Parse the file, then apply the required-variable and custom checks in that order.
            loaded = CredentialsConfiguration(filename, self.dest)

            if required_variables:
                loaded.require(required_variables)

            if validator:
                validator(loaded)

            return loaded

        def _validate_path(filename):
            # Used as the prompt validator: only the raised errors matter, not the result.
            _load_checked(filename)

        self._configure_file(key, label, _validate_path)

        return _load_checked(self.conf(key))

    @staticmethod
    def _prompt_for_data(label):
        """
        Ask the user for a piece of information.

        :param str label: The user-friendly label for this piece of information.
        :returns: The user's response (empty input is rejected by the validator).
        :rtype: str
        :raises errors.PluginError: If the prompt does not finish with `display_util.OK`.
        """

        def _require_answer(answer):
            if not answer:
                raise errors.PluginError('Please enter your {0}.'.format(label))

        code, response = ops.validated_input(
            _require_answer,
            'Input your {0}'.format(label),
            force_interactive=True)

        if code != display_util.OK:
            raise errors.PluginError('{0} required to proceed.'.format(label))

        return response

    @staticmethod
    def _prompt_for_file(label, validator=None):
        """
        Ask the user for a path to a file.

        :param str label: The user-friendly label for the file.
        :param callable validator: A method which will be called to validate the supplied input
            after it has been validated to be a non-empty path to an existing file. Should throw a
            `~certbot.errors.PluginError` to indicate any issue.
        :returns: The user's response, as entered (the validator checks that it exists).
        :rtype: str
        :raises errors.PluginError: If the prompt does not finish with `display_util.OK`.
        """

        def _check_path(raw_path):
            if not raw_path:
                raise errors.PluginError('Please enter a valid path to your {0}.'.format(label))

            # The existence check and the custom validator both see the `~`-expanded path.
            expanded = os.path.expanduser(raw_path)

            validate_file(expanded)

            if validator:
                validator(expanded)

        code, response = ops.validated_directory(
            _check_path,
            'Input the path to your {0}'.format(label),
            force_interactive=True)

        if code != display_util.OK:
            raise errors.PluginError('{0} required to proceed.'.format(label))

        return response


class CredentialsConfiguration(object):
    """Represents a user-supplied file which stores API credentials."""

    def __init__(self, filename, mapper=lambda x: x):
        """
        Load the credentials file after checking that it exists.

        :param str filename: A path to the configuration file.
        :param callable mapper: A transformation to apply to configuration key names
        :raises errors.PluginError: If the file does not exist or is not a valid format.
        """
        validate_file_permissions(filename)

        try:
            parsed = configobj.ConfigObj(filename)
        except configobj.ConfigObjError as e:
            logger.debug("Error parsing credentials configuration: %s", e, exc_info=True)
            raise errors.PluginError("Error parsing credentials configuration: {0}".format(e))

        self.confobj = parsed
        self.mapper = mapper

    def require(self, required_variables):
        """Check that every required variable is present in the file and has a truthy value.

        :param dict required_variables: Map of variable which must be present to error to display.
        :raises errors.PluginError: If one or more are missing.
        """
        problems = []

        for var, expected in required_variables.items():
            # A key that is absent and a key with an empty value get different wording.
            if not self._has(var):
                template = 'Property "{0}" not found (should be {1}).'
            elif not self._get(var):
                template = 'Property "{0}" not set (should be {1}).'
            else:
                continue

            problems.append(template.format(self.mapper(var), expected))

        if not problems:
            return

        noun = 'property' if len(problems) == 1 else 'properties'
        raise errors.PluginError(
            'Missing {0} in credentials configuration file {1}:\n * {2}'.format(
                noun,
                self.confobj.filename,
                '\n * '.join(problems)
            )
        )

    def conf(self, var):
        """Look up the value stored for `var`, after its name has been passed through `mapper`.

        :param str var: The variable to get.
        :returns: The value of the variable.
        :rtype: str
        """

        return self._get(var)

    def _has(self, var):
        # True when the mapped key name is present in the parsed file.
        mapped_name = self.mapper(var)
        return mapped_name in self.confobj

    def _get(self, var):
        # Value stored under the mapped key name, via the parsed object's `get`.
        mapped_name = self.mapper(var)
        return self.confobj.get(mapped_name)


def validate_file(filename):
    """Check that `filename` names an existing path which is not a directory.

    :param str filename: The path to check.
    :raises errors.PluginError: If the path does not exist or is a directory.
    """

    path_exists = os.path.exists(filename)
    if not path_exists:
        raise errors.PluginError('File not found: {0}'.format(filename))

    is_directory = os.path.isdir(filename)
    if is_directory:
        raise errors.PluginError('Path is a directory: {0}'.format(filename))


def validate_file_permissions(filename):
    """Validate the file with `validate_file`, then warn if its permissions look unsafe.

    A warning is logged, and nothing is raised, when `filesystem.has_world_permissions`
    reports true for the file.

    :param str filename: The path to the credentials configuration file.
    :raises errors.PluginError: If `validate_file` rejects the path.
    """

    validate_file(filename)

    world_accessible = filesystem.has_world_permissions(filename)
    if world_accessible:
        logger.warning('Unsafe permissions on credentials configuration file: %s', filename)


def base_domain_name_guesses(domain):
    """Return a list of progressively less-specific domain names.

    The first entry is `domain` itself; each following entry drops the leftmost
    dot-separated label of the previous one. One of these will probably be the domain
    name known to the DNS provider.

    :Example:

    >>> base_domain_name_guesses('foo.bar.baz.example.com')
    ['foo.bar.baz.example.com', 'bar.baz.example.com', 'baz.example.com', 'example.com', 'com']

    :param str domain: The domain for which to return guesses.
    :returns: A list of less specific domain names.
    :rtype: list
    """

    guesses = []
    remainder = domain
    while True:
        guesses.append(remainder)
        # Strip everything up to and including the first dot; stop when no dot is left.
        _, separator, remainder = remainder.partition('.')
        if not separator:
            break

    return guesses