diff options
Diffstat (limited to 'certbot-dns-rfc2136/tests/dns_rfc2136_test.py')
-rw-r--r-- | certbot-dns-rfc2136/tests/dns_rfc2136_test.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/certbot-dns-rfc2136/tests/dns_rfc2136_test.py b/certbot-dns-rfc2136/tests/dns_rfc2136_test.py new file mode 100644 index 000000000..c767dba23 --- /dev/null +++ b/certbot-dns-rfc2136/tests/dns_rfc2136_test.py @@ -0,0 +1,212 @@ +"""Tests for certbot_dns_rfc2136._internal.dns_rfc2136.""" + +import unittest + +import dns.flags +import dns.rcode +import dns.tsig +import mock + +from certbot import errors +from certbot.compat import os +from certbot.plugins import dns_test_common +from certbot.plugins.dns_test_common import DOMAIN +from certbot.tests import util as test_util + +SERVER = '192.0.2.1' +PORT = 53 +NAME = 'a-tsig-key.' +SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK' +VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET} + + +class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): + + def setUp(self): + from certbot_dns_rfc2136._internal.dns_rfc2136 import Authenticator + + super(AuthenticatorTest, self).setUp() + + path = os.path.join(self.tempdir, 'file.ini') + dns_test_common.write(VALID_CONFIG, path) + + self.config = mock.MagicMock(rfc2136_credentials=path, + rfc2136_propagation_seconds=0) # don't wait during tests + + self.auth = Authenticator(self.config, "rfc2136") + + self.mock_client = mock.MagicMock() + # _get_rfc2136_client | pylint: disable=protected-access + self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client) + + def test_perform(self): + self.auth.perform([self.achall]) + + expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + def test_cleanup(self): + # _attempt_cleanup | pylint: disable=protected-access + self.auth._attempt_cleanup = True + self.auth.cleanup([self.achall]) + + expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + def test_invalid_algorithm_raises(self): + config = VALID_CONFIG.copy() + config["rfc2136_algorithm"] = "INVALID" + dns_test_common.write(config, self.config.rfc2136_credentials) + + self.assertRaises(errors.PluginError, + self.auth.perform, + [self.achall]) + + def test_valid_algorithm_passes(self): + config = VALID_CONFIG.copy() + config["rfc2136_algorithm"] = "HMAC-sha512" + dns_test_common.write(config, self.config.rfc2136_credentials) + + self.auth.perform([self.achall]) + + +class RFC2136ClientTest(unittest.TestCase): + + def setUp(self): + from certbot_dns_rfc2136._internal.dns_rfc2136 import _RFC2136Client + + self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5) + + @mock.patch("dns.query.tcp") + def test_add_txt_record(self, query_mock): + query_mock.return_value.rcode.return_value = dns.rcode.NOERROR + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.rfc2136_client.add_txt_record("bar", "baz", 42) + + query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0])) + + @mock.patch("dns.query.tcp") + def test_add_txt_record_wraps_errors(self, query_mock): + query_mock.side_effect = Exception + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.assertRaises( + errors.PluginError, + self.rfc2136_client.add_txt_record, + "bar", "baz", 42) + + @mock.patch("dns.query.tcp") + def test_add_txt_record_server_error(self, query_mock): + query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.assertRaises( + errors.PluginError, + self.rfc2136_client.add_txt_record, + "bar", "baz", 42) + + @mock.patch("dns.query.tcp") + def test_del_txt_record(self, query_mock): + query_mock.return_value.rcode.return_value = dns.rcode.NOERROR + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.rfc2136_client.del_txt_record("bar", "baz") + + query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0])) + + @mock.patch("dns.query.tcp") + def test_del_txt_record_wraps_errors(self, query_mock): + query_mock.side_effect = Exception + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.assertRaises( + errors.PluginError, + self.rfc2136_client.del_txt_record, + "bar", "baz") + + @mock.patch("dns.query.tcp") + def test_del_txt_record_server_error(self, query_mock): + query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") + + self.assertRaises( + errors.PluginError, + self.rfc2136_client.del_txt_record, + "bar", "baz") + + def test_find_domain(self): + # _query_soa | pylint: disable=protected-access + self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True]) + + # _find_domain | pylint: disable=protected-access + domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN) + + self.assertTrue(domain == DOMAIN) + + def test_find_domain_wraps_errors(self): + # _query_soa | pylint: disable=protected-access + self.rfc2136_client._query_soa = mock.MagicMock(return_value=False) + + self.assertRaises( + errors.PluginError, + # _find_domain | pylint: disable=protected-access + self.rfc2136_client._find_domain, + 'foo.bar.'+DOMAIN) + + @mock.patch("dns.query.tcp") + def test_query_soa_found(self, query_mock): + query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) + query_mock.return_value.rcode.return_value = dns.rcode.NOERROR + + # _query_soa | pylint: disable=protected-access + result = self.rfc2136_client._query_soa(DOMAIN) + + query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + self.assertTrue(result) + + @mock.patch("dns.query.tcp") + def test_query_soa_not_found(self, query_mock): + query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN + + # _query_soa | pylint: disable=protected-access + result = self.rfc2136_client._query_soa(DOMAIN) + + query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + self.assertFalse(result) + + @mock.patch("dns.query.tcp") + def test_query_soa_wraps_errors(self, query_mock): + query_mock.side_effect = Exception + + self.assertRaises( + errors.PluginError, + # _query_soa | pylint: disable=protected-access + self.rfc2136_client._query_soa, + DOMAIN) + + @mock.patch("dns.query.udp") + @mock.patch("dns.query.tcp") + def test_query_soa_fallback_to_udp(self, tcp_mock, udp_mock): + tcp_mock.side_effect = OSError + udp_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) + udp_mock.return_value.rcode.return_value = dns.rcode.NOERROR + + # _query_soa | pylint: disable=protected-access + result = self.rfc2136_client._query_soa(DOMAIN) + + tcp_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + udp_mock.assert_called_with(mock.ANY, SERVER, port=PORT) + self.assertTrue(result) + + +if __name__ == "__main__": + unittest.main() # pragma: no cover |