1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
"""Tests for certbot_dns_digitalocean._internal.dns_digitalocean."""
import unittest
import digitalocean
import mock
from certbot import errors
from certbot.compat import os
from certbot.plugins import dns_test_common
from certbot.plugins.dns_test_common import DOMAIN
from certbot.tests import util as test_util
API_ERROR = digitalocean.DataReadError()
TOKEN = 'a-token'
class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
def setUp(self):
from certbot_dns_digitalocean._internal.dns_digitalocean import Authenticator
super(AuthenticatorTest, self).setUp()
path = os.path.join(self.tempdir, 'file.ini')
dns_test_common.write({"digitalocean_token": TOKEN}, path)
self.config = mock.MagicMock(digitalocean_credentials=path,
digitalocean_propagation_seconds=0) # don't wait during tests
self.auth = Authenticator(self.config, "digitalocean")
self.mock_client = mock.MagicMock()
# _get_digitalocean_client | pylint: disable=protected-access
self.auth._get_digitalocean_client = mock.MagicMock(return_value=self.mock_client)
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
# _attempt_cleanup | pylint: disable=protected-access
self.auth._attempt_cleanup = True
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
class DigitalOceanClientTest(unittest.TestCase):
id_num = 1
record_prefix = "_acme-challenge"
record_name = record_prefix + "." + DOMAIN
record_content = "bar"
def setUp(self):
from certbot_dns_digitalocean._internal.dns_digitalocean import _DigitalOceanClient
self.digitalocean_client = _DigitalOceanClient(TOKEN)
self.manager = mock.MagicMock()
self.digitalocean_client.manager = self.manager
def test_add_txt_record(self):
wrong_domain_mock = mock.MagicMock()
wrong_domain_mock.name = "other.invalid"
wrong_domain_mock.create_new_domain_record.side_effect = AssertionError('Wrong Domain')
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.create_new_domain_record.return_value = {'domain_record': {'id': self.id_num}}
self.manager.get_all_domains.return_value = [wrong_domain_mock, domain_mock]
self.digitalocean_client.add_txt_record(DOMAIN, self.record_name, self.record_content)
domain_mock.create_new_domain_record.assert_called_with(type='TXT',
name=self.record_prefix,
data=self.record_content)
def test_add_txt_record_fail_to_find_domain(self):
self.manager.get_all_domains.return_value = []
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_error_finding_domain(self):
self.manager.get_all_domains.side_effect = API_ERROR
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_error_creating_record(self):
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.create_new_domain_record.side_effect = API_ERROR
self.manager.get_all_domains.return_value = [domain_mock]
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_del_txt_record(self):
first_record_mock = mock.MagicMock()
first_record_mock.type = 'TXT'
first_record_mock.name = "DIFFERENT"
first_record_mock.data = self.record_content
correct_record_mock = mock.MagicMock()
correct_record_mock.type = 'TXT'
correct_record_mock.name = self.record_prefix
correct_record_mock.data = self.record_content
last_record_mock = mock.MagicMock()
last_record_mock.type = 'TXT'
last_record_mock.name = self.record_prefix
last_record_mock.data = "DIFFERENT"
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.get_records.return_value = [first_record_mock,
correct_record_mock,
last_record_mock]
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.assertTrue(correct_record_mock.destroy.called)
self.assertFalse(first_record_mock.destroy.call_args_list)
self.assertFalse(last_record_mock.destroy.call_args_list)
def test_del_txt_record_error_finding_domain(self):
self.manager.get_all_domains.side_effect = API_ERROR
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_error_finding_record(self):
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.get_records.side_effect = API_ERROR
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_error_deleting_record(self):
record_mock = mock.MagicMock()
record_mock.type = 'TXT'
record_mock.name = self.record_prefix
record_mock.data = self.record_content
record_mock.destroy.side_effect = API_ERROR
domain_mock = mock.MagicMock()
domain_mock.name = DOMAIN
domain_mock.get_records.return_value = [record_mock]
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
if __name__ == "__main__":
unittest.main() # pragma: no cover
|