Welcome to mirror list, hosted at ThFree Co, Russian Federation.

misc.py « utils « certbot_integration_tests « certbot-ci - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: b08f11e89917cd621d334d734cd4cccb27ef9f82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
Misc module contains stateless functions that could be used during pytest execution,
or outside during setup/teardown of the integration tests environment.
"""
import contextlib
import errno
import multiprocessing
import os
import re
import shutil
import stat
import sys
import tempfile
import time
import warnings

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.primitives.serialization import PrivateFormat
from OpenSSL import crypto
import pkg_resources
import requests
from six.moves import SimpleHTTPServer
from six.moves import socketserver

# Key type identifiers accepted by the `key_type` parameter of generate_csr().
RSA_KEY_TYPE = 'rsa'
ECDSA_KEY_TYPE = 'ecdsa'


def check_until_timeout(url, attempts=30):
    """
    Poll the given URL once per second until it answers with HTTP status 200.
    TLS certificate verification is disabled for these requests, and the related
    insecure request warnings are silenced.
    :param str url: the URL to test
    :param int attempts: the maximum number of requests to send to the URL
    :raise ValueError: if no request received a 200 response within the allowed attempts
    """
    try:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    except ImportError:
        # Old versions of requests ship their own vendorized copy of urllib3
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    for _attempt in range(attempts):
        # The pause comes first, so even the initial request is delayed by one second.
        time.sleep(1)
        try:
            response = requests.get(url, verify=False)
        except requests.exceptions.ConnectionError:
            # Nothing is listening yet: try again on the next iteration.
            continue
        if response.status_code == 200:
            return

    message = 'Error, url did not respond after {0} attempts: {1}'.format(attempts, url)
    raise ValueError(message)


class GracefulTCPServer(socketserver.TCPServer):
    """
    TCPServer variant with address reuse enabled, so that a new instance can bind
    to an address that another TCPServer instance has only just released.
    """
    allow_reuse_address = True


def _run_server(port):
    """
    Serve the current working directory over HTTP on the given port, forever.
    This call blocks and is meant to be used as the target of a dedicated process.
    :param int port: the TCP port to listen on (on all interfaces)
    """
    handler_class = SimpleHTTPServer.SimpleHTTPRequestHandler
    server = GracefulTCPServer(('', port), handler_class)
    server.serve_forever()


@contextlib.contextmanager
def create_http_server(port):
    """
    Context manager that runs an HTTP server on the given TCP port in a separate process.
    The server serves a freshly created temporary directory. On context exit, the server
    process is terminated and the temporary directory is removed.
    :param int port: the TCP port to use
    :return str: the temporary webroot attached to this server
    """
    original_dir = os.getcwd()
    served_dir = tempfile.mkdtemp()

    server_process = multiprocessing.Process(target=_run_server, args=(port,))

    try:
        # SimpleHTTPServer serves files from the working directory in effect when it
        # starts. The process is therefore started from inside the temporary webroot,
        # and the original working directory is restored right after, whatever happens.
        try:
            os.chdir(served_dir)
            server_process.start()
        finally:
            os.chdir(original_dir)

        # Do not hand over the webroot before the server actually answers.
        check_until_timeout('http://localhost:{0}/'.format(port))

        yield served_dir
    finally:
        try:
            if server_process.is_alive():
                server_process.terminate()
                # Wait for the process to be really gone before cleaning its webroot.
                server_process.join()
        finally:
            shutil.rmtree(served_dir)


def list_renewal_hooks_dirs(config_dir):
    """
    Build the paths of the `pre`, `deploy` and `post` hook directories located under
    the `renewal-hooks` directory of the given certbot config directory.
    The paths are only computed: nothing is checked or created on the filesystem.
    :param str config_dir: path to the certbot config directory
    :return str[]: the three hook directory paths, in the order pre, deploy, post
    """
    hooks_root = os.path.join(config_dir, 'renewal-hooks')
    hook_kinds = ['pre', 'deploy', 'post']
    return [os.path.join(hooks_root, kind) for kind in hook_kinds]


def generate_test_file_hooks(config_dir, hook_probe):
    """
    Write an executable entrypoint script in each renewal hook directory of the given
    certbot configuration directory, creating these directories when needed.
    Each script invokes the `assets/hook.py` resource of certbot_integration_tests with
    the current Python interpreter, passing it the script's own path and the hook probe path.
    :param str config_dir: current certbot config directory
    :param hook_probe: path to the hook probe to test hook scripts execution
    """
    hook_path = pkg_resources.resource_filename('certbot_integration_tests', 'assets/hook.py')

    for hook_dir in list_renewal_hooks_dirs(config_dir):
        # Equivalent of bash `mkdir -p $HOOK_DIR`: an already existing directory must not
        # be an error. Python 3 offers the `exist_ok` parameter of os.makedirs for that,
        # but Python 2.7 does not, so the EEXIST error is filtered out by hand.
        # To be simplified once Python 2.7 support is dropped.
        try:
            os.makedirs(hook_dir)
        except OSError as error:
            if error.errno != errno.EEXIST:
                raise

        # Pick the script flavor for the platform: bash everywhere except on Windows.
        if os.name != 'nt':
            script_name = 'entrypoint.sh'
            script_template = '''\
#!/usr/bin/env bash
set -e
"{0}" "{1}" "{2}" "{3}"
'''
        else:
            script_name = 'entrypoint.bat'
            script_template = '''\
@echo off
"{0}" "{1}" "{2}" "{3}"
            '''

        entrypoint_script_path = os.path.join(hook_dir, script_name)
        entrypoint_script = script_template.format(
            sys.executable, hook_path, entrypoint_script_path, hook_probe)

        with open(entrypoint_script_path, 'w') as file_h:
            file_h.write(entrypoint_script)

        # Add the owner execution bit on top of the existing permissions.
        current_mode = os.stat(entrypoint_script_path).st_mode
        os.chmod(entrypoint_script_path, current_mode | stat.S_IEXEC)


@contextlib.contextmanager
def manual_http_hooks(http_server_root, http_port):
    """
    Generate suitable http-01 hooks command for test purpose in the given HTTP
    server webroot directory. These hooks command use temporary python scripts
    that are deleted upon context exit.

    The authentication script writes the value of CERTBOT_VALIDATION in a file named after
    CERTBOT_TOKEN under `.well-known/acme-challenge` in the webroot, then polls the local
    HTTP server (at most 10 times, one second apart) until this file is served.
    The cleanup script removes the `.well-known` directory from the webroot.
    :param str http_server_root: path to the HTTP server configured to serve http-01 challenges
    :param int http_port: HTTP port that the HTTP server listen on
    :return (str, str): a tuple containing the authentication hook and cleanup hook commands
    """
    # The webroot path is embedded in a string literal of the generated scripts, so
    # backslashes (Windows paths) need to be doubled to not be read as escape sequences.
    escaped_root = http_server_root.replace('\\', '\\\\')

    tempdir = tempfile.mkdtemp()
    try:
        auth_script_path = os.path.join(tempdir, 'auth.py')
        with open(auth_script_path, 'w') as file_h:
            # Note: `{{0}}` is escaped so that it stays a placeholder in the generated script.
            file_h.write('''\
#!/usr/bin/env python
import os
import requests
import time
import sys
challenge_dir = os.path.join('{0}', '.well-known', 'acme-challenge')
os.makedirs(challenge_dir)
challenge_file = os.path.join(challenge_dir, os.environ.get('CERTBOT_TOKEN'))
with open(challenge_file, 'w') as file_h:
    file_h.write(os.environ.get('CERTBOT_VALIDATION'))
url = 'http://localhost:{1}/.well-known/acme-challenge/' + os.environ.get('CERTBOT_TOKEN')
for _ in range(0, 10):
    time.sleep(1)
    try:
        if requests.get(url).status_code == 200:
            sys.exit(0)
    except requests.exceptions.ConnectionError:
        pass
raise ValueError('Error, url did not respond after 10 attempts: {{0}}'.format(url))
'''.format(escaped_root, http_port))
        os.chmod(auth_script_path, 0o755)

        cleanup_script_path = os.path.join(tempdir, 'cleanup.py')
        with open(cleanup_script_path, 'w') as file_h:
            file_h.write('''\
#!/usr/bin/env python
import os
import shutil
well_known = os.path.join('{0}', '.well-known')
shutil.rmtree(well_known)
'''.format(escaped_root))
        os.chmod(cleanup_script_path, 0o755)

        # The scripts are run through the current interpreter rather than their shebang.
        yield ('{0} {1}'.format(sys.executable, auth_script_path),
               '{0} {1}'.format(sys.executable, cleanup_script_path))
    finally:
        shutil.rmtree(tempdir)


def generate_csr(domains, key_path, csr_path, key_type=RSA_KEY_TYPE):
    """
    Generate a private key, and a CSR for the given domains using this key.
    The private key is written in PEM format, and the CSR in DER (ASN.1) format.
    The CSR carries the domains in a subjectAltName extension and is signed with SHA-256.
    :param domains: the domain names to include in the CSR
    :type domains: `list` of `str`
    :param str key_path: path to the private key that will be generated
    :param str csr_path: path to the CSR that will be generated
    :param str key_type: type of the key (misc.RSA_KEY_TYPE or misc.ECDSA_KEY_TYPE)
    :raise ValueError: if the given key type is not one of the supported types
    """
    if key_type == RSA_KEY_TYPE:
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)
    elif key_type == ECDSA_KEY_TYPE:
        with warnings.catch_warnings():
            # Ignore a warning on some old versions of cryptography
            warnings.simplefilter('ignore', category=PendingDeprecationWarning)
            ec_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
        # The key is generated with cryptography, then serialized to PEM so that
        # it can be loaded as the pyOpenSSL key object used in the rest of the function.
        ec_key_pem = ec_key.private_bytes(encoding=Encoding.PEM,
                                          format=PrivateFormat.TraditionalOpenSSL,
                                          encryption_algorithm=NoEncryption())
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, ec_key_pem)
    else:
        raise ValueError('Invalid key type: {0}'.format(key_type))

    with open(key_path, 'wb') as file_h:
        file_h.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))

    req = crypto.X509Req()
    san = ', '.join(['DNS:{0}'.format(item) for item in domains])
    san_constraint = crypto.X509Extension(b'subjectAltName', False, san.encode('utf-8'))
    req.add_extensions([san_constraint])

    req.set_pubkey(key)
    # RFC 2986 defines a single version for certification requests: v1, encoded as the
    # integer 0. Any other value makes an invalid CSR.
    req.set_version(0)
    req.sign(key, 'sha256')

    with open(csr_path, 'wb') as file_h:
        file_h.write(crypto.dump_certificate_request(crypto.FILETYPE_ASN1, req))


def read_certificate(cert_path):
    """
    Read the PEM certificate stored at the provided path, and render it as human readable text.
    :param str cert_path: the path to the certificate
    :returns: the TEXT version of the certificate, as it would be displayed by openssl binary
    """
    with open(cert_path, 'rb') as cert_file:
        pem_data = cert_file.read()

    certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
    text_dump = crypto.dump_certificate(crypto.FILETYPE_TEXT, certificate)
    return text_dump.decode('utf-8')


def load_sample_data_path(workspace):
    """
    Copy the sample certbot configuration designed for OCSP tests into the given
    workspace, under a `sample-config` directory, and return the path of this copy.
    :param str workspace: current test workspace directory path
    :returns: the path to the loaded sample data directory
    :rtype: str
    """
    source_dir = pkg_resources.resource_filename('certbot_integration_tests',
                                                 'assets/sample-config')
    target_dir = os.path.join(workspace, 'sample-config')
    shutil.copytree(source_dir, target_dir, symlinks=True)

    if os.name != 'nt':
        return target_dir

    # On Windows, GIT does not create the symlinks upon checkout, so they are rebuilt here:
    # each file of a lineage live directory (README excepted) is replaced by a symlink
    # whose target is given by the content of that file.
    for lineage in ['a.encryption-example.com', 'b.encryption-example.com']:
        live_dir = os.path.join(target_dir, 'live', lineage)
        for name in os.listdir(live_dir):
            if name == 'README':
                continue
            link_path = os.path.join(live_dir, name)
            with open(link_path) as file_h:
                link_target = file_h.read()
            os.unlink(link_path)
            os.symlink(os.path.join(live_dir, link_target), link_path)

    return target_dir


def echo(keyword, path=None):
    """
    Generate a platform independent executable command
    that echoes the given keyword into the given file.
    The command runs the current Python interpreter (referenced by its base name)
    to print the keyword, and appends the output to the file when a path is given.
    :param keyword: the keyword to echo (must be a single keyword)
    :param path: path to the file were keyword is echoed
    :return: the executable command
    :raise ValueError: if the keyword is not made of word characters only
    """
    # `\Z` is used instead of `$`, because `$` also matches just before a trailing
    # newline, which would let a keyword like 'word\n' corrupt the generated command.
    if not re.match(r'^\w+\Z', keyword):
        raise ValueError('Error, keyword `{0}` is not a single keyword.'
                         .format(keyword))
    redirection = ' >> "{0}"'.format(path) if path else ''
    return '{0} -c "from __future__ import print_function; print(\'{1}\')"{2}'.format(
        os.path.basename(sys.executable), keyword, redirection)