Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dns_ovh.py « _internal « certbot_dns_ovh « certbot-dns-ovh - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
blob: a495983f2d4219606bf307ab448f7ef007bacc17 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""DNS Authenticator for OVH DNS."""
import logging

from lexicon.providers import ovh
import zope.interface

from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Display text rather than a single URL: it names both OVH token-creation pages
# and is interpolated into the credential descriptions built in
# Authenticator._setup_credentials.
TOKEN_URL = 'https://eu.api.ovh.com/createToken/ or https://ca.api.ovh.com/createToken/'


@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """Certbot DNS authenticator plugin backed by OVH.

    Fulfills dns-01 challenges by adding and removing TXT records through the
    OVH API.
    """

    description = 'Obtain certificates using a DNS TXT record (if you are using OVH for DNS).'
    ttl = 60

    def __init__(self, *args, **kwargs):
        """Pass every argument through to the base authenticator."""
        super(Authenticator, self).__init__(*args, **kwargs)
        # Filled in by _setup_credentials().
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        """Register the base options (30 second propagation default) and ``credentials``."""
        parent = super(Authenticator, cls)
        parent.add_parser_arguments(add, default_propagation_seconds=30)
        add('credentials', help='OVH credentials INI file.')

    def more_info(self):  # pylint: disable=no-self-use
        """Return a one-sentence description of what this plugin does."""
        summary = ('This plugin configures a DNS TXT record to respond to a dns-01 '
                   'challenge using the OVH API.')
        return summary

    def _setup_credentials(self):
        """Configure the OVH credentials and store them on ``self.credentials``."""
        # Credential key -> description handed to _configure_credentials.
        required_fields = {
            'endpoint': 'OVH API endpoint (ovh-eu or ovh-ca)',
            'application-key':
                'Application key for OVH API, obtained from {0}'.format(TOKEN_URL),
            'application-secret':
                'Application secret for OVH API, obtained from {0}'.format(TOKEN_URL),
            'consumer-key':
                'Consumer key for OVH API, obtained from {0}'.format(TOKEN_URL),
        }
        self.credentials = self._configure_credentials(
            'credentials', 'OVH credentials INI file', required_fields)

    def _perform(self, domain, validation_name, validation):
        """Add the TXT record carrying ``validation`` for ``validation_name``."""
        client = self._get_ovh_client()
        client.add_txt_record(domain, validation_name, validation)

    def _cleanup(self, domain, validation_name, validation):
        """Delete the TXT record that ``_perform`` added."""
        client = self._get_ovh_client()
        client.del_txt_record(domain, validation_name, validation)

    def _get_ovh_client(self):
        """Build a fresh ``_OVHLexiconClient`` from the stored credentials and ``ttl``."""
        option_names = ('endpoint', 'application-key', 'application-secret', 'consumer-key')
        # Same positional order as the _OVHLexiconClient constructor, ttl last.
        client_args = [self.credentials.conf(option) for option in option_names]
        client_args.append(self.ttl)
        return _OVHLexiconClient(*client_args)


class _OVHLexiconClient(dns_common_lexicon.LexiconClient):
    """
    Encapsulates all communication with the OVH API via Lexicon.
    """

    def __init__(self, endpoint, application_key, application_secret, consumer_key, ttl):
        """Create the Lexicon OVH provider.

        :param endpoint: OVH API endpoint, passed to Lexicon as ``auth_entrypoint``.
        :param application_key: OVH application key.
        :param application_secret: OVH application secret.
        :param consumer_key: OVH consumer key.
        :param ttl: TTL placed in the Lexicon configuration.
        """
        super(_OVHLexiconClient, self).__init__()

        config = dns_common_lexicon.build_lexicon_config('ovh', {
            'ttl': ttl,
        }, {
            'auth_entrypoint': endpoint,
            'auth_application_key': application_key,
            'auth_application_secret': application_secret,
            'auth_consumer_key': consumer_key,
        })

        self.provider = ovh.Provider(config)

    def _handle_http_error(self, e, domain_name):
        """Wrap an HTTP error in a ``PluginError``, adding a credentials hint when possible.

        :param e: the HTTP error; only its string form is inspected.
        :param domain_name: domain included in the error message.
        :returns: the error describing the failure (returned, not raised).
        :rtype: errors.PluginError
        """
        message = str(e)
        hint = None
        # The two prefixes are mutually exclusive, so at most one hint applies.
        if message.startswith('400 Client Error:'):
            hint = 'Is your Application Secret value correct?'
        elif message.startswith('403 Client Error:'):
            hint = 'Are your Application Key and Consumer Key values correct?'

        return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
                                  .format(domain_name, e, ' ({0})'.format(hint) if hint else ''))

    def _handle_general_error(self, e, domain_name):
        """Ignore "<domain> ... not found" errors and defer everything else to the base class.

        :param e: the error; only its string form is inspected.
        :param domain_name: domain that was being looked up.
        :returns: ``None`` when the error is ignored, otherwise whatever the
            base-class handler returns.
        """
        message = str(e)
        if domain_name in message and message.endswith('not found'):
            return None

        # Propagate the base handler's result instead of discarding it.
        # NOTE(review): this assumes the base handler reports errors by returning
        # them, as _handle_http_error does here -- confirm against
        # dns_common_lexicon.LexiconClient.
        return super(_OVHLexiconClient, self)._handle_general_error(e, domain_name)