Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Warren <bmw@users.noreply.github.com>2018-07-16 17:54:36 +0300
committerGitHub <noreply@github.com>2018-07-16 17:54:36 +0300
commit997496388721c9ea1092d60bd6fce7fa641fac9b (patch)
tree64cbdd5ccd3424fe535e6a95b6777f9c59ca79d9
parent95e271bfcd9efcb6f42d2a712d7dcfc3a21408d6 (diff)
Handle existing ACMEv1 and ACMEv2 accounts (#6214) (#6215)
Fixes #6207. As noted by Erica: - we no longer need to check if it exists before linking to it, because we delete properly. - the previously excisting check on if server is in `LE_REUSE_SERVERS` before unlinking is nice, but probably not necessary, especially since we don't officially support people doing weird things with symlinks in our directories, and because we rmdir which will fail if it's not empty anyway. * Create single account symlink. * refactor _delete_accounts_dir_for_server_path * add symlinked account dir deletion * add tests (cherry picked from commit 9b0d2714c1404190a4e70457da732dd08bbe861e)
-rw-r--r--certbot/account.py81
-rw-r--r--certbot/tests/account_test.py20
2 files changed, 74 insertions, 27 deletions
diff --git a/certbot/account.py b/certbot/account.py
index f2ed5cfd5..59ceb42e0 100644
--- a/certbot/account.py
+++ b/certbot/account.py
@@ -1,5 +1,6 @@
"""Creates ACME accounts for server."""
import datetime
+import functools
import hashlib
import logging
import os
@@ -191,6 +192,11 @@ class AccountFileStorage(interfaces.AccountStorage):
def find_all(self):
return self._find_all_for_server_path(self.config.server_path)
+ def _symlink_to_account_dir(self, prev_server_path, server_path, account_id):
+ prev_account_dir = self._account_dir_path_for_server_path(account_id, prev_server_path)
+ new_account_dir = self._account_dir_path_for_server_path(account_id, server_path)
+ os.symlink(prev_account_dir, new_account_dir)
+
def _symlink_to_accounts_dir(self, prev_server_path, server_path):
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
if os.path.islink(accounts_dir):
@@ -207,7 +213,12 @@ class AccountFileStorage(interfaces.AccountStorage):
prev_server_path = constants.LE_REUSE_SERVERS[server_path]
prev_loaded_account = self._load_for_server_path(account_id, prev_server_path)
# we didn't error so we found something, so create a symlink to that
- self._symlink_to_accounts_dir(prev_server_path, server_path)
+ accounts_dir = self.config.accounts_dir_for_server_path(server_path)
+ # If accounts_dir isn't empty, make an account specific symlink
+ if os.listdir(accounts_dir):
+ self._symlink_to_account_dir(prev_server_path, server_path, account_id)
+ else:
+ self._symlink_to_accounts_dir(prev_server_path, server_path)
return prev_loaded_account
else:
raise errors.AccountNotFound(
@@ -250,49 +261,65 @@ class AccountFileStorage(interfaces.AccountStorage):
:param account_id: id of account which should be deleted
"""
- # Step 1: remove the account itself
account_dir_path = self._account_dir_path(account_id)
if not os.path.isdir(account_dir_path):
raise errors.AccountNotFound(
"Account at %s does not exist" % account_dir_path)
- shutil.rmtree(account_dir_path)
+ # Step 1: Delete account specific links and the directory
+ self._delete_account_dir_for_server_path(account_id, self.config.server_path)
- # Step 2: remove the directory if it's empty, and linked directories
+ # Step 2: Remove any accounts links and directories that are now empty
if not os.listdir(self.config.accounts_dir):
self._delete_accounts_dir_for_server_path(self.config.server_path)
+ def _delete_account_dir_for_server_path(self, account_id, server_path):
+ link_func = functools.partial(self._account_dir_path_for_server_path, account_id)
+ nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
+ shutil.rmtree(nonsymlinked_dir)
+
def _delete_accounts_dir_for_server_path(self, server_path):
- accounts_dir_path = self.config.accounts_dir_for_server_path(server_path)
+ link_func = self.config.accounts_dir_for_server_path
+ nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
+ os.rmdir(nonsymlinked_dir)
+
+ def _delete_links_and_find_target_dir(self, server_path, link_func):
+ """Delete symlinks and return the nonsymlinked directory path.
+
+ :param str server_path: file path based on server
+ :param callable link_func: callable that returns possible links
+ given a server_path
+
+ :returns: the final, non-symlinked target
+ :rtype: str
+
+ """
+ dir_path = link_func(server_path)
# does an appropriate directory link to me? if so, make sure that's gone
reused_servers = {}
for k in constants.LE_REUSE_SERVERS:
reused_servers[constants.LE_REUSE_SERVERS[k]] = k
- # is there a next one up? call that and be done
- if server_path in reused_servers:
- next_server_path = reused_servers[server_path]
- next_accounts_dir_path = self.config.accounts_dir_for_server_path(next_server_path)
- if os.path.islink(next_accounts_dir_path) \
- and os.readlink(next_accounts_dir_path) == accounts_dir_path:
- self._delete_accounts_dir_for_server_path(next_server_path)
- return
+ # is there a next one up?
+ possible_next_link = True
+ while possible_next_link:
+ possible_next_link = False
+ if server_path in reused_servers:
+ next_server_path = reused_servers[server_path]
+ next_dir_path = link_func(next_server_path)
+ if os.path.islink(next_dir_path) and os.readlink(next_dir_path) == dir_path:
+ possible_next_link = True
+ server_path = next_server_path
+ dir_path = next_dir_path
# if there's not a next one up to delete, then delete me
- # and whatever I link to if applicable
- if os.path.islink(accounts_dir_path):
- # save my info then delete me
- target = os.readlink(accounts_dir_path)
- os.unlink(accounts_dir_path)
- # then delete whatever I linked to, if appropriate
- if server_path in constants.LE_REUSE_SERVERS:
- prev_server_path = constants.LE_REUSE_SERVERS[server_path]
- prev_accounts_dir_path = self.config.accounts_dir_for_server_path(prev_server_path)
- if target == prev_accounts_dir_path:
- self._delete_accounts_dir_for_server_path(prev_server_path)
- else:
- # just delete me
- os.rmdir(accounts_dir_path)
+ # and whatever I link to
+ while os.path.islink(dir_path):
+ target = os.readlink(dir_path)
+ os.unlink(dir_path)
+ dir_path = target
+
+ return dir_path
def _save(self, account, acme, regr_only):
account_dir_path = self._account_dir_path(account.id)
diff --git a/certbot/tests/account_test.py b/certbot/tests/account_test.py
index e0ec3d5f8..701478336 100644
--- a/certbot/tests/account_test.py
+++ b/certbot/tests/account_test.py
@@ -249,6 +249,14 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
account = self.storage.load(self.acc.id)
self.assertEqual(prev_account, account)
+ def test_upgrade_load_single_account(self):
+ self._set_server('https://acme-staging.api.letsencrypt.org/directory')
+ self.storage.save(self.acc, self.mock_client)
+ prev_account = self.storage.load(self.acc.id)
+ self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
+ account = self.storage.load(self.acc.id)
+ self.assertEqual(prev_account, account)
+
def test_load_ioerror(self):
self.storage.save(self.acc, self.mock_client)
mock_open = mock.mock_open()
@@ -307,6 +315,18 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
self._test_delete_folders('https://acme-staging-v02.api.letsencrypt.org/directory')
self._assert_symlinked_account_removed()
+ def _set_server_and_stop_symlink(self, server_path):
+ self._set_server(server_path)
+ with open(os.path.join(self.config.accounts_dir, 'foo'), 'w') as f:
+ f.write('bar')
+
+ def test_delete_shared_account_up(self):
+ self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
+ self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
+
+ def test_delete_shared_account_down(self):
+ self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
+ self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
if __name__ == "__main__":
unittest.main() # pragma: no cover