Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'certbot-dns-rfc2136/tests/dns_rfc2136_test.py')
-rw-r--r--certbot-dns-rfc2136/tests/dns_rfc2136_test.py212
1 files changed, 212 insertions, 0 deletions
diff --git a/certbot-dns-rfc2136/tests/dns_rfc2136_test.py b/certbot-dns-rfc2136/tests/dns_rfc2136_test.py
new file mode 100644
index 000000000..c767dba23
--- /dev/null
+++ b/certbot-dns-rfc2136/tests/dns_rfc2136_test.py
@@ -0,0 +1,212 @@
+"""Tests for certbot_dns_rfc2136._internal.dns_rfc2136."""
+
+import unittest
+
+import dns.flags
+import dns.rcode
+import dns.tsig
+import mock
+
+from certbot import errors
+from certbot.compat import os
+from certbot.plugins import dns_test_common
+from certbot.plugins.dns_test_common import DOMAIN
+from certbot.tests import util as test_util
+
+SERVER = '192.0.2.1'
+PORT = 53
+NAME = 'a-tsig-key.'
+SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK'
+VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET}
+
+
+class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
+
+ def setUp(self):
+ from certbot_dns_rfc2136._internal.dns_rfc2136 import Authenticator
+
+ super(AuthenticatorTest, self).setUp()
+
+ path = os.path.join(self.tempdir, 'file.ini')
+ dns_test_common.write(VALID_CONFIG, path)
+
+ self.config = mock.MagicMock(rfc2136_credentials=path,
+ rfc2136_propagation_seconds=0) # don't wait during tests
+
+ self.auth = Authenticator(self.config, "rfc2136")
+
+ self.mock_client = mock.MagicMock()
+ # _get_rfc2136_client | pylint: disable=protected-access
+ self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client)
+
+ def test_perform(self):
+ self.auth.perform([self.achall])
+
+ expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
+ self.assertEqual(expected, self.mock_client.mock_calls)
+
+ def test_cleanup(self):
+ # _attempt_cleanup | pylint: disable=protected-access
+ self.auth._attempt_cleanup = True
+ self.auth.cleanup([self.achall])
+
+ expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
+ self.assertEqual(expected, self.mock_client.mock_calls)
+
+ def test_invalid_algorithm_raises(self):
+ config = VALID_CONFIG.copy()
+ config["rfc2136_algorithm"] = "INVALID"
+ dns_test_common.write(config, self.config.rfc2136_credentials)
+
+ self.assertRaises(errors.PluginError,
+ self.auth.perform,
+ [self.achall])
+
+ def test_valid_algorithm_passes(self):
+ config = VALID_CONFIG.copy()
+ config["rfc2136_algorithm"] = "HMAC-sha512"
+ dns_test_common.write(config, self.config.rfc2136_credentials)
+
+ self.auth.perform([self.achall])
+
+
+class RFC2136ClientTest(unittest.TestCase):
+
+ def setUp(self):
+ from certbot_dns_rfc2136._internal.dns_rfc2136 import _RFC2136Client
+
+ self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5)
+
+ @mock.patch("dns.query.tcp")
+ def test_add_txt_record(self, query_mock):
+ query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.rfc2136_client.add_txt_record("bar", "baz", 42)
+
+ query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0]))
+
+ @mock.patch("dns.query.tcp")
+ def test_add_txt_record_wraps_errors(self, query_mock):
+ query_mock.side_effect = Exception
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.assertRaises(
+ errors.PluginError,
+ self.rfc2136_client.add_txt_record,
+ "bar", "baz", 42)
+
+ @mock.patch("dns.query.tcp")
+ def test_add_txt_record_server_error(self, query_mock):
+ query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.assertRaises(
+ errors.PluginError,
+ self.rfc2136_client.add_txt_record,
+ "bar", "baz", 42)
+
+ @mock.patch("dns.query.tcp")
+ def test_del_txt_record(self, query_mock):
+ query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.rfc2136_client.del_txt_record("bar", "baz")
+
+ query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0]))
+
+ @mock.patch("dns.query.tcp")
+ def test_del_txt_record_wraps_errors(self, query_mock):
+ query_mock.side_effect = Exception
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.assertRaises(
+ errors.PluginError,
+ self.rfc2136_client.del_txt_record,
+ "bar", "baz")
+
+ @mock.patch("dns.query.tcp")
+ def test_del_txt_record_server_error(self, query_mock):
+ query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
+
+ self.assertRaises(
+ errors.PluginError,
+ self.rfc2136_client.del_txt_record,
+ "bar", "baz")
+
+ def test_find_domain(self):
+ # _query_soa | pylint: disable=protected-access
+ self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True])
+
+ # _find_domain | pylint: disable=protected-access
+ domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN)
+
+ self.assertTrue(domain == DOMAIN)
+
+ def test_find_domain_wraps_errors(self):
+ # _query_soa | pylint: disable=protected-access
+ self.rfc2136_client._query_soa = mock.MagicMock(return_value=False)
+
+ self.assertRaises(
+ errors.PluginError,
+ # _find_domain | pylint: disable=protected-access
+ self.rfc2136_client._find_domain,
+ 'foo.bar.'+DOMAIN)
+
+ @mock.patch("dns.query.tcp")
+ def test_query_soa_found(self, query_mock):
+ query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA)
+ query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
+
+ # _query_soa | pylint: disable=protected-access
+ result = self.rfc2136_client._query_soa(DOMAIN)
+
+ query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ self.assertTrue(result)
+
+ @mock.patch("dns.query.tcp")
+ def test_query_soa_not_found(self, query_mock):
+ query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
+
+ # _query_soa | pylint: disable=protected-access
+ result = self.rfc2136_client._query_soa(DOMAIN)
+
+ query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ self.assertFalse(result)
+
+ @mock.patch("dns.query.tcp")
+ def test_query_soa_wraps_errors(self, query_mock):
+ query_mock.side_effect = Exception
+
+ self.assertRaises(
+ errors.PluginError,
+ # _query_soa | pylint: disable=protected-access
+ self.rfc2136_client._query_soa,
+ DOMAIN)
+
+ @mock.patch("dns.query.udp")
+ @mock.patch("dns.query.tcp")
+ def test_query_soa_fallback_to_udp(self, tcp_mock, udp_mock):
+ tcp_mock.side_effect = OSError
+ udp_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA)
+ udp_mock.return_value.rcode.return_value = dns.rcode.NOERROR
+
+ # _query_soa | pylint: disable=protected-access
+ result = self.rfc2136_client._query_soa(DOMAIN)
+
+ tcp_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ udp_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
+ self.assertTrue(result)
+
+
+if __name__ == "__main__":
+ unittest.main() # pragma: no cover