Welcome to mirror list, hosted at ThFree Co, Russian Federation.

http_01_test.py « tests « certbot-apache - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 643a6bdd53018f8746683a07391af331548a235b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Test for certbot_apache._internal.http_01."""
import unittest

import mock

from acme import challenges
from acme.magic_typing import List  # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
from certbot.compat import filesystem
from certbot.compat import os
from certbot.tests import acme_util
from certbot_apache._internal.parser import get_aug_path
import util

NUM_ACHALLS = 3


class ApacheHttp01Test(util.ApacheTest):
    """Test for certbot_apache._internal.http_01.ApacheHttp01."""

    def setUp(self, *args, **kwargs):  # pylint: disable=arguments-differ
        super(ApacheHttp01Test, self).setUp(*args, **kwargs)

        self.account_key = self.rsa512jwk
        self.achalls = []  # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
        vh_truth = util.get_vh_truth(
            self.temp_dir, "debian_apache_2_4/multiple_vhosts")
        # Takes the vhosts for encryption-example.demo, certbot.demo
        # and vhost.in.rootconf
        self.vhosts = [vh_truth[0], vh_truth[3], vh_truth[10]]

        for i in range(NUM_ACHALLS):
            self.achalls.append(
                achallenges.KeyAuthorizationAnnotatedChallenge(
                    challb=acme_util.chall_to_challb(
                        challenges.HTTP01(token=((chr(ord('a') + i).encode() * 16))),
                        "pending"),
                    domain=self.vhosts[i].name, account_key=self.account_key))

        modules = ["ssl", "rewrite", "authz_core", "authz_host"]
        for mod in modules:
            self.config.parser.modules.add("mod_{0}.c".format(mod))
            self.config.parser.modules.add(mod + "_module")

        from certbot_apache._internal.http_01 import ApacheHttp01
        self.http = ApacheHttp01(self.config)

    def test_empty_perform(self):
        self.assertFalse(self.http.perform())

    @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.enable_mod")
    def test_enable_modules_apache_2_2(self, mock_enmod):
        self.config.version = (2, 2)
        self.config.parser.modules.remove("authz_host_module")
        self.config.parser.modules.remove("mod_authz_host.c")

        enmod_calls = self.common_enable_modules_test(mock_enmod)
        self.assertEqual(enmod_calls[0][0][0], "authz_host")

    @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.enable_mod")
    def test_enable_modules_apache_2_4(self, mock_enmod):
        self.config.parser.modules.remove("authz_core_module")
        self.config.parser.modules.remove("mod_authz_core.c")

        enmod_calls = self.common_enable_modules_test(mock_enmod)
        self.assertEqual(enmod_calls[0][0][0], "authz_core")

    def common_enable_modules_test(self, mock_enmod):
        """Tests enabling mod_rewrite and other modules."""
        self.config.parser.modules.remove("rewrite_module")
        self.config.parser.modules.remove("mod_rewrite.c")

        self.http.prepare_http01_modules()

        self.assertTrue(mock_enmod.called)
        calls = mock_enmod.call_args_list
        other_calls = []
        for call in calls:
            if call[0][0] != "rewrite":
                other_calls.append(call)

        # If these lists are equal, we never enabled mod_rewrite
        self.assertNotEqual(calls, other_calls)
        return other_calls

    def test_same_vhost(self):
        vhost = next(v for v in self.config.vhosts if v.name == "certbot.demo")
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain=vhost.name, account_key=self.account_key),
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'b' * 16))),
                    "pending"),
                domain=next(iter(vhost.aliases)), account_key=self.account_key)
        ]
        self.common_perform_test(achalls, [vhost])

    def test_anonymous_vhost(self):
        vhosts = [v for v in self.config.vhosts if not v.ssl]
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="something.nonexistent", account_key=self.account_key)]
        self.common_perform_test(achalls, vhosts)

    def test_configure_multiple_vhosts(self):
        vhosts = [v for v in self.config.vhosts if "duplicate.example.com" in v.get_names()]
        self.assertEqual(len(vhosts), 2)
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="duplicate.example.com", account_key=self.account_key)]
        self.common_perform_test(achalls, vhosts)

    def test_no_vhost(self):
        for achall in self.achalls:
            self.http.add_chall(achall)
        self.config.config.http01_port = 12345
        self.assertRaises(errors.PluginError, self.http.perform)

    def test_perform_1_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=1, minor_version=2)

    def test_perform_1_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=1, minor_version=4)

    def test_perform_2_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=2, minor_version=2)

    def test_perform_2_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=2, minor_version=4)

    def test_perform_3_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=3, minor_version=2)

    def test_perform_3_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=3, minor_version=4)

    def test_activate_disabled_vhost(self):
        vhosts = [v for v in self.config.vhosts if v.name == "certbot.demo"]
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="certbot.demo", account_key=self.account_key)]
        vhosts[0].enabled = False
        self.common_perform_test(achalls, vhosts)
        matches = self.config.parser.find_dir(
            "Include", vhosts[0].filep,
            get_aug_path(self.config.parser.loc["default"]))
        self.assertEqual(len(matches), 1)

    def combinations_perform_test(self, num_achalls, minor_version):
        """Test perform with the given achall count and Apache version."""
        achalls = self.achalls[:num_achalls]
        vhosts = self.vhosts[:num_achalls]
        self.config.version = (2, minor_version)
        self.common_perform_test(achalls, vhosts)

    def common_perform_test(self, achalls, vhosts):
        """Tests perform with the given achalls."""
        challenge_dir = self.http.challenge_dir
        self.assertFalse(os.path.exists(challenge_dir))
        for achall in achalls:
            self.http.add_chall(achall)

        expected_response = [
            achall.response(self.account_key) for achall in achalls]
        self.assertEqual(self.http.perform(), expected_response)

        self.assertTrue(os.path.isdir(self.http.challenge_dir))
        self.assertTrue(filesystem.has_min_permissions(self.http.challenge_dir, 0o755))
        self._test_challenge_conf()

        for achall in achalls:
            self._test_challenge_file(achall)

        for vhost in vhosts:
            matches = self.config.parser.find_dir("Include",
                                                self.http.challenge_conf_pre,
                                                vhost.path)
            self.assertEqual(len(matches), 1)
            matches = self.config.parser.find_dir("Include",
                                                self.http.challenge_conf_post,
                                                vhost.path)
            self.assertEqual(len(matches), 1)

        self.assertTrue(os.path.exists(challenge_dir))

    def _test_challenge_conf(self):
        with open(self.http.challenge_conf_pre) as f:
            pre_conf_contents = f.read()

        with open(self.http.challenge_conf_post) as f:
            post_conf_contents = f.read()

        self.assertTrue("RewriteEngine on" in pre_conf_contents)
        self.assertTrue("RewriteRule" in pre_conf_contents)

        self.assertTrue(self.http.challenge_dir in post_conf_contents)
        if self.config.version < (2, 4):
            self.assertTrue("Allow from all" in post_conf_contents)
        else:
            self.assertTrue("Require all granted" in post_conf_contents)

    def _test_challenge_file(self, achall):
        name = os.path.join(self.http.challenge_dir, achall.chall.encode("token"))
        validation = achall.validation(self.account_key)

        self.assertTrue(filesystem.has_min_permissions(name, 0o644))
        with open(name, 'rb') as f:
            self.assertEqual(f.read(), validation.encode())


if __name__ == "__main__":
    unittest.main()  # pragma: no cover