Welcome to mirror list, hosted at ThFree Co, Russian Federation.

util.py « tests « certbot - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 698962516e1bd8a03fb8832bca61a4f8867bb321 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
"""Test utilities.

.. warning:: This module is not part of the public API.

"""
import multiprocessing
import os
import pkg_resources
import shutil
import tempfile
import unittest

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
from six.moves import reload_module  # pylint: disable=import-error

from acme import jose

from certbot import constants
from certbot import interfaces
from certbot import storage
from certbot import util
from certbot import configuration

from certbot.display import util as display_util


def vector_path(*names):
    """Path to a test vector."""
    return pkg_resources.resource_filename(
        __name__, os.path.join('testdata', *names))


def load_vector(*names):
    """Load contents of a test vector."""
    # luckily, resource_string opens file in binary mode
    return pkg_resources.resource_string(
        __name__, os.path.join('testdata', *names))


def _guess_loader(filename, loader_pem, loader_der):
    _, ext = os.path.splitext(filename)
    if ext.lower() == '.pem':
        return loader_pem
    elif ext.lower() == '.der':
        return loader_der
    else:  # pragma: no cover
        raise ValueError("Loader could not be recognized based on extension")


def load_cert(*names):
    """Load certificate."""
    loader = _guess_loader(
        names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
    return OpenSSL.crypto.load_certificate(loader, load_vector(*names))


def load_comparable_cert(*names):
    """Load ComparableX509 cert."""
    return jose.ComparableX509(load_cert(*names))


def load_csr(*names):
    """Load certificate request."""
    loader = _guess_loader(
        names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
    return OpenSSL.crypto.load_certificate_request(loader, load_vector(*names))


def load_comparable_csr(*names):
    """Load ComparableX509 certificate request."""
    return jose.ComparableX509(load_csr(*names))


def load_rsa_private_key(*names):
    """Load RSA private key."""
    loader = _guess_loader(names[-1], serialization.load_pem_private_key,
                           serialization.load_der_private_key)
    return jose.ComparableRSAKey(loader(
        load_vector(*names), password=None, backend=default_backend()))


def load_pyopenssl_private_key(*names):
    """Load pyOpenSSL private key."""
    loader = _guess_loader(
        names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
    return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))


def skip_unless(condition, reason):  # pragma: no cover
    """Skip tests unless a condition holds.

    This implements the basic functionality of unittest.skipUnless
    which is only available on Python 2.7+.

    :param bool condition: If ``False``, the test will be skipped
    :param str reason: the reason for skipping the test

    :rtype: callable
    :returns: decorator that hides tests unless condition is ``True``

    """
    if hasattr(unittest, "skipUnless"):
        return unittest.skipUnless(condition, reason)
    elif condition:
        return lambda cls: cls
    else:
        return lambda cls: None


def make_lineage(config_dir, testfile):
    """Creates a lineage defined by testfile.

    This creates the archive, live, and renewal directories if
    necessary and creates a simple lineage.

    :param str config_dir: path to the configuration directory
    :param str testfile: configuration file to base the lineage on

    :returns: path to the renewal conf file for the created lineage
    :rtype: str

    """
    lineage_name = testfile[:-len('.conf')]

    conf_dir = os.path.join(
        config_dir, constants.RENEWAL_CONFIGS_DIR)
    archive_dir = os.path.join(
        config_dir, constants.ARCHIVE_DIR, lineage_name)
    live_dir = os.path.join(
        config_dir, constants.LIVE_DIR, lineage_name)

    for directory in (archive_dir, conf_dir, live_dir,):
        if not os.path.exists(directory):
            os.makedirs(directory)

    sample_archive = vector_path('sample-archive')
    for kind in os.listdir(sample_archive):
        shutil.copyfile(os.path.join(sample_archive, kind),
                        os.path.join(archive_dir, kind))

    for kind in storage.ALL_FOUR:
        os.symlink(os.path.join(archive_dir, '{0}1.pem'.format(kind)),
                   os.path.join(live_dir, '{0}.pem'.format(kind)))

    conf_path = os.path.join(config_dir, conf_dir, testfile)
    with open(vector_path(testfile)) as src:
        with open(conf_path, 'w') as dst:
            dst.writelines(
                line.replace('MAGICDIR', config_dir) for line in src)

    return conf_path


def patch_get_utility(target='zope.component.getUtility'):
    """Patch zope.component.getUtility to use a special mock IDisplay.

    The mock IDisplay works like a regular mock object, except it also
    also asserts that methods are called with valid arguments.

    :param str target: path to patch

    :returns: mock zope.component.getUtility
    :rtype: mock.MagicMock

    """
    return mock.patch(target, new_callable=_create_get_utility_mock)


class FreezableMock(object):
    """Mock object with the ability to freeze attributes.

    This class works like a regular mock.MagicMock object, except
    attributes and behavior set before the object is frozen cannot
    be changed during tests.

    If a func argument is provided to the constructor, this function
    is called first when an instance of FreezableMock is called,
    followed by the usual behavior defined by MagicMock. The return
    value of func is ignored.

    """
    def __init__(self, frozen=False, func=None, return_value=mock.sentinel.DEFAULT):
        self._frozen_set = set() if frozen else set(('freeze',))
        self._func = func
        self._mock = mock.MagicMock()
        if return_value != mock.sentinel.DEFAULT:
            self.return_value = return_value
        self._frozen = frozen

    def freeze(self):
        """Freeze object preventing further changes."""
        self._frozen = True

    def __call__(self, *args, **kwargs):
        if self._func is not None:
            self._func(*args, **kwargs)
        return self._mock(*args, **kwargs)

    def __getattribute__(self, name):
        if name == '_frozen':
            try:
                return object.__getattribute__(self, name)
            except AttributeError:
                return False
        elif name in ('return_value', 'side_effect',):
            return getattr(object.__getattribute__(self, '_mock'), name)
        elif name == '_frozen_set' or name in self._frozen_set:
            return object.__getattribute__(self, name)
        else:
            return getattr(object.__getattribute__(self, '_mock'), name)

    def __setattr__(self, name, value):
        """ Before it is frozen, attributes are set on the FreezableMock
        instance and added to the _frozen_set. Attributes in the _frozen_set
        cannot be changed after the FreezableMock is frozen. In this case,
        they are set on the underlying _mock.

        In cases of return_value and side_effect, these attributes are always
        passed through to the instance's _mock and added to the _frozen_set
        before the object is frozen.

        """
        if self._frozen:
            if name in self._frozen_set:
                raise AttributeError('Cannot change frozen attribute ' + name)
            else:
                return setattr(self._mock, name, value)

        if name != '_frozen_set':
            self._frozen_set.add(name)

        if name in ('return_value', 'side_effect'):
            return setattr(self._mock, name, value)

        else:
            return object.__setattr__(self, name, value)


def _create_get_utility_mock():
    display = FreezableMock()
    for name in interfaces.IDisplay.names():  # pylint: disable=no-member
        if name != 'notification':
            frozen_mock = FreezableMock(frozen=True, func=_assert_valid_call)
            setattr(display, name, frozen_mock)
    display.freeze()
    return FreezableMock(frozen=True, return_value=display)


def _assert_valid_call(*args, **kwargs):
    assert_args = [args[0] if args else kwargs['message']]

    assert_kwargs = {}
    assert_kwargs['default'] = kwargs.get('default', None)
    assert_kwargs['cli_flag'] = kwargs.get('cli_flag', None)
    assert_kwargs['force_interactive'] = kwargs.get('force_interactive', False)

    # pylint: disable=star-args
    display_util.assert_valid_call(*assert_args, **assert_kwargs)


class TempDirTestCase(unittest.TestCase):
    """Base test class which sets up and tears down a temporary directory"""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

class ConfigTestCase(TempDirTestCase):
    """Test class which sets up a NamespaceConfig object.

    """
    def setUp(self):
        super(ConfigTestCase, self).setUp()
        self.config = configuration.NamespaceConfig(
            mock.MagicMock(
                config_dir=os.path.join(self.tempdir, 'config'),
                work_dir=os.path.join(self.tempdir, 'work'),
                logs_dir=os.path.join(self.tempdir, 'logs'),
                server="example.com",
            )
        )

def lock_and_call(func, lock_path):
    """Grab a lock for lock_path and call func.

    :param callable func: object to call after acquiring the lock
    :param str lock_path: path to file or directory to lock

    """
    # Reload module to reset internal _LOCKS dictionary
    reload_module(util)

    # start child and wait for it to grab the lock
    cv = multiprocessing.Condition()
    cv.acquire()
    child_args = (cv, lock_path,)
    child = multiprocessing.Process(target=hold_lock, args=child_args)
    child.start()
    cv.wait()

    # call func and terminate the child
    func()
    cv.notify()
    cv.release()
    child.join()
    assert child.exitcode == 0


def hold_lock(cv, lock_path):  # pragma: no cover
    """Acquire a file lock at lock_path and wait to release it.

    :param multiprocessing.Condition cv: condition for synchronization
    :param str lock_path: path to the file lock

    """
    from certbot import lock
    if os.path.isdir(lock_path):
        my_lock = lock.lock_dir(lock_path)
    else:
        my_lock = lock.LockFile(lock_path)
    cv.acquire()
    cv.notify()
    cv.wait()
    my_lock.release()