diff options
Diffstat (limited to 'certbot-dns-google/tests/dns_google_test.py')
-rw-r--r-- | certbot-dns-google/tests/dns_google_test.py | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/certbot-dns-google/tests/dns_google_test.py b/certbot-dns-google/tests/dns_google_test.py new file mode 100644 index 000000000..647a75b05 --- /dev/null +++ b/certbot-dns-google/tests/dns_google_test.py @@ -0,0 +1,331 @@ +"""Tests for certbot_dns_google._internal.dns_google.""" + +import unittest + +from googleapiclient import discovery +from googleapiclient.errors import Error +from googleapiclient.http import HttpMock +from httplib2 import ServerNotFoundError +import mock + +from certbot import errors +from certbot.compat import os +from certbot.errors import PluginError +from certbot.plugins import dns_test_common +from certbot.plugins.dns_test_common import DOMAIN +from certbot.tests import util as test_util + +ACCOUNT_JSON_PATH = '/not/a/real/path.json' +API_ERROR = Error() +PROJECT_ID = "test-test-1" + + +class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): + + def setUp(self): + super(AuthenticatorTest, self).setUp() + + from certbot_dns_google._internal.dns_google import Authenticator + + path = os.path.join(self.tempdir, 'file.json') + open(path, "wb").close() + + super(AuthenticatorTest, self).setUp() + self.config = mock.MagicMock(google_credentials=path, + google_propagation_seconds=0) # don't wait during tests + + self.auth = Authenticator(self.config, "google") + + self.mock_client = mock.MagicMock() + # _get_google_client | pylint: disable=protected-access + self.auth._get_google_client = mock.MagicMock(return_value=self.mock_client) + + def test_perform(self): + self.auth.perform([self.achall]) + + expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + def test_cleanup(self): + # _attempt_cleanup | pylint: disable=protected-access + self.auth._attempt_cleanup = True + self.auth.cleanup([self.achall]) + + expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + @mock.patch('httplib2.Http.request', side_effect=ServerNotFoundError) + def test_without_auth(self, unused_mock): + self.config.google_credentials = None + self.assertRaises(PluginError, self.auth.perform, [self.achall]) + + +class GoogleClientTest(unittest.TestCase): + record_name = "foo" + record_content = "bar" + record_ttl = 42 + zone = "ZONE_ID" + change = "an-id" + + def _setUp_client_with_mock(self, zone_request_side_effect): + from certbot_dns_google._internal.dns_google import _GoogleClient + + pwd = os.path.dirname(__file__) + rel_path = 'testdata/discovery.json' + discovery_file = os.path.join(pwd, rel_path) + http_mock = HttpMock(discovery_file, {'status': '200'}) + dns_api = discovery.build('dns', 'v1', http=http_mock) + + client = _GoogleClient(ACCOUNT_JSON_PATH, dns_api) + + # Setup + mock_mz = mock.MagicMock() + mock_mz.list.return_value.execute.side_effect = zone_request_side_effect + + mock_rrs = mock.MagicMock() + rrsets = {"rrsets": [{"name": "_acme-challenge.example.org.", "type": "TXT", + "rrdatas": ["\"example-txt-contents\""]}]} + mock_rrs.list.return_value.execute.return_value = rrsets + mock_changes = mock.MagicMock() + + client.dns.managedZones = mock.MagicMock(return_value=mock_mz) + client.dns.changes = mock.MagicMock(return_value=mock_changes) + client.dns.resourceRecordSets = mock.MagicMock(return_value=mock_rrs) + + return client, mock_changes + + @mock.patch('googleapiclient.discovery.build') + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google._GoogleClient.get_project_id') + def test_client_without_credentials(self, get_project_id_mock, credential_mock, + unused_discovery_mock): + from certbot_dns_google._internal.dns_google import _GoogleClient + _GoogleClient(None) + self.assertFalse(credential_mock.called) + self.assertTrue(get_project_id_mock.called) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + @mock.patch('certbot_dns_google._internal.dns_google._GoogleClient.get_project_id') + def test_add_txt_record(self, get_project_id_mock, credential_mock): + client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}]) + credential_mock.assert_called_once_with('/not/a/real/path.json', mock.ANY) + self.assertFalse(get_project_id_mock.called) + + client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + expected_body = { + "kind": "dns#change", + "additions": [ + { + "kind": "dns#resourceRecordSet", + "type": "TXT", + "name": self.record_name + ".", + "rrdatas": [self.record_content, ], + "ttl": self.record_ttl, + }, + ], + } + + changes.create.assert_called_with(body=expected_body, + managedZone=self.zone, + project=PROJECT_ID) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_and_poll(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}]) + changes.create.return_value.execute.return_value = {'status': 'pending', 'id': self.change} + changes.get.return_value.execute.return_value = {'status': 'done'} + + client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + changes.create.assert_called_with(body=mock.ANY, + managedZone=self.zone, + project=PROJECT_ID) + + changes.get.assert_called_with(changeId=self.change, + managedZone=self.zone, + project=PROJECT_ID) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_delete_old(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock( + [{'managedZones': [{'id': self.zone}]}]) + # pylint: disable=line-too-long + mock_get_rrs = "certbot_dns_google._internal.dns_google._GoogleClient.get_existing_txt_rrset" + with mock.patch(mock_get_rrs) as mock_rrs: + mock_rrs.return_value = ["sample-txt-contents"] + client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self.assertTrue(changes.create.called) + self.assertTrue("sample-txt-contents" in + changes.create.call_args_list[0][1]["body"]["deletions"][0]["rrdatas"]) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_noop(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock( + [{'managedZones': [{'id': self.zone}]}]) + client.add_txt_record(DOMAIN, "_acme-challenge.example.org", + "example-txt-contents", self.record_ttl) + self.assertFalse(changes.create.called) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_error_during_zone_lookup(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock(API_ERROR) + + self.assertRaises(errors.PluginError, client.add_txt_record, + DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_zone_not_found(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock([{'managedZones': []}, + {'managedZones': []}]) + + self.assertRaises(errors.PluginError, client.add_txt_record, + DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_add_txt_record_error_during_add(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}]) + changes.create.side_effect = API_ERROR + + self.assertRaises(errors.PluginError, client.add_txt_record, + DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_del_txt_record(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}]) + + # pylint: disable=line-too-long + mock_get_rrs = "certbot_dns_google._internal.dns_google._GoogleClient.get_existing_txt_rrset" + with mock.patch(mock_get_rrs) as mock_rrs: + mock_rrs.return_value = ["\"sample-txt-contents\"", + "\"example-txt-contents\""] + client.del_txt_record(DOMAIN, "_acme-challenge.example.org", + "example-txt-contents", self.record_ttl) + + expected_body = { + "kind": "dns#change", + "deletions": [ + { + "kind": "dns#resourceRecordSet", + "type": "TXT", + "name": "_acme-challenge.example.org.", + "rrdatas": ["\"sample-txt-contents\"", "\"example-txt-contents\""], + "ttl": self.record_ttl, + }, + ], + "additions": [ + { + "kind": "dns#resourceRecordSet", + "type": "TXT", + "name": "_acme-challenge.example.org.", + "rrdatas": ["\"sample-txt-contents\"", ], + "ttl": self.record_ttl, + }, + ], + } + + changes.create.assert_called_with(body=expected_body, + managedZone=self.zone, + project=PROJECT_ID) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_del_txt_record_error_during_zone_lookup(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock(API_ERROR) + + client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_del_txt_record_zone_not_found(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock([{'managedZones': []}, + {'managedZones': []}]) + + client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_del_txt_record_error_during_delete(self, unused_credential_mock): + client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}]) + changes.create.side_effect = API_ERROR + + client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_get_existing(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock( + [{'managedZones': [{'id': self.zone}]}]) + # Record name mocked in setUp + found = client.get_existing_txt_rrset(self.zone, "_acme-challenge.example.org") + self.assertEqual(found, ["\"example-txt-contents\""]) + not_found = client.get_existing_txt_rrset(self.zone, "nonexistent.tld") + self.assertEqual(not_found, None) + + @mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name') + @mock.patch('certbot_dns_google._internal.dns_google.open', + mock.mock_open(read_data='{"project_id": "' + PROJECT_ID + '"}'), create=True) + def test_get_existing_fallback(self, unused_credential_mock): + client, unused_changes = self._setUp_client_with_mock( + [{'managedZones': [{'id': self.zone}]}]) + mock_execute = client.dns.resourceRecordSets.return_value.list.return_value.execute + mock_execute.side_effect = API_ERROR + + rrset = client.get_existing_txt_rrset(self.zone, "_acme-challenge.example.org") + self.assertFalse(rrset) + + def test_get_project_id(self): + from certbot_dns_google._internal.dns_google import _GoogleClient + + response = DummyResponse() + response.status = 200 + + with mock.patch('httplib2.Http.request', return_value=(response, 'test-test-1')): + project_id = _GoogleClient.get_project_id() + self.assertEqual(project_id, 'test-test-1') + + with mock.patch('httplib2.Http.request', return_value=(response, b'test-test-1')): + project_id = _GoogleClient.get_project_id() + self.assertEqual(project_id, 'test-test-1') + + failed_response = DummyResponse() + failed_response.status = 404 + + with mock.patch('httplib2.Http.request', + return_value=(failed_response, "some detailed http error response")): + self.assertRaises(ValueError, _GoogleClient.get_project_id) + + with mock.patch('httplib2.Http.request', side_effect=ServerNotFoundError): + self.assertRaises(ServerNotFoundError, _GoogleClient.get_project_id) + + +class DummyResponse(object): + """ + Dummy object to create a fake HTTPResponse (the actual one requires a socket and we only + need the status attribute) + """ + def __init__(self): + self.status = 200 + +if __name__ == "__main__": + unittest.main() # pragma: no cover |