Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdrien Ferrand <adferrand@users.noreply.github.com>2021-11-24 23:47:36 +0300
committerGitHub <noreply@github.com>2021-11-24 23:47:36 +0300
commit250d7b15420e96a09dbb8d615054530a42b1bf30 (patch)
tree71da9af66b09236f1c57c6d2b3814a7d7b3f040b
parent19147e1b8c3bbe6f55799f9d55c2f1027d69c0f4 (diff)
Add type annotations to the certbot package (part 3) (#9086)
* Extract from #9084 * Cast/ignore types during the transition * Fix after review * Fix lint * Update certbot/certbot/_internal/storage.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/storage.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/main.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/main.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Update certbot/certbot/_internal/auth_handler.py Co-authored-by: alexzorin <alex@zor.io> * Remove a cast usage * Fix import * Remove now useless cast * Update certbot/certbot/_internal/client.py Co-authored-by: alexzorin <alex@zor.io> Co-authored-by: alexzorin <alex@zor.io>
-rw-r--r--certbot/certbot/_internal/account.py70
-rw-r--r--certbot/certbot/_internal/auth_handler.py97
-rw-r--r--certbot/certbot/_internal/cert_manager.py86
-rw-r--r--certbot/certbot/_internal/client.py208
-rw-r--r--certbot/certbot/_internal/constants.py4
-rw-r--r--certbot/certbot/_internal/eff.py17
-rw-r--r--certbot/certbot/_internal/error_handler.py25
-rw-r--r--certbot/certbot/_internal/hooks.py31
-rw-r--r--certbot/certbot/_internal/lock.py11
-rw-r--r--certbot/certbot/_internal/log.py41
-rw-r--r--certbot/certbot/_internal/main.py180
-rw-r--r--certbot/certbot/_internal/renewal.py41
-rw-r--r--certbot/certbot/_internal/reporter.py7
-rw-r--r--certbot/certbot/_internal/snap_config.py15
-rw-r--r--certbot/certbot/_internal/storage.py160
-rw-r--r--certbot/certbot/_internal/updater.py22
16 files changed, 651 insertions, 364 deletions
diff --git a/certbot/certbot/_internal/account.py b/certbot/certbot/_internal/account.py
index 3d45b8ed6..07a869789 100644
--- a/certbot/certbot/_internal/account.py
+++ b/certbot/certbot/_internal/account.py
@@ -5,9 +5,13 @@ import hashlib
import logging
import shutil
import socket
-from typing import cast
from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import List
from typing import Mapping
+from typing import Optional
from cryptography.hazmat.primitives import serialization
import josepy as jose
@@ -17,6 +21,7 @@ import pytz
from acme import fields as acme_fields
from acme import messages
from acme.client import ClientBase # pylint: disable=unused-import
+from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot import util
@@ -53,7 +58,8 @@ class Account:
creation_host = jose.Field("creation_host")
register_to_eff = jose.Field("register_to_eff", omitempty=True)
- def __init__(self, regr, key, meta=None):
+ def __init__(self, regr: messages.RegistrationResource, key: jose.JWK,
+ meta: Optional['Meta'] = None) -> None:
self.key = key
self.regr = regr
self.meta = self.Meta(
@@ -84,16 +90,16 @@ class Account:
# account key (and thus its fingerprint) to be updated...
@property
- def slug(self):
+ def slug(self) -> str:
"""Short account identification string, useful for UI."""
return "{1}@{0} ({2})".format(pyrfc3339.generate(
self.meta.creation_dt), self.meta.creation_host, self.id[:4])
- def __repr__(self):
+ def __repr__(self) -> str:
return "<{0}({1}, {2}, {3})>".format(
self.__class__.__name__, self.regr, self.id, self.meta)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return (isinstance(other, self.__class__) and
self.key == other.key and self.regr == other.regr and
self.meta == other.meta)
@@ -102,18 +108,18 @@ class Account:
class AccountMemoryStorage(interfaces.AccountStorage):
"""In-memory account storage."""
- def __init__(self, initial_accounts=None):
+ def __init__(self, initial_accounts: Dict[str, Account] = None) -> None:
self.accounts = initial_accounts if initial_accounts is not None else {}
- def find_all(self):
+ def find_all(self) -> List[Account]:
return list(self.accounts.values())
- def save(self, account, client):
+ def save(self, account: Account, client: ClientBase) -> None:
if account.id in self.accounts:
logger.debug("Overwriting account: %s", account.id)
self.accounts[account.id] = account
- def load(self, account_id):
+ def load(self, account_id: str) -> Account:
try:
return self.accounts[account_id]
except KeyError:
@@ -136,30 +142,30 @@ class AccountFileStorage(interfaces.AccountStorage):
:ivar certbot.configuration.NamespaceConfig config: Client configuration
"""
- def __init__(self, config):
+ def __init__(self, config: configuration.NamespaceConfig) -> None:
self.config = config
util.make_or_verify_dir(config.accounts_dir, 0o700, self.config.strict_permissions)
- def _account_dir_path(self, account_id):
+ def _account_dir_path(self, account_id: str) -> str:
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
- def _account_dir_path_for_server_path(self, account_id, server_path):
+ def _account_dir_path_for_server_path(self, account_id: str, server_path: str) -> str:
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
return os.path.join(accounts_dir, account_id)
@classmethod
- def _regr_path(cls, account_dir_path):
+ def _regr_path(cls, account_dir_path: str) -> str:
return os.path.join(account_dir_path, "regr.json")
@classmethod
- def _key_path(cls, account_dir_path):
+ def _key_path(cls, account_dir_path: str) -> str:
return os.path.join(account_dir_path, "private_key.json")
@classmethod
- def _metadata_path(cls, account_dir_path):
+ def _metadata_path(cls, account_dir_path: str) -> str:
return os.path.join(account_dir_path, "meta.json")
- def _find_all_for_server_path(self, server_path):
+ def _find_all_for_server_path(self, server_path: str) -> List[Account]:
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
try:
candidates = os.listdir(accounts_dir)
@@ -186,15 +192,16 @@ class AccountFileStorage(interfaces.AccountStorage):
accounts = prev_accounts
return accounts
- def find_all(self):
+ def find_all(self) -> List[Account]:
return self._find_all_for_server_path(self.config.server_path)
- def _symlink_to_account_dir(self, prev_server_path, server_path, account_id):
+ def _symlink_to_account_dir(self, prev_server_path: str, server_path: str,
+ account_id: str) -> None:
prev_account_dir = self._account_dir_path_for_server_path(account_id, prev_server_path)
new_account_dir = self._account_dir_path_for_server_path(account_id, server_path)
os.symlink(prev_account_dir, new_account_dir)
- def _symlink_to_accounts_dir(self, prev_server_path, server_path):
+ def _symlink_to_accounts_dir(self, prev_server_path: str, server_path: str) -> None:
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
if os.path.islink(accounts_dir):
os.unlink(accounts_dir)
@@ -203,7 +210,7 @@ class AccountFileStorage(interfaces.AccountStorage):
prev_account_dir = self.config.accounts_dir_for_server_path(prev_server_path)
os.symlink(prev_account_dir, accounts_dir)
- def _load_for_server_path(self, account_id, server_path):
+ def _load_for_server_path(self, account_id: str, server_path: str) -> Account:
account_dir_path = self._account_dir_path_for_server_path(account_id, server_path)
if not os.path.isdir(account_dir_path): # isdir is also true for symlinks
if server_path in constants.LE_REUSE_SERVERS:
@@ -222,17 +229,21 @@ class AccountFileStorage(interfaces.AccountStorage):
try:
with open(self._regr_path(account_dir_path)) as regr_file:
- regr = messages.RegistrationResource.json_loads(regr_file.read())
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ regr = cast(messages.RegistrationResource,
+ messages.RegistrationResource.json_loads(regr_file.read()))
with open(self._key_path(account_dir_path)) as key_file:
- key = jose.JWK.json_loads(key_file.read())
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ key = cast(jose.JWK, jose.JWK.json_loads(key_file.read()))
with open(self._metadata_path(account_dir_path)) as metadata_file:
- meta = Account.Meta.json_loads(metadata_file.read())
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ meta = cast(Account.Meta, Account.Meta.json_loads(metadata_file.read()))
except IOError as error:
raise errors.AccountStorageError(error)
return Account(regr, key, meta)
- def load(self, account_id):
+ def load(self, account_id: str) -> Account:
return self._load_for_server_path(account_id, self.config.server_path)
def save(self, account: Account, client: ClientBase) -> None:
@@ -275,7 +286,7 @@ class AccountFileStorage(interfaces.AccountStorage):
except IOError as error:
raise errors.AccountStorageError(error)
- def delete(self, account_id):
+ def delete(self, account_id: str) -> None:
"""Delete registration info from disk
:param account_id: id of account which should be deleted
@@ -292,17 +303,18 @@ class AccountFileStorage(interfaces.AccountStorage):
if not os.listdir(self.config.accounts_dir):
self._delete_accounts_dir_for_server_path(self.config.server_path)
- def _delete_account_dir_for_server_path(self, account_id, server_path):
+ def _delete_account_dir_for_server_path(self, account_id: str, server_path: str) -> None:
link_func = functools.partial(self._account_dir_path_for_server_path, account_id)
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
shutil.rmtree(nonsymlinked_dir)
- def _delete_accounts_dir_for_server_path(self, server_path):
+ def _delete_accounts_dir_for_server_path(self, server_path: str) -> None:
link_func = self.config.accounts_dir_for_server_path
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
os.rmdir(nonsymlinked_dir)
- def _delete_links_and_find_target_dir(self, server_path, link_func):
+ def _delete_links_and_find_target_dir(self, server_path: str,
+ link_func: Callable[[str], str]) -> str:
"""Delete symlinks and return the nonsymlinked directory path.
:param str server_path: file path based on server
@@ -368,6 +380,6 @@ class AccountFileStorage(interfaces.AccountStorage):
uri=regr.uri)
regr_file.write(regr.json_dumps())
- def _update_meta(self, account, dir_path):
+ def _update_meta(self, account: Account, dir_path: str) -> None:
with open(self._metadata_path(dir_path), "w") as metadata_file:
metadata_file.write(account.meta.json_dumps())
diff --git a/certbot/certbot/_internal/auth_handler.py b/certbot/certbot/_internal/auth_handler.py
index f9f6b96c9..0fa1daf8e 100644
--- a/certbot/certbot/_internal/auth_handler.py
+++ b/certbot/certbot/_internal/auth_handler.py
@@ -3,15 +3,25 @@ import datetime
import logging
import time
from typing import Dict
+from typing import Iterable
from typing import List
+from typing import Optional
from typing import Tuple
+from typing import Type
+
+import josepy
+from requests.models import Response
from acme import challenges
+from acme import client
from acme import errors as acme_errors
from acme import messages
from certbot import achallenges
+from certbot import configuration
from certbot import errors
+from certbot import interfaces
from certbot._internal import error_handler
+from certbot._internal.account import Account
from certbot.display import util as display_util
from certbot.plugins import common as plugin_common
@@ -34,14 +44,17 @@ class AuthHandler:
type strings with the most preferred challenge listed first
"""
- def __init__(self, auth, acme_client, account, pref_challs):
+ def __init__(self, auth: interfaces.Authenticator, acme_client: Optional[client.ClientV2],
+ account: Optional[Account], pref_challs: List[str]) -> None:
self.auth = auth
self.acme = acme_client
self.account = account
self.pref_challs = pref_challs
- def handle_authorizations(self, orderr, config, best_effort=False, max_retries=30):
+ def handle_authorizations(self, orderr: messages.OrderResource,
+ config: configuration.NamespaceConfig, best_effort: bool = False,
+ max_retries: int = 30) -> List[messages.AuthorizationResource]:
"""
Retrieve all authorizations, perform all challenges required to validate
these authorizations, then poll and wait for the authorization to be checked.
@@ -57,6 +70,8 @@ class AuthHandler:
authzrs = orderr.authorizations[:]
if not authzrs:
raise errors.AuthorizationError('No authorization to handle.')
+ if not self.acme:
+ raise errors.Error("No ACME client defined, authorizations cannot be handled.")
# Retrieve challenges that need to be performed to validate authorizations.
achalls = self._choose_challenges(authzrs)
@@ -97,6 +112,8 @@ class AuthHandler:
return authzrs_validated
+ raise errors.Error("An unexpected error occurred while handling the authorizations.")
+
def deactivate_valid_authorizations(self, orderr: messages.OrderResource) -> Tuple[List, List]:
"""
Deactivate all `valid` authorizations in the order, so that they cannot be re-used
@@ -106,6 +123,9 @@ class AuthHandler:
list of unsuccessfully deactivated authorizations.
:rtype: tuple
"""
+ if not self.acme:
+ raise errors.Error("No ACME client defined, cannot deactivate valid authorizations.")
+
to_deactivate = [authzr for authzr in orderr.authorizations
if authzr.body.status == messages.STATUS_VALID]
deactivated = []
@@ -121,17 +141,22 @@ class AuthHandler:
return (deactivated, failed)
- def _poll_authorizations(self, authzrs, max_retries, best_effort):
+ def _poll_authorizations(self, authzrs: List[messages.AuthorizationResource], max_retries: int,
+ best_effort: bool) -> None:
"""
Poll the ACME CA server, to wait for confirmation that authorizations have their challenges
all verified. The poll may occur several times, until all authorizations are checked
(valid or invalid), or after a maximum of retries.
"""
- authzrs_to_check = {index: (authzr, None)
+ if not self.acme:
+ raise errors.Error("No ACME client defined, cannot poll authorizations.")
+
+ authzrs_to_check: Dict[int, Tuple[messages.AuthorizationResource,
+ Optional[Response]]] = {index: (authzr, None)
for index, authzr in enumerate(authzrs)}
authzrs_failed_to_report = []
# Give an initial second to the ACME CA server to check the authorizations
- sleep_seconds = 1
+ sleep_seconds: float = 1
for _ in range(max_retries):
# Wait for appropriate time (from Retry-After, initial wait, or no wait)
if sleep_seconds > 0:
@@ -166,8 +191,10 @@ class AuthHandler:
# and wait this time before polling again in next loop iteration.
# From all the pending authorizations, we take the greatest Retry-After value
# to avoid polling an authorization before its relevant Retry-After value.
+ # (by construction resp cannot be None at that time, but mypy do not know it).
retry_after = max(self.acme.retry_after(resp, 3)
- for _, resp in authzrs_to_check.values())
+ for _, resp in authzrs_to_check.values()
+ if resp is not None)
sleep_seconds = (retry_after - datetime.datetime.now()).total_seconds()
# In case of failed authzrs, create a report to the user.
@@ -181,12 +208,16 @@ class AuthHandler:
# Here authzrs_to_check is still not empty, meaning we exceeded the max polling attempt.
raise errors.AuthorizationError('All authorizations were not finalized by the CA.')
- def _choose_challenges(self, authzrs):
+ def _choose_challenges(self, authzrs: Iterable[messages.AuthorizationResource]
+ ) -> List[achallenges.AnnotatedChallenge]:
"""
Retrieve necessary and pending challenges to satisfy server.
NB: Necessary and already validated challenges are not retrieved,
as they can be reused for a certificate issuance.
"""
+ if not self.acme:
+ raise errors.Error("No ACME client defined, cannot choose the challenges.")
+
pending_authzrs = [authzr for authzr in authzrs
if authzr.body.status != messages.STATUS_VALID]
achalls: List[achallenges.AnnotatedChallenge] = []
@@ -208,7 +239,7 @@ class AuthHandler:
return achalls
- def _get_chall_pref(self, domain):
+ def _get_chall_pref(self, domain: str) -> List[Type[challenges.Challenge]]:
"""Return list of challenge preferences.
:param str domain: domain for which you are requesting preferences
@@ -230,7 +261,7 @@ class AuthHandler:
chall_prefs.extend(plugin_pref)
return chall_prefs
- def _cleanup_challenges(self, achalls):
+ def _cleanup_challenges(self, achalls: List[achallenges.AnnotatedChallenge]) -> None:
"""Cleanup challenges.
:param achalls: annotated challenges to cleanup
@@ -240,7 +271,8 @@ class AuthHandler:
logger.info("Cleaning up challenges")
self.auth.cleanup(achalls)
- def _challenge_factory(self, authzr, path):
+ def _challenge_factory(self, authzr: messages.AuthorizationResource,
+ path: List[int]) -> List[achallenges.AnnotatedChallenge]:
"""Construct Namedtuple Challenges
:param messages.AuthorizationResource authzr: authorization
@@ -248,12 +280,14 @@ class AuthHandler:
:param list path: List of indices from `challenges`.
:returns: achalls, list of challenge type
- :class:`certbot.achallenges.Indexed`
+ :class:`certbot.achallenges.AnnotatedChallenge`
:rtype: list
:raises .errors.Error: if challenge type is not recognized
"""
+ if not self.account:
+ raise errors.Error("Account is not set.")
achalls = []
for index in path:
@@ -265,6 +299,8 @@ class AuthHandler:
def _report_failed_authzrs(self, failed_authzrs: List[messages.AuthorizationResource]) -> None:
"""Notifies the user about failed authorizations."""
+ if not self.account:
+ raise errors.Error("Account is not set.")
problems: Dict[str, List[achallenges.AnnotatedChallenge]] = {}
failed_achalls = [challb_to_achall(challb, self.account.key, authzr.body.identifier.value)
for authzr in failed_authzrs for challb in authzr.body.challenges
@@ -273,8 +309,9 @@ class AuthHandler:
for achall in failed_achalls:
problems.setdefault(achall.error.typ, []).append(achall)
- msg = [f"\nCertbot failed to authenticate some domains (authenticator: {self.auth.name})."
- " The Certificate Authority reported these problems:"]
+ msg = ["\nCertbot failed to authenticate some domains "
+ f"(authenticator: {self.auth.name})."
+ " The Certificate Authority reported these problems:"]
for _, achalls in sorted(problems.items(), key=lambda item: item[0]):
msg.append(_generate_failed_chall_msg(achalls))
@@ -287,7 +324,8 @@ class AuthHandler:
display_util.notify("".join(msg))
-def challb_to_achall(challb, account_key, domain):
+def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
+ domain: str) -> achallenges.AnnotatedChallenge:
"""Converts a ChallengeBody object to an AnnotatedChallenge.
:param .ChallengeBody challb: ChallengeBody
@@ -310,7 +348,9 @@ def challb_to_achall(challb, account_key, domain):
"Received unsupported challenge of type: {0}".format(chall.typ))
-def gen_challenge_path(challbs, preferences, combinations):
+def gen_challenge_path(challbs: List[messages.ChallengeBody],
+ preferences: List[Type[challenges.Challenge]],
+ combinations: Tuple[List[int], ...]) -> List[int]:
"""Generate a plan to get authority over the identity.
.. todo:: This can be possibly be rewritten to use resolved_combinations.
@@ -328,8 +368,8 @@ def gen_challenge_path(challbs, preferences, combinations):
:class:`acme.messages.Challenge`, each of which would
be sufficient to prove possession of the identifier.
- :returns: tuple of indices from ``challenges``.
- :rtype: tuple
+ :returns: list of indices from ``challenges``.
+ :rtype: list
:raises certbot.errors.AuthorizationError: If a
path cannot be created that satisfies the CA given the preferences and
@@ -341,7 +381,10 @@ def gen_challenge_path(challbs, preferences, combinations):
return _find_dumb_path(challbs, preferences)
-def _find_smart_path(challbs, preferences, combinations):
+def _find_smart_path(challbs: List[messages.ChallengeBody],
+ preferences: List[Type[challenges.Challenge]],
+ combinations: Tuple[List[int], ...]
+ ) -> List[int]:
"""Find challenge path with server hints.
Can be called if combinations is included. Function uses a simple
@@ -356,7 +399,7 @@ def _find_smart_path(challbs, preferences, combinations):
# max_cost is now equal to sum(indices) + 1
- best_combo = None
+ best_combo: Optional[List[int]] = None
# Set above completing all of the available challenges
best_combo_cost = max_cost
@@ -373,12 +416,13 @@ def _find_smart_path(challbs, preferences, combinations):
combo_total = 0
if not best_combo:
- _report_no_chall_path(challbs)
+ raise _report_no_chall_path(challbs)
return best_combo
-def _find_dumb_path(challbs, preferences):
+def _find_dumb_path(challbs: List[messages.ChallengeBody],
+ preferences: List[Type[challenges.Challenge]]) -> List[int]:
"""Find challenge path without server hints.
Should be called if the combinations hint is not included by the
@@ -394,16 +438,19 @@ def _find_dumb_path(challbs, preferences):
if supported:
path.append(i)
else:
- _report_no_chall_path(challbs)
+ raise _report_no_chall_path(challbs)
return path
-def _report_no_chall_path(challbs):
- """Logs and raises an error that no satisfiable chall path exists.
+def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.AuthorizationError:
+ """Logs and return a raisable error reporting that no satisfiable chall path exists.
:param challbs: challenges from the authorization that can't be satisfied
+ :returns: An authorization error
+ :rtype: certbot.errors.AuthorizationError
+
"""
msg = ("Client with the currently selected authenticator does not support "
"any combination of challenges that will satisfy the CA.")
@@ -412,7 +459,7 @@ def _report_no_chall_path(challbs):
" You may need to use an authenticator "
"plugin that can do challenges over DNS.")
logger.critical(msg)
- raise errors.AuthorizationError(msg)
+ return errors.AuthorizationError(msg)
def _generate_failed_chall_msg(failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
diff --git a/certbot/certbot/_internal/cert_manager.py b/certbot/certbot/_internal/cert_manager.py
index ab66e57c7..7eeddfa1a 100644
--- a/certbot/certbot/_internal/cert_manager.py
+++ b/certbot/certbot/_internal/cert_manager.py
@@ -3,10 +3,18 @@ import datetime
import logging
import re
import traceback
+from typing import Any
+from typing import Callable
+from typing import Iterable
from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TypeVar
+from typing import Union
import pytz
+from certbot import configuration
from certbot import crypto_util
from certbot import errors
from certbot import ocsp
@@ -22,7 +30,7 @@ logger = logging.getLogger(__name__)
###################
-def update_live_symlinks(config):
+def update_live_symlinks(config: configuration.NamespaceConfig) -> None:
"""Update the certificate file family symlinks to use archive_dir.
Use the information in the config file to make symlinks point to
@@ -38,7 +46,7 @@ def update_live_symlinks(config):
storage.RenewableCert(renewal_file, config, update_symlinks=True)
-def rename_lineage(config):
+def rename_lineage(config: configuration.NamespaceConfig) -> None:
"""Rename the specified lineage to the new name.
:param config: Configuration.
@@ -64,7 +72,7 @@ def rename_lineage(config):
.format(certname, new_certname), pause=False)
-def certificates(config):
+def certificates(config: configuration.NamespaceConfig) -> None:
"""Display information about certs configured with Certbot
:param config: Configuration.
@@ -87,7 +95,7 @@ def certificates(config):
_describe_certs(config, parsed_certs, parse_failures)
-def delete(config):
+def delete(config: configuration.NamespaceConfig) -> None:
"""Delete Certbot files associated with a certificate lineage."""
certnames = get_certnames(config, "delete", allow_multiple=True)
msg = ["The following certificate(s) are selected for deletion:\n"]
@@ -112,7 +120,9 @@ def delete(config):
# Public Helpers
###################
-def lineage_for_certname(cli_config, certname):
+
+def lineage_for_certname(cli_config: configuration.NamespaceConfig,
+ certname: str) -> Optional[storage.RenewableCert]:
"""Find a lineage object with name certname."""
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
@@ -129,13 +139,16 @@ def lineage_for_certname(cli_config, certname):
return None
-def domains_for_certname(config, certname):
+def domains_for_certname(config: configuration.NamespaceConfig,
+ certname: str) -> Optional[List[str]]:
"""Find the domains in the cert with name certname."""
lineage = lineage_for_certname(config, certname)
return lineage.names() if lineage else None
-def find_duplicative_certs(config, domains):
+def find_duplicative_certs(config: configuration.NamespaceConfig,
+ domains: List[str]) -> Tuple[Optional[storage.RenewableCert],
+ Optional[storage.RenewableCert]]:
"""Find existing certs that match the given domain names.
This function searches for certificates whose domains are equal to
@@ -158,7 +171,11 @@ def find_duplicative_certs(config, domains):
:rtype: `tuple` of `storage.RenewableCert` or `None`
"""
- def update_certs_for_domain_matches(candidate_lineage, rv):
+ def update_certs_for_domain_matches(candidate_lineage: storage.RenewableCert,
+ rv: Tuple[Optional[storage.RenewableCert],
+ Optional[storage.RenewableCert]]
+ ) -> Tuple[Optional[storage.RenewableCert],
+ Optional[storage.RenewableCert]]:
"""Return cert as identical_names_cert if it matches,
or subset_names_cert if it matches as subset
"""
@@ -177,10 +194,12 @@ def find_duplicative_certs(config, domains):
subset_names_cert = candidate_lineage
return (identical_names_cert, subset_names_cert)
- return _search_lineages(config, update_certs_for_domain_matches, (None, None))
+ init: Tuple[Optional[storage.RenewableCert], Optional[storage.RenewableCert]] = (None, None)
+
+ return _search_lineages(config, update_certs_for_domain_matches, init)
-def _archive_files(candidate_lineage, filetype):
+def _archive_files(candidate_lineage: storage.RenewableCert, filetype: str) -> Optional[List[str]]:
""" In order to match things like:
/etc/letsencrypt/archive/example.com/chain1.pem.
@@ -202,7 +221,8 @@ def _archive_files(candidate_lineage, filetype):
return None
-def _acceptable_matches():
+def _acceptable_matches() -> List[Union[Callable[[storage.RenewableCert], str],
+ Callable[[storage.RenewableCert], Optional[List[str]]]]]:
""" Generates the list that's passed to match_and_check_overlaps. Is its own function to
make unit testing easier.
@@ -213,7 +233,7 @@ def _acceptable_matches():
lambda x: _archive_files(x, "cert"), lambda x: _archive_files(x, "fullchain")]
-def cert_path_to_lineage(cli_config):
+def cert_path_to_lineage(cli_config: configuration.NamespaceConfig) -> str:
""" If config.cert_path is defined, try to find an appropriate value for config.certname.
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
@@ -226,11 +246,16 @@ def cert_path_to_lineage(cli_config):
"""
acceptable_matches = _acceptable_matches()
match = match_and_check_overlaps(cli_config, acceptable_matches,
- lambda x: cli_config.cert_path, lambda x: x.lineagename)
+ lambda x: cli_config.cert_path, lambda x: x.lineagename)
return match[0]
-def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func):
+def match_and_check_overlaps(cli_config: configuration.NamespaceConfig,
+ acceptable_matches: Iterable[Union[
+ Callable[[storage.RenewableCert], str],
+ Callable[[storage.RenewableCert], Optional[List[str]]]]],
+ match_func: Callable[[storage.RenewableCert], str],
+ rv_func: Callable[[storage.RenewableCert], str]) -> List[str]:
""" Searches through all lineages for a match, and checks for duplicates.
If a duplicate is found, an error is raised, as performing operations on lineages
that have their properties incorrectly duplicated elsewhere is probably a bad idea.
@@ -241,21 +266,24 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
:param function rv_func: specifies what to return
"""
- def find_matches(candidate_lineage, return_value, acceptable_matches):
+ def find_matches(candidate_lineage: storage.RenewableCert, return_value: List[str],
+ acceptable_matches: Iterable[Union[
+ Callable[[storage.RenewableCert], str],
+ Callable[[storage.RenewableCert], Optional[List[str]]]]]) -> List[str]:
"""Returns a list of matches using _search_lineages."""
- acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
+ acceptable_matches_resolved = [func(candidate_lineage) for func in acceptable_matches]
acceptable_matches_rv: List[str] = []
- for item in acceptable_matches:
+ for item in acceptable_matches_resolved:
if isinstance(item, list):
acceptable_matches_rv += item
- else:
+ elif item:
acceptable_matches_rv.append(item)
match = match_func(candidate_lineage)
if match in acceptable_matches_rv:
return_value.append(rv_func(candidate_lineage))
return return_value
- matched = _search_lineages(cli_config, find_matches, [], acceptable_matches)
+ matched: List[str] = _search_lineages(cli_config, find_matches, [], acceptable_matches)
if not matched:
raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path))
elif len(matched) > 1:
@@ -263,7 +291,8 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
return matched
-def human_readable_cert_info(config, cert, skip_filter_checks=False):
+def human_readable_cert_info(config: configuration.NamespaceConfig, cert: storage.RenewableCert,
+ skip_filter_checks: bool = False) -> Optional[str]:
""" Returns a human readable description of info about a RenewableCert object"""
certinfo = []
checker = ocsp.RevocationChecker()
@@ -312,7 +341,8 @@ def human_readable_cert_info(config, cert, skip_filter_checks=False):
return "".join(certinfo)
-def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
+def get_certnames(config: configuration.NamespaceConfig, verb: str, allow_multiple: bool = False,
+ custom_prompt: Optional[str] = None) -> List[str]:
"""Get certname from flag, interactively, or error out."""
certname = config.certname
if certname:
@@ -350,12 +380,13 @@ def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
###################
-def _report_lines(msgs):
+def _report_lines(msgs: Iterable[str]) -> str:
"""Format a results report for a category of single-line renewal outcomes"""
return " " + "\n ".join(str(msg) for msg in msgs)
-def _report_human_readable(config, parsed_certs):
+def _report_human_readable(config: configuration.NamespaceConfig,
+ parsed_certs: Iterable[storage.RenewableCert]) -> str:
"""Format a results report for a parsed cert"""
certinfo = []
for cert in parsed_certs:
@@ -365,7 +396,9 @@ def _report_human_readable(config, parsed_certs):
return "\n".join(certinfo)
-def _describe_certs(config, parsed_certs, parse_failures):
+def _describe_certs(config: configuration.NamespaceConfig,
+ parsed_certs: Iterable[storage.RenewableCert],
+ parse_failures: Iterable[str]) -> None:
"""Print information about the certs we know about"""
out: List[str] = []
@@ -386,7 +419,10 @@ def _describe_certs(config, parsed_certs, parse_failures):
display_util.notification("\n".join(out), pause=False, wrap=False)
-def _search_lineages(cli_config, func, initial_rv, *args):
+T = TypeVar('T')
+
+def _search_lineages(cli_config: configuration.NamespaceConfig, func: Callable[..., T],
+ initial_rv: T, *args: Any) -> T:
"""Iterate func over unbroken lineages, allowing custom return conditions.
Allows flexible customization of return values, including multiple
diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py
index 593ba460f..f5793c895 100644
--- a/certbot/certbot/_internal/client.py
+++ b/certbot/certbot/_internal/client.py
@@ -2,11 +2,15 @@
import datetime
import logging
import platform
-from typing import cast
from typing import Any
+from typing import Callable
+from typing import cast
from typing import Dict
+from typing import IO
from typing import List
from typing import Optional
+from typing import Set
+from typing import Tuple
from typing import Union
import warnings
@@ -20,8 +24,10 @@ from acme import crypto_util as acme_crypto_util
from acme import errors as acme_errors
from acme import messages
import certbot
+from certbot import configuration
from certbot import crypto_util
from certbot import errors
+from certbot import interfaces
from certbot import util
from certbot._internal import account
from certbot._internal import auth_handler
@@ -30,15 +36,20 @@ from certbot._internal import constants
from certbot._internal import eff
from certbot._internal import error_handler
from certbot._internal import storage
+from certbot._internal.plugins import disco as plugin_disco
from certbot._internal.plugins import selection as plugin_selection
from certbot.compat import os
from certbot.display import ops as display_ops
from certbot.display import util as display_util
+from certbot.interfaces import AccountStorage
logger = logging.getLogger(__name__)
-def acme_from_config_key(config, key, regr=None):
- "Wrangle ACME client construction"
+
+def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK,
+ regr: Optional[messages.RegistrationResource] = None
+ ) -> acme_client.ClientV2:
+ """Wrangle ACME client construction"""
# TODO: Allow for other alg types besides RS256
net = acme_client.ClientNetwork(key, account=regr, verify_ssl=(not config.no_verify_ssl),
user_agent=determine_user_agent(config))
@@ -52,10 +63,10 @@ def acme_from_config_key(config, key, regr=None):
"Certbot is configured to use an ACMEv1 server (%s). ACMEv1 support is deprecated"
" and will soon be removed. See https://community.letsencrypt.org/t/143839 for "
"more information.", config.server)
- return client
+ return cast(acme_client.ClientV2, client)
-def determine_user_agent(config):
+def determine_user_agent(config: configuration.NamespaceConfig) -> str:
"""
Set a user_agent string in the config based on the choice of plugins.
(this wasn't knowable at construction time)
@@ -86,8 +97,9 @@ def determine_user_agent(config):
ua = config.user_agent
return ua
-def ua_flags(config):
- "Turn some very important CLI flags into clues in the user agent."
+
+def ua_flags(config: configuration.NamespaceConfig) -> str:
+ """Turn some very important CLI flags into clues in the user agent."""
if isinstance(config, DummyConfig):
return "FLAGS"
flags = []
@@ -105,25 +117,30 @@ def ua_flags(config):
flags.append("hook")
return " ".join(flags)
+
class DummyConfig:
- "Shim for computing a sample user agent."
- def __init__(self):
+ """Shim for computing a sample user agent."""
+ def __init__(self) -> None:
self.authenticator = "XXX"
self.installer = "YYY"
self.user_agent = None
self.verb = "SUBCOMMAND"
- def __getattr__(self, name):
- "Any config properties we might have are None."
+ def __getattr__(self, name: str) -> Any:
+ """Any config properties we might have are None."""
return None
-def sample_user_agent():
- "Document what this Certbot's user agent string will be like."
- return determine_user_agent(DummyConfig())
+def sample_user_agent() -> str:
+ """Document what this Certbot's user agent string will be like."""
+ # DummyConfig is designed to mock certbot.configuration.NamespaceConfig.
+ # Let mypy accept that.
+ return determine_user_agent(cast(configuration.NamespaceConfig, DummyConfig()))
-def register(config, account_storage, tos_cb=None):
+def register(config: configuration.NamespaceConfig, account_storage: AccountStorage,
+ tos_cb: Optional[Callable[[str], None]] = None
+ ) -> Tuple[account.Account, acme_client.ClientV2]:
"""Register new account with an ACME CA.
This function takes care of generating fresh private key,
@@ -143,13 +160,11 @@ def register(config, account_storage, tos_cb=None):
Service before registering account, client action is
necessary. For example, a CLI tool would prompt the user
acceptance. `tos_cb` must be a callable that should accept
- `.RegistrationResource` and return a `bool`: ``True`` iff the
- Terms of Service present in the contained
- `.Registration.terms_of_service` is accepted by the client, and
- ``False`` otherwise. ``tos_cb`` will be called only if the
- client action is necessary, i.e. when ``terms_of_service is not
- None``. This argument is optional, if not supplied it will
- default to automatic acceptance!
+ a Term of Service URL as a string, and raise an exception
+ if the TOS is not accepted by the client. ``tos_cb`` will be
+ called only if the client action is necessary, i.e. when
+ ``terms_of_service is not None``. This argument is optional,
+ if not supplied it will default to automatic acceptance!
:raises certbot.errors.Error: In case of any client problems, in
particular registration failure, or unaccepted Terms of Service.
@@ -194,12 +209,13 @@ def register(config, account_storage, tos_cb=None):
return acc, acme
-def perform_registration(acme, config, tos_cb):
+def perform_registration(acme: acme_client.ClientV2, config: configuration.NamespaceConfig,
+ tos_cb: Optional[Callable[[str], None]]) -> messages.RegistrationResource:
"""
Actually register new account, trying repeatedly if there are email
problems
- :param acme.client.Client client: ACME client object.
+ :param acme.client.Client acme: ACME client object.
:param certbot.configuration.NamespaceConfig config: Client configuration.
:param Callable tos_cb: a callback to handle Term of Service agreement.
@@ -210,11 +226,11 @@ def perform_registration(acme, config, tos_cb):
eab_credentials_supplied = config.eab_kid and config.eab_hmac_key
eab: Optional[Dict[str, Any]]
if eab_credentials_supplied:
- account_public_key = acme.client.net.key.public_key()
+ account_public_key = acme.net.key.public_key()
eab = messages.ExternalAccountBinding.from_data(account_public_key=account_public_key,
kid=config.eab_kid,
hmac_key=config.eab_hmac_key,
- directory=acme.client.directory)
+ directory=acme.directory)
else:
eab = None
@@ -229,7 +245,15 @@ def perform_registration(acme, config, tos_cb):
newreg = messages.NewRegistration.from_data(
email=config.email,
external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab))
- return acme.new_account_and_tos(newreg, tos_cb)
+ # Until ACME v1 support is removed from Certbot, we actually need the provided
+ # ACME client to be a wrapper of type BackwardsCompatibleClientV2.
+ # TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2
+ try:
+ return cast(acme_client.BackwardsCompatibleClientV2,
+ acme).new_account_and_tos(newreg, tos_cb)
+ except AttributeError:
+ raise errors.Error("The ACME client must be an instance of "
+ "acme.client.BackwardsCompatibleClientV2")
except messages.Error as e:
if e.code in ('invalidEmail', 'invalidContact'):
if config.noninteractive_mode:
@@ -258,7 +282,10 @@ class Client:
"""
- def __init__(self, config, account_, auth, installer, acme=None):
+ def __init__(self, config: configuration.NamespaceConfig, account_: Optional[account.Account],
+ auth: Optional[interfaces.Authenticator],
+ installer: Optional[interfaces.Installer],
+ acme: Optional[acme_client.ClientV2] = None) -> None:
"""Initialize a client."""
self.config = config
self.account = account_
@@ -277,7 +304,9 @@ class Client:
else:
self.auth_handler = None
- def obtain_certificate_from_csr(self, csr, orderr=None):
+ def obtain_certificate_from_csr(self, csr: util.CSR,
+ orderr: Optional[messages.OrderResource] = None
+ ) -> Tuple[bytes, bytes]:
"""Obtain certificate.
:param .util.CSR csr: PEM-encoded Certificate Signing
@@ -294,8 +323,10 @@ class Client:
"not set.")
logger.error(msg)
raise errors.Error(msg)
- if self.account.regr is None:
+ if self.account is None or self.account.regr is None:
raise errors.Error("Please register with the ACME server first.")
+ if self.acme is None:
+ raise errors.Error("ACME client is not set.")
logger.debug("CSR: %s", csr)
@@ -303,19 +334,18 @@ class Client:
orderr = self._get_order_and_authorizations(csr.data, best_effort=False)
deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
- get_alt_chains = self.config.preferred_chain is not None
- orderr = self.acme.finalize_order(orderr, deadline,
- fetch_alternative_chains=get_alt_chains)
+ orderr = self.acme.finalize_order(
+ orderr, deadline, fetch_alternative_chains=self.config.preferred_chain is not None)
fullchain = orderr.fullchain_pem
- if get_alt_chains and orderr.alternative_fullchains_pem:
- fullchain = crypto_util.find_chain_with_issuer([fullchain] + \
- orderr.alternative_fullchains_pem,
- self.config.preferred_chain,
- not self.config.dry_run)
+ if self.config.preferred_chain and orderr.alternative_fullchains_pem:
+ fullchain = crypto_util.find_chain_with_issuer(
+ [fullchain] + orderr.alternative_fullchains_pem,
+ self.config.preferred_chain, not self.config.dry_run)
cert, chain = crypto_util.cert_and_chain_from_fullchain(fullchain)
return cert.encode(), chain.encode()
- def obtain_certificate(self, domains, old_keypath=None):
+ def obtain_certificate(self, domains: List[str], old_keypath: Optional[str] = None
+ ) -> Tuple[bytes, bytes, util.Key, util.CSR]:
"""Obtains a certificate from the ACME server.
`.register` must be called before `.obtain_certificate`
@@ -386,7 +416,8 @@ class Client:
elliptic_curve=elliptic_curve,
strict_permissions=self.config.strict_permissions,
)
- csr = crypto_util.generate_csr(key, domains, self.config.csr_dir,
+ # TODO: Remove the cast once certbot package is fully typed
+ csr = crypto_util.generate_csr(key, cast(Set[str], domains), self.config.csr_dir,
self.config.must_staple, self.config.strict_permissions)
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
@@ -408,11 +439,11 @@ class Client:
cert, chain = self.obtain_certificate_from_csr(csr, orderr)
return cert, chain, key, csr
- def _get_order_and_authorizations(self, csr_pem: str,
+ def _get_order_and_authorizations(self, csr_pem: bytes,
best_effort: bool) -> messages.OrderResource:
"""Request a new order and complete its authorizations.
- :param str csr_pem: A CSR in PEM format.
+ :param bytes csr_pem: A CSR in PEM format.
:param bool best_effort: True if failing to complete all
authorizations should not raise an exception
@@ -420,6 +451,8 @@ class Client:
:rtype: acme.messages.OrderResource
"""
+ if not self.acme:
+ raise errors.Error("ACME client is not set.")
try:
orderr = self.acme.new_order(csr_pem)
except acme_errors.WildcardUnsupportedError:
@@ -442,7 +475,8 @@ class Client:
authzr = self.auth_handler.handle_authorizations(orderr, self.config, best_effort)
return orderr.update(authorizations=authzr)
- def obtain_and_enroll_certificate(self, domains, certname):
+ def obtain_and_enroll_certificate(self, domains: List[str], certname: Optional[str]
+ ) -> Optional[storage.RenewableCert]:
"""Obtain and enroll certificate.
Get a new certificate for the specified domains using the specified
@@ -455,8 +489,7 @@ class Client:
:type certname: `str` or `None`
:returns: A new :class:`certbot._internal.storage.RenewableCert` instance
- referred to the enrolled cert lineage, False if the cert could not
- be obtained, or None if doing a successful dry run.
+ referred to the enrolled cert lineage, or None if doing a successful dry run.
"""
cert, chain, key, _ = self.obtain_certificate(domains)
@@ -470,15 +503,14 @@ class Client:
new_name = self._choose_lineagename(domains, certname)
if self.config.dry_run:
- logger.debug("Dry run: Skipping creating new lineage for %s",
- new_name)
+ logger.debug("Dry run: Skipping creating new lineage for %s", new_name)
return None
return storage.RenewableCert.new_lineage(
new_name, cert,
key.pem, chain,
self.config)
- def _choose_lineagename(self, domains, certname):
+ def _choose_lineagename(self, domains: List[str], certname: Optional[str]) -> str:
"""Chooses a name for the new lineage.
:param domains: domains in certificate request
@@ -497,12 +529,13 @@ class Client:
return domains[0][2:]
return domains[0]
- def save_certificate(self, cert_pem, chain_pem,
- cert_path, chain_path, fullchain_path):
+ def save_certificate(self, cert_pem: bytes, chain_pem: bytes,
+ cert_path: str, chain_path: str, fullchain_path: str
+ ) -> Tuple[str, str, str]:
"""Saves the certificate received from the ACME server.
- :param str cert_pem:
- :param str chain_pem:
+ :param bytes cert_pem:
+ :param bytes chain_pem:
:param str cert_path: Candidate path to a certificate.
:param str chain_path: Candidate path to a certificate chain.
:param str fullchain_path: Candidate path to a full cert chain.
@@ -517,7 +550,6 @@ class Client:
for path in cert_path, chain_path, fullchain_path:
util.make_or_verify_dir(os.path.dirname(path), 0o755, self.config.strict_permissions)
-
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
try:
@@ -525,17 +557,16 @@ class Client:
finally:
cert_file.close()
- chain_file, abs_chain_path =\
- _open_pem_file('chain_path', chain_path)
- fullchain_file, abs_fullchain_path =\
- _open_pem_file('fullchain_path', fullchain_path)
+ chain_file, abs_chain_path = _open_pem_file('chain_path', chain_path)
+ fullchain_file, abs_fullchain_path = _open_pem_file('fullchain_path', fullchain_path)
_save_chain(chain_pem, chain_file)
_save_chain(cert_pem + chain_pem, fullchain_file)
return abs_cert_path, abs_chain_path, abs_fullchain_path
- def deploy_certificate(self, domains, privkey_path, cert_path, chain_path, fullchain_path):
+ def deploy_certificate(self, domains: List[str], privkey_path: str, cert_path: str,
+ chain_path: str, fullchain_path: str) -> None:
"""Install certificate
:param list domains: list of domains to install the certificate
@@ -572,7 +603,8 @@ class Client:
# sites may have been enabled / final cleanup
self.installer.restart()
- def enhance_config(self, domains, chain_path, redirect_default=True):
+ def enhance_config(self, domains: List[str], chain_path: str,
+ redirect_default: bool = True) -> None:
"""Enhance the configuration.
:param list domains: list of domains to configure
@@ -630,11 +662,14 @@ class Client:
"""
+ if not self.installer:
+ raise errors.Error("No installer plugin has been set.")
enh_label = options if enhancement == "ensure-http-header" else enhancement
with error_handler.ErrorHandler(self._recovery_routine_with_msg, None):
for dom in domains:
try:
- self.installer.enhance(dom, enhancement, options)
+ # TODO: Remove the cast once certbot package is fully typed
+ self.installer.enhance(dom, enhancement, cast(Optional[List[str]], options))
except errors.PluginEnhancementAlreadyPresent:
logger.info("Enhancement %s was already set.", enh_label)
except errors.PluginError:
@@ -649,32 +684,34 @@ class Client:
:param str success_msg: message to show on successful recovery
"""
- self.installer.recovery_routine()
- if success_msg:
- display_util.notify(success_msg)
+ if self.installer:
+ self.installer.recovery_routine()
+ if success_msg:
+ display_util.notify(success_msg)
- def _rollback_and_restart(self, success_msg):
+ def _rollback_and_restart(self, success_msg: str) -> None:
"""Rollback the most recent checkpoint and restart the webserver
:param str success_msg: message to show on successful rollback
"""
- logger.info("Rolling back to previous server configuration...")
- try:
- self.installer.rollback_checkpoints()
- self.installer.restart()
- except:
- logger.error(
- "An error occurred and we failed to restore your config and "
- "restart your server. Please post to "
- "https://community.letsencrypt.org/c/help "
- "with details about your configuration and this error you received."
- )
- raise
- display_util.notify(success_msg)
+ if self.installer:
+ logger.info("Rolling back to previous server configuration...")
+ try:
+ self.installer.rollback_checkpoints()
+ self.installer.restart()
+ except:
+ logger.error(
+ "An error occurred and we failed to restore your config and "
+ "restart your server. Please post to "
+ "https://community.letsencrypt.org/c/help "
+ "with details about your configuration and this error you received."
+ )
+ raise
+ display_util.notify(success_msg)
-def validate_key_csr(privkey, csr=None):
+def validate_key_csr(privkey: util.Key, csr: Optional[util.CSR] = None) -> None:
"""Validate Key and CSR files.
Verifies that the client key and csr arguments are valid and correspond to
@@ -720,13 +757,16 @@ def validate_key_csr(privkey, csr=None):
raise errors.Error("The key and CSR do not match")
-def rollback(default_installer, checkpoints, config, plugins):
+def rollback(default_installer: str, checkpoints: int,
+ config: configuration.NamespaceConfig, plugins: plugin_disco.PluginsRegistry) -> None:
"""Revert configuration the specified number of checkpoints.
+ :param str default_installer: Default installer name to use for the rollback
:param int checkpoints: Number of checkpoints to revert.
-
:param config: Configuration.
:type config: :class:`certbot.configuration.NamespaceConfiguration`
+ :param plugins: Plugins available
+ :type plugins: :class:`certbot._internal.plugins.disco.PluginsRegistry`
"""
# Misconfigurations are only a slight problems... allow the user to rollback
@@ -741,7 +781,8 @@ def rollback(default_installer, checkpoints, config, plugins):
installer.rollback_checkpoints(checkpoints)
installer.restart()
-def _open_pem_file(cli_arg_path, pem_path):
+
+def _open_pem_file(cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
"""Open a pem file.
If cli_arg_path was set by the client, open that.
@@ -759,10 +800,11 @@ def _open_pem_file(cli_arg_path, pem_path):
uniq = util.unique_file(pem_path, 0o644, "wb")
return uniq[0], os.path.abspath(uniq[1])
-def _save_chain(chain_pem, chain_file):
+
+def _save_chain(chain_pem: bytes, chain_file: IO) -> None:
"""Saves chain_pem at a unique path based on chain_path.
- :param str chain_pem: certificate chain in PEM format
+ :param bytes chain_pem: certificate chain in PEM format
:param str chain_file: chain file object
"""
diff --git a/certbot/certbot/_internal/constants.py b/certbot/certbot/_internal/constants.py
index f7695670d..c2b4e6556 100644
--- a/certbot/certbot/_internal/constants.py
+++ b/certbot/certbot/_internal/constants.py
@@ -1,5 +1,7 @@
"""Certbot constants."""
import logging
+from typing import Any
+from typing import Dict
import pkg_resources
@@ -13,7 +15,7 @@ SETUPTOOLS_PLUGINS_ENTRY_POINT = "certbot.plugins"
OLD_SETUPTOOLS_PLUGINS_ENTRY_POINT = "letsencrypt.plugins"
"""Plugins Setuptools entry point before rename."""
-CLI_DEFAULTS = dict(
+CLI_DEFAULTS: Dict[str, Any] = dict(
config_files=[
os.path.join(misc.get_default_folder('config'), 'cli.ini'),
# https://freedesktop.org/wiki/Software/xdg-user-dirs/
diff --git a/certbot/certbot/_internal/eff.py b/certbot/certbot/_internal/eff.py
index cf07a3d44..2f3926895 100644
--- a/certbot/certbot/_internal/eff.py
+++ b/certbot/certbot/_internal/eff.py
@@ -1,5 +1,6 @@
"""Subscribes users to the EFF newsletter."""
import logging
+from typing import cast
from typing import Optional
import requests
@@ -32,16 +33,18 @@ def prepare_subscription(config: configuration.NamespaceConfig, acc: Account) ->
if config.email is None:
_report_failure("you didn't provide an e-mail address")
else:
- acc.meta = acc.meta.update(register_to_eff=config.email)
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=config.email))
elif config.email and _want_subscription():
- acc.meta = acc.meta.update(register_to_eff=config.email)
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=config.email))
if acc.meta.register_to_eff:
storage = AccountFileStorage(config)
storage.update_meta(acc)
-def handle_subscription(config: configuration.NamespaceConfig, acc: Account) -> None:
+def handle_subscription(config: configuration.NamespaceConfig, acc: Optional[Account]) -> None:
"""High level function to take care of EFF newsletter subscriptions.
Once subscription is handled, it will not be handled again.
@@ -50,12 +53,14 @@ def handle_subscription(config: configuration.NamespaceConfig, acc: Account) ->
:param Account acc: Current client account.
"""
- if config.dry_run:
+ if config.dry_run or not acc:
return
if acc.meta.register_to_eff:
- subscribe(acc.meta.register_to_eff)
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ subscribe(cast(str, acc.meta.register_to_eff))
- acc.meta = acc.meta.update(register_to_eff=None)
+ # TODO: Remove cast when https://github.com/certbot/certbot/pull/9073 is merged.
+ acc.meta = cast(Account.Meta, acc.meta.update(register_to_eff=None))
storage = AccountFileStorage(config)
storage.update_meta(acc)
diff --git a/certbot/certbot/_internal/error_handler.py b/certbot/certbot/_internal/error_handler.py
index 64aad155e..0e63d02de 100644
--- a/certbot/certbot/_internal/error_handler.py
+++ b/certbot/certbot/_internal/error_handler.py
@@ -3,10 +3,13 @@ import functools
import logging
import signal
import traceback
+from types import TracebackType
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
+from typing import Optional
+from typing import Type
from typing import Union
from certbot import errors
@@ -74,7 +77,7 @@ class ErrorHandler:
deferred until they finish.
"""
- def __init__(self, func, *args, **kwargs):
+ def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
self.call_on_regular_exit = False
self.body_executed = False
self.funcs: List[Callable[[], Any]] = []
@@ -83,11 +86,13 @@ class ErrorHandler:
if func is not None:
self.register(func, *args, **kwargs)
- def __enter__(self):
+ def __enter__(self) -> None:
self.body_executed = False
self._set_signal_handlers()
- def __exit__(self, exec_type, exec_value, trace):
+ def __exit__(self, exec_type: Optional[Type[BaseException]],
+ exec_value: Optional[BaseException],
+ trace: Optional[TracebackType]) -> bool:
self.body_executed = True
retval = False
# SystemExit is ignored to properly handle forks that don't exec
@@ -108,7 +113,7 @@ class ErrorHandler:
self._call_signals()
return retval
- def register(self, func: Callable, *args: Any, **kwargs: Any) -> None:
+ def register(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""Sets func to be run with the given arguments during cleanup.
:param function func: function to be called in case of an error
@@ -116,7 +121,7 @@ class ErrorHandler:
"""
self.funcs.append(functools.partial(func, *args, **kwargs))
- def _call_registered(self):
+ def _call_registered(self) -> None:
"""Calls all registered functions"""
logger.debug("Calling registered functions")
while self.funcs:
@@ -128,7 +133,7 @@ class ErrorHandler:
''.join(output).rstrip())
self.funcs.pop()
- def _set_signal_handlers(self):
+ def _set_signal_handlers(self) -> None:
"""Sets signal handlers for signals in _SIGNALS."""
for signum in _SIGNALS:
prev_handler = signal.getsignal(signum)
@@ -137,13 +142,13 @@ class ErrorHandler:
self.prev_handlers[signum] = prev_handler
signal.signal(signum, self._signal_handler)
- def _reset_signal_handlers(self):
+ def _reset_signal_handlers(self) -> None:
"""Resets signal handlers for signals in _SIGNALS."""
for signum, handler in self.prev_handlers.items():
signal.signal(signum, handler)
self.prev_handlers.clear()
- def _signal_handler(self, signum, unused_frame):
+ def _signal_handler(self, signum: int, unused_frame: Any) -> None:
"""Replacement function for handling received signals.
Store the received signal. If we are executing the code block in
@@ -156,7 +161,7 @@ class ErrorHandler:
if not self.body_executed:
raise errors.SignalExit
- def _call_signals(self):
+ def _call_signals(self) -> None:
"""Finally call the deferred signals."""
for signum in self.received_signals:
logger.debug("Calling signal %s", signum)
@@ -169,6 +174,6 @@ class ExitHandler(ErrorHandler):
In addition to cleaning up on all signals, also cleans up on
regular exit.
"""
- def __init__(self, func, *args, **kwargs):
+ def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
ErrorHandler.__init__(self, func, *args, **kwargs)
self.call_on_regular_exit = True
diff --git a/certbot/certbot/_internal/hooks.py b/certbot/certbot/_internal/hooks.py
index e4975f1c8..813f7f6bd 100644
--- a/certbot/certbot/_internal/hooks.py
+++ b/certbot/certbot/_internal/hooks.py
@@ -2,8 +2,10 @@
import logging
from typing import List
+from typing import Optional
from typing import Set
+from certbot import configuration
from certbot import errors
from certbot import util
from certbot.compat import filesystem
@@ -15,7 +17,7 @@ from certbot.plugins import util as plug_util
logger = logging.getLogger(__name__)
-def validate_hooks(config):
+def validate_hooks(config: configuration.NamespaceConfig) -> None:
"""Check hook commands are executable."""
validate_hook(config.pre_hook, "pre")
validate_hook(config.post_hook, "post")
@@ -23,7 +25,7 @@ def validate_hooks(config):
validate_hook(config.renew_hook, "renew")
-def _prog(shell_cmd):
+def _prog(shell_cmd: str) -> Optional[str]:
"""Extract the program run by a shell command.
:param str shell_cmd: command to be executed
@@ -36,10 +38,11 @@ def _prog(shell_cmd):
plug_util.path_surgery(shell_cmd)
if not util.exe_exists(shell_cmd):
return None
+
return os.path.basename(shell_cmd)
-def validate_hook(shell_cmd, hook_name):
+def validate_hook(shell_cmd: str, hook_name: str) -> None:
"""Check that a command provided as a hook is plausibly executable.
:raises .errors.HookCommandNotFound: if the command is not found
@@ -57,7 +60,7 @@ def validate_hook(shell_cmd, hook_name):
raise errors.HookCommandNotFound(msg)
-def pre_hook(config):
+def pre_hook(config: configuration.NamespaceConfig) -> None:
"""Run pre-hooks if they exist and haven't already been run.
When Certbot is running with the renew subcommand, this function
@@ -81,7 +84,7 @@ def pre_hook(config):
executed_pre_hooks: Set[str] = set()
-def _run_pre_hook_if_necessary(command):
+def _run_pre_hook_if_necessary(command: str) -> None:
"""Run the specified pre-hook if we haven't already.
If we've already run this exact command before, a message is logged
@@ -97,7 +100,7 @@ def _run_pre_hook_if_necessary(command):
executed_pre_hooks.add(command)
-def post_hook(config):
+def post_hook(config: configuration.NamespaceConfig) -> None:
"""Run post-hooks if defined.
This function also registers any executables found in
@@ -131,7 +134,7 @@ def post_hook(config):
post_hooks: List[str] = []
-def _run_eventually(command):
+def _run_eventually(command: str) -> None:
"""Registers a post-hook to be run eventually.
All commands given to this function will be run exactly once in the
@@ -144,13 +147,14 @@ def _run_eventually(command):
post_hooks.append(command)
-def run_saved_post_hooks():
+def run_saved_post_hooks() -> None:
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
for cmd in post_hooks:
_run_hook("post-hook", cmd)
-def deploy_hook(config, domains, lineage_path):
+def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
+ lineage_path: str) -> None:
"""Run post-issuance hook if defined.
:param configuration.NamespaceConfig config: Certbot settings
@@ -164,7 +168,8 @@ def deploy_hook(config, domains, lineage_path):
lineage_path, config.dry_run)
-def renew_hook(config, domains, lineage_path):
+def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
+ lineage_path: str) -> None:
"""Run post-renewal hooks.
This function runs any hooks found in
@@ -196,7 +201,7 @@ def renew_hook(config, domains, lineage_path):
lineage_path, config.dry_run)
-def _run_deploy_hook(command, domains, lineage_path, dry_run):
+def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_run: bool) -> None:
"""Run the specified deploy-hook (if not doing a dry run).
If dry_run is True, command is not run and a message is logged
@@ -220,7 +225,7 @@ def _run_deploy_hook(command, domains, lineage_path, dry_run):
_run_hook("deploy-hook", command)
-def _run_hook(cmd_name, shell_cmd):
+def _run_hook(cmd_name: str, shell_cmd: str) -> str:
"""Run a hook command.
:param str cmd_name: the user facing name of the hook being run
@@ -234,7 +239,7 @@ def _run_hook(cmd_name, shell_cmd):
return err
-def list_hooks(dir_path):
+def list_hooks(dir_path: str) -> List[str]:
"""List paths to all hooks found in dir_path in sorted order.
:param str dir_path: directory to search
diff --git a/certbot/certbot/_internal/lock.py b/certbot/certbot/_internal/lock.py
index 80b79d993..95c44856f 100644
--- a/certbot/certbot/_internal/lock.py
+++ b/certbot/certbot/_internal/lock.py
@@ -89,10 +89,10 @@ class _BaseLockMechanism:
"""
return self._fd is not None
- def acquire(self): # pylint: disable=missing-function-docstring
+ def acquire(self) -> None: # pylint: disable=missing-function-docstring
pass # pragma: no cover
- def release(self): # pylint: disable=missing-function-docstring
+ def release(self) -> None: # pylint: disable=missing-function-docstring
pass # pragma: no cover
@@ -143,7 +143,8 @@ class _UnixLockMechanism(_BaseLockMechanism):
# Normally os module should not be imported in certbot codebase except in certbot.compat
# for the sake of compatibility over Windows and Linux.
# We make an exception here, since _lock_success is private and called only on Linux.
- from os import stat, fstat # pylint: disable=os-module-forbidden
+ from os import fstat # pylint: disable=os-module-forbidden
+ from os import stat # pylint: disable=os-module-forbidden
try:
stat1 = stat(self._path)
except OSError as err:
@@ -196,7 +197,7 @@ class _WindowsLockMechanism(_BaseLockMechanism):
Consequently, mscvrt.locking is sufficient to obtain an effective lock, and the race
condition encountered on Linux is not possible on Windows, leading to a simpler workflow.
"""
- def acquire(self):
+ def acquire(self) -> None:
"""Acquire the lock"""
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
@@ -220,7 +221,7 @@ class _WindowsLockMechanism(_BaseLockMechanism):
self._fd = fd
- def release(self):
+ def release(self) -> None:
"""Release the lock."""
try:
if not self._fd:
diff --git a/certbot/certbot/_internal/log.py b/certbot/certbot/_internal/log.py
index 12b144d54..e168158d9 100644
--- a/certbot/certbot/_internal/log.py
+++ b/certbot/certbot/_internal/log.py
@@ -29,9 +29,14 @@ import sys
import tempfile
import traceback
from types import TracebackType
+from typing import Any
from typing import IO
+from typing import Optional
+from typing import Tuple
+from typing import Type
from acme import messages
+from certbot import configuration
from certbot import errors
from certbot import util
from certbot._internal import constants
@@ -45,7 +50,7 @@ FILE_FMT = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
logger = logging.getLogger(__name__)
-def pre_arg_parse_setup():
+def pre_arg_parse_setup() -> None:
"""Setup logging before command line arguments are parsed.
Terminal logging is setup using
@@ -85,7 +90,7 @@ def pre_arg_parse_setup():
log_path=temp_handler.path)
-def post_arg_parse_setup(config):
+def post_arg_parse_setup(config: configuration.NamespaceConfig) -> None:
"""Setup logging after command line arguments are parsed.
This function assumes `pre_arg_parse_setup` was called earlier and
@@ -137,7 +142,8 @@ def post_arg_parse_setup(config):
debug=config.debug, quiet=config.quiet, log_path=file_path)
-def setup_log_file_handler(config, logfile, fmt):
+def setup_log_file_handler(config: configuration.NamespaceConfig, logfile: str,
+ fmt: str) -> Tuple[logging.Handler, str]:
"""Setup file debug logging.
:param certbot.configuration.NamespaceConfig config: Configuration object
@@ -179,13 +185,13 @@ class ColoredStreamHandler(logging.StreamHandler):
:ivar bool red_level: The level at which to output
"""
- def __init__(self, stream=None):
+ def __init__(self, stream: Optional[IO] = None) -> None:
super().__init__(stream)
self.colored = (sys.stderr.isatty() if stream is None else
stream.isatty())
self.red_level = logging.WARNING
- def format(self, record):
+ def format(self, record: logging.LogRecord) -> str:
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
@@ -207,11 +213,12 @@ class MemoryHandler(logging.handlers.MemoryHandler):
only happens when flush(force=True) is called.
"""
- def __init__(self, target=None, capacity=10000):
+ def __init__(self, target: Optional[logging.Handler] = None,
+ capacity: int = 10000) -> None:
# capacity doesn't matter because should_flush() is overridden
super().__init__(capacity, target=target)
- def close(self):
+ def close(self) -> None:
"""Close the memory handler, but don't set the target to None."""
# This allows the logging module which may only have a weak
# reference to the target handler to properly flush and close it.
@@ -219,7 +226,7 @@ class MemoryHandler(logging.handlers.MemoryHandler):
super().close()
self.target = target
- def flush(self, force=False): # pylint: disable=arguments-differ
+ def flush(self, force: bool = False) -> None: # pylint: disable=arguments-differ
"""Flush the buffer if force=True.
If force=False, this call is a noop.
@@ -232,7 +239,7 @@ class MemoryHandler(logging.handlers.MemoryHandler):
if force:
super().flush()
- def shouldFlush(self, record):
+ def shouldFlush(self, record: logging.LogRecord) -> bool:
"""Should the buffer be automatically flushed?
:param logging.LogRecord record: log record to be considered
@@ -254,7 +261,7 @@ class TempHandler(logging.StreamHandler):
:ivar str path: file system path to the temporary log file
"""
- def __init__(self):
+ def __init__(self) -> None:
self._workdir = tempfile.mkdtemp()
self.path = os.path.join(self._workdir, 'log')
stream = util.safe_open(self.path, mode='w', chmod=0o600)
@@ -264,7 +271,7 @@ class TempHandler(logging.StreamHandler):
self.stream: IO[str]
self._delete = True
- def emit(self, record):
+ def emit(self, record: logging.LogRecord) -> None:
"""Log the specified logging record.
:param logging.LogRecord record: Record to be formatted
@@ -273,7 +280,7 @@ class TempHandler(logging.StreamHandler):
self._delete = False
super().emit(record)
- def close(self):
+ def close(self) -> None:
"""Close the handler and the temporary log file.
The temporary log file is deleted if it wasn't used.
@@ -292,7 +299,8 @@ class TempHandler(logging.StreamHandler):
self.release()
-def pre_arg_parse_except_hook(memory_handler, *args, **kwargs):
+def pre_arg_parse_except_hook(memory_handler: MemoryHandler,
+ *args: Any, **kwargs: Any) -> None:
"""A simple wrapper around post_arg_parse_except_hook.
The additional functionality provided by this wrapper is the memory
@@ -319,8 +327,9 @@ def pre_arg_parse_except_hook(memory_handler, *args, **kwargs):
memory_handler.flush(force=True)
-def post_arg_parse_except_hook(exc_type: type, exc_value: BaseException, trace: TracebackType,
- debug: bool, quiet: bool, log_path: str):
+def post_arg_parse_except_hook(exc_type: Type[BaseException], exc_value: BaseException,
+ trace: TracebackType, debug: bool, quiet: bool,
+ log_path: str) -> None:
"""Logs fatal exceptions and reports them to the user.
If debug is True, the full exception and traceback is shown to the
@@ -369,7 +378,7 @@ def post_arg_parse_except_hook(exc_type: type, exc_value: BaseException, trace:
exit_func()
-def exit_with_advice(log_path: str):
+def exit_with_advice(log_path: str) -> None:
"""Print a link to the community forums, the debug log path, and exit
The message is printed to stderr and the program will exit with a
diff --git a/certbot/certbot/_internal/main.py b/certbot/certbot/_internal/main.py
index 3301c72c8..9c29b9560 100644
--- a/certbot/certbot/_internal/main.py
+++ b/certbot/certbot/_internal/main.py
@@ -11,6 +11,7 @@ from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
+from typing import TypeVar
from typing import Union
import configobj
@@ -18,6 +19,7 @@ import josepy as jose
import zope.component
import zope.interface
+from acme import client as acme_client
from acme import errors as acme_errors
import certbot
from certbot import configuration
@@ -56,7 +58,7 @@ USER_CANCELLED = ("User chose to cancel the operation and may "
logger = logging.getLogger(__name__)
-def _suggest_donation_if_appropriate(config):
+def _suggest_donation_if_appropriate(config: configuration.NamespaceConfig) -> None:
"""Potentially suggest a donation to support Certbot.
:param config: Configuration object
@@ -82,7 +84,10 @@ def _suggest_donation_if_appropriate(config):
)
-def _get_and_save_cert(le_client, config, domains=None, certname=None, lineage=None):
+def _get_and_save_cert(le_client: client.Client, config: configuration.NamespaceConfig,
+ domains: Optional[List[str]] = None, certname: Optional[str] = None,
+ lineage: Optional[storage.RenewableCert] = None
+ ) -> Optional[storage.RenewableCert]:
"""Authenticate and enroll certificate.
This method finds the relevant lineage, figures out what to do with it,
@@ -115,14 +120,15 @@ def _get_and_save_cert(le_client, config, domains=None, certname=None, lineage=N
display_util.notify(
"{action} for {domains}".format(
action="Simulating renewal of an existing certificate"
- if config.dry_run else "Renewing an existing certificate",
+ if config.dry_run else "Renewing an existing certificate",
domains=internal_display_util.summarize_domain_list(domains or lineage.names())
)
)
renewal.renew_cert(config, domains, le_client, lineage)
else:
# TREAT AS NEW REQUEST
- assert domains is not None
+ if domains is None:
+ raise errors.Error("Domain list cannot be none if the lineage is not set.")
display_util.notify(
"{action} for {domains}".format(
action="Simulating a certificate request" if config.dry_run else
@@ -164,7 +170,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
def _handle_subset_cert_request(config: configuration.NamespaceConfig,
- domains: List[str],
+ domains: Iterable[str],
cert: storage.RenewableCert
) -> Tuple[str, Optional[storage.RenewableCert]]:
"""Figure out what to do if a previous cert had a subset of the names now requested
@@ -264,7 +270,8 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
raise AssertionError('This is impossible')
-def _find_lineage_for_domains(config, domains):
+def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: List[str]
+ ) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
"""Determine whether there are duplicated names and how to handle
them (renew, reinstall, newcert, or raising an error to stop
the client run if the user chooses to cancel the operation when
@@ -304,7 +311,8 @@ def _find_lineage_for_domains(config, domains):
return None, None
-def _find_cert(config, domains, certname):
+def _find_cert(config: configuration.NamespaceConfig, domains: List[str], certname: str
+ ) -> Tuple[bool, Optional[storage.RenewableCert]]:
"""Finds an existing certificate object given domains and/or a certificate name.
:param config: Configuration object
@@ -328,10 +336,9 @@ def _find_cert(config, domains, certname):
return (action != "reinstall"), lineage
-def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig,
- domains: List[str],
- certname: str
- ) -> Tuple[str, Optional[storage.RenewableCert]]:
+def _find_lineage_for_domains_and_certname(
+ config: configuration.NamespaceConfig, domains: List[str],
+ certname: str) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
"""Find appropriate lineage based on given domains and/or certname.
:param config: Configuration object
@@ -357,7 +364,8 @@ def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig
lineage = cert_manager.lineage_for_certname(config, certname)
if lineage:
if domains:
- if set(cert_manager.domains_for_certname(config, certname)) != set(domains):
+ computed_domains = cert_manager.domains_for_certname(config, certname)
+ if computed_domains and set(computed_domains) != set(domains):
_handle_unexpected_key_type_migration(config, lineage)
_ask_user_to_confirm_new_names(config, domains, certname,
lineage.names()) # raises if no
@@ -367,10 +375,14 @@ def _find_lineage_for_domains_and_certname(config: configuration.NamespaceConfig
elif domains:
return "newcert", None
raise errors.ConfigurationError("No certificate with name {0} found. "
- "Use -d to specify domains, or run certbot certificates to see "
- "possible certificate names.".format(certname))
+ "Use -d to specify domains, or run certbot certificates to see "
+ "possible certificate names.".format(certname))
-def _get_added_removed(after, before):
+
+T = TypeVar("T")
+
+
+def _get_added_removed(after: Iterable[T], before: Iterable[T]) -> Tuple[List[T], List[T]]:
"""Get lists of items removed from `before`
and a lists of items added to `after`
"""
@@ -380,7 +392,8 @@ def _get_added_removed(after, before):
removed.sort()
return added, removed
-def _format_list(character, strings):
+
+def _format_list(character: str, strings: Iterable[str]) -> str:
"""Format list with given character
"""
if not strings:
@@ -392,7 +405,10 @@ def _format_list(character, strings):
br=os.linesep
)
-def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
+
+def _ask_user_to_confirm_new_names(config: configuration.NamespaceConfig,
+ new_domains: Iterable[str], certname: str,
+ old_domains: Iterable[str]) -> None:
"""Ask user to confirm update cert certname to contain new_domains.
:param config: Configuration object
@@ -429,7 +445,9 @@ def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
raise errors.ConfigurationError("Specified mismatched certificate name and domains.")
-def _find_domains_or_certname(config, installer, question=None):
+def _find_domains_or_certname(config: configuration.NamespaceConfig,
+ installer: Optional[interfaces.Installer],
+ question: Optional[str] = None) -> Tuple[List[str], str]:
"""Retrieve domains and certname from config or user input.
:param config: Configuration object
@@ -596,7 +614,7 @@ def _is_interactive_only_auth(config: configuration.NamespaceConfig) -> bool:
def _csr_report_new_cert(config: configuration.NamespaceConfig, cert_path: Optional[str],
- chain_path: Optional[str], fullchain_path: Optional[str]):
+ chain_path: Optional[str], fullchain_path: Optional[str]) -> None:
""" --csr variant of _report_new_cert.
Until --csr is overhauled (#8332) this is transitional function to report the creation
@@ -633,7 +651,9 @@ def _csr_report_new_cert(config: configuration.NamespaceConfig, cert_path: Optio
)
-def _determine_account(config):
+def _determine_account(config: configuration.NamespaceConfig
+ ) -> Tuple[account.Account,
+ Optional[acme_client.ClientV2]]:
"""Determine which account to use.
If ``config.account`` is ``None``, it will be updated based on the
@@ -649,9 +669,9 @@ def _determine_account(config):
:raises errors.Error: If unable to register an account with ACME server
"""
- def _tos_cb(terms_of_service):
+ def _tos_cb(terms_of_service: str) -> None:
if config.tos:
- return True
+ return
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server. Do you agree?".format(terms_of_service))
@@ -660,17 +680,19 @@ def _determine_account(config):
raise errors.Error(
"Registration cannot proceed without accepting "
"Terms of Service.")
- return None
account_storage = account.AccountFileStorage(config)
- acme = None
+ acme: Optional[acme_client.ClientV2] = None
if config.account is not None:
acc = account_storage.load(config.account)
else:
accounts = account_storage.find_all()
if len(accounts) > 1:
- acc = display_ops.choose_account(accounts)
+ potential_acc = display_ops.choose_account(accounts)
+ if not potential_acc:
+ raise errors.Error("No account has been chosen.")
+ acc = potential_acc
elif len(accounts) == 1:
acc = accounts[0]
else: # no account registered yet
@@ -691,7 +713,7 @@ def _determine_account(config):
return acc, acme
-def _delete_if_appropriate(config):
+def _delete_if_appropriate(config: configuration.NamespaceConfig) -> None:
"""Does the user want to delete their now-revoked certs? If run in non-interactive mode,
deleting happens automatically.
@@ -729,21 +751,23 @@ def _delete_if_appropriate(config):
config, config.certname)
try:
cert_manager.match_and_check_overlaps(config, [lambda x: archive_dir],
- lambda x: x.archive_dir, lambda x: x)
+ lambda x: x.archive_dir, lambda x: x.lineagename)
except errors.OverlappingMatchFound:
logger.warning("Not deleting revoked certificates due to overlapping archive dirs. "
"More than one certificate is using %s", archive_dir)
return
except Exception as e:
msg = ('config.default_archive_dir: {0}, config.live_dir: {1}, archive_dir: {2},'
- 'original exception: {3}')
+ 'original exception: {3}')
msg = msg.format(config.default_archive_dir, config.live_dir, archive_dir, e)
raise errors.Error(msg)
cert_manager.delete(config)
-def _init_le_client(config, authenticator, installer):
+def _init_le_client(config: configuration.NamespaceConfig,
+ authenticator: Optional[interfaces.Authenticator],
+ installer: Optional[interfaces.Installer]) -> client.Client:
"""Initialize Let's Encrypt Client
:param config: Configuration object
@@ -758,19 +782,19 @@ def _init_le_client(config, authenticator, installer):
:rtype: client.Client
"""
+ acc: Optional[account.Account]
if authenticator is not None:
# if authenticator was given, then we will need account...
acc, acme = _determine_account(config)
logger.debug("Picked account: %r", acc)
- # XXX
- #crypto_util.validate_key_csr(acc.key)
else:
acc, acme = None, None
return client.Client(config, acc, authenticator, installer, acme=acme)
-def unregister(config, unused_plugins):
+def unregister(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Deactivate account on server
:param config: Configuration object
@@ -779,8 +803,8 @@ def unregister(config, unused_plugins):
:param unused_plugins: List of plugins (deprecated)
:type unused_plugins: plugins_disco.PluginsRegistry
- :returns: `None`
- :rtype: None
+ :returns: `None` or a string indicating an error
+ :rtype: None or str
"""
account_storage = account.AccountFileStorage(config)
@@ -799,6 +823,9 @@ def unregister(config, unused_plugins):
acc, acme = _determine_account(config)
cb_client = client.Client(config, acc, None, None, acme=acme)
+ if not cb_client.acme:
+ raise errors.Error("ACME client is not set.")
+
# delete on boulder
cb_client.acme.deactivate_registration(acc.regr)
account_files = account.AccountFileStorage(config)
@@ -809,7 +836,8 @@ def unregister(config, unused_plugins):
return None
-def register(config, unused_plugins):
+def register(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Create accounts on the server.
:param config: Configuration object
@@ -839,7 +867,8 @@ def register(config, unused_plugins):
return None
-def update_account(config, unused_plugins):
+def update_account(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Modify accounts on the server.
:param config: Configuration object
@@ -864,8 +893,11 @@ def update_account(config, unused_plugins):
acc, acme = _determine_account(config)
cb_client = client.Client(config, acc, None, None, acme=acme)
- # Empty list of contacts in case the user is removing all emails
+ if not cb_client.acme:
+ raise errors.Error("ACME client is not set.")
+
+ # Empty list of contacts in case the user is removing all emails
acc_contacts: Iterable[str] = ()
if config.email:
acc_contacts = ['mailto:' + email for email in config.email.split(',')]
@@ -904,7 +936,8 @@ def _cert_name_from_config_or_lineage(config: configuration.NamespaceConfig,
return None
-def _install_cert(config, le_client, domains, lineage=None):
+def _install_cert(config: configuration.NamespaceConfig, le_client: client.Client,
+ domains: List[str], lineage: Optional[storage.RenewableCert] = None) -> None:
"""Install a cert
:param config: Configuration object
@@ -923,7 +956,8 @@ def _install_cert(config, le_client, domains, lineage=None):
:rtype: None
"""
- path_provider = lineage if lineage else config
+ path_provider: Union[storage.RenewableCert,
+ configuration.NamespaceConfig] = lineage if lineage else config
assert path_provider.cert_path is not None
le_client.deploy_certificate(domains, path_provider.key_path, path_provider.cert_path,
@@ -931,7 +965,8 @@ def _install_cert(config, le_client, domains, lineage=None):
le_client.enhance_config(domains, path_provider.chain_path)
-def install(config, plugins):
+def install(config: configuration.NamespaceConfig,
+ plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Install a previously obtained cert in a server.
:param config: Configuration object
@@ -940,8 +975,8 @@ def install(config, plugins):
:param plugins: List of plugins
:type plugins: plugins_disco.PluginsRegistry
- :returns: `None`
- :rtype: None
+ :returns: `None` or the error message
+ :rtype: None or str
"""
# XXX: Update for renewer/RenewableCert
@@ -990,7 +1025,8 @@ def install(config, plugins):
return None
-def _populate_from_certname(config):
+
+def _populate_from_certname(config: configuration.NamespaceConfig) -> configuration.NamespaceConfig:
"""Helper function for install to populate missing config values from lineage
defined by --cert-name."""
@@ -1007,14 +1043,18 @@ def _populate_from_certname(config):
config.namespace.fullchain_path = lineage.fullchain_path
return config
-def _check_certificate_and_key(config):
+
+def _check_certificate_and_key(config: configuration.NamespaceConfig) -> None:
if not os.path.isfile(filesystem.realpath(config.cert_path)):
raise errors.ConfigurationError("Error while reading certificate from path "
"{0}".format(config.cert_path))
if not os.path.isfile(filesystem.realpath(config.key_path)):
raise errors.ConfigurationError("Error while reading private key from path "
"{0}".format(config.key_path))
-def plugins_cmd(config, plugins):
+
+
+def plugins_cmd(config: configuration.NamespaceConfig,
+ plugins: plugins_disco.PluginsRegistry) -> None:
"""List server software plugins.
:param config: Configuration object
@@ -1052,7 +1092,8 @@ def plugins_cmd(config, plugins):
notify(str(available))
-def enhance(config, plugins):
+def enhance(config: configuration.NamespaceConfig,
+ plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Add security enhancements to existing configuration
:param config: Configuration object
@@ -1061,8 +1102,8 @@ def enhance(config, plugins):
:param plugins: List of plugins
:type plugins: plugins_disco.PluginsRegistry
- :returns: `None`
- :rtype: None
+ :returns: `None` or a string indicating an error
+ :rtype: None or str
"""
supported_enhancements = ["hsts", "redirect", "uir", "staple"]
@@ -1089,6 +1130,8 @@ def enhance(config, plugins):
config, "enhance", allow_multiple=False,
custom_prompt=certname_question)[0]
cert_domains = cert_manager.domains_for_certname(config, config.certname)
+ if cert_domains is None:
+ raise errors.Error("Could not find the list of domains for the given certificate name.")
if config.noninteractive_mode:
domains = cert_domains
else:
@@ -1100,6 +1143,8 @@ def enhance(config, plugins):
"defined, exiting.")
lineage = cert_manager.lineage_for_certname(config, config.certname)
+ if not lineage:
+ raise errors.Error("Could not find the lineage for the given certificate name.")
if not config.chain_path:
config.chain_path = lineage.chain_path
if oldstyle_enh:
@@ -1111,7 +1156,7 @@ def enhance(config, plugins):
return None
-def rollback(config, plugins):
+def rollback(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry) -> None:
"""Rollback server configuration changes made during install.
:param config: Configuration object
@@ -1126,7 +1171,9 @@ def rollback(config, plugins):
"""
client.rollback(config.installer, config.checkpoints, config, plugins)
-def update_symlinks(config, unused_plugins):
+
+def update_symlinks(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> None:
"""Update the certificate file family symlinks
Use the information in the config file to make symlinks point to
@@ -1144,7 +1191,9 @@ def update_symlinks(config, unused_plugins):
"""
cert_manager.update_live_symlinks(config)
-def rename(config, unused_plugins):
+
+def rename(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> None:
"""Rename a certificate
Use the information in the config file to rename an existing
@@ -1162,7 +1211,9 @@ def rename(config, unused_plugins):
"""
cert_manager.rename_lineage(config)
-def delete(config, unused_plugins):
+
+def delete(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> None:
"""Delete a certificate
Use the information in the config file to delete an existing
@@ -1181,7 +1232,8 @@ def delete(config, unused_plugins):
cert_manager.delete(config)
-def certificates(config, unused_plugins):
+def certificates(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> None:
"""Display information about certs configured with Certbot
:param config: Configuration object
@@ -1197,7 +1249,8 @@ def certificates(config, unused_plugins):
cert_manager.certificates(config)
-def revoke(config, unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
+def revoke(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Revoke a previously obtained certificate.
:param config: Configuration object
@@ -1251,7 +1304,8 @@ def revoke(config, unused_plugins: plugins_disco.PluginsRegistry) -> Optional[st
return None
-def run(config, plugins):
+def run(config: configuration.NamespaceConfig,
+ plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Obtain a certificate and install.
:param config: Configuration object
@@ -1361,7 +1415,8 @@ def _csr_get_and_save_cert(config: configuration.NamespaceConfig,
return cert_path, chain_path, fullchain_path
-def renew_cert(config, plugins, lineage):
+def renew_cert(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry,
+ lineage: storage.RenewableCert) -> None:
"""Renew & save an existing cert. Do not install it.
:param config: Configuration object
@@ -1385,6 +1440,9 @@ def renew_cert(config, plugins, lineage):
renewed_lineage = _get_and_save_cert(le_client, config, lineage=lineage)
+ if not renewed_lineage:
+ raise errors.Error("An existing certificate for the given name could not be found.")
+
if installer and not config.dry_run:
# In case of a renewal, reload server to pick up new certificate.
updater.run_renewal_deployer(config, renewed_lineage, installer)
@@ -1392,7 +1450,7 @@ def renew_cert(config, plugins, lineage):
installer.restart()
-def certonly(config, plugins):
+def certonly(config: configuration.NamespaceConfig, plugins: plugins_disco.PluginsRegistry) -> None:
"""Authenticate & obtain cert, but do not install it.
This implements the 'certonly' subcommand.
@@ -1412,7 +1470,6 @@ def certonly(config, plugins):
# SETUP: Select plugins and construct a client instance
# installers are used in auth mode to determine domain names
installer, auth = plug_sel.choose_configurator_plugins(config, plugins, "certonly")
-
le_client = _init_le_client(config, auth, installer)
if config.csr:
@@ -1443,7 +1500,8 @@ def certonly(config, plugins):
eff.handle_subscription(config, le_client.account)
-def renew(config, unused_plugins):
+def renew(config: configuration.NamespaceConfig,
+ unused_plugins: plugins_disco.PluginsRegistry) -> None:
"""Renew previously-obtained certificates.
:param config: Configuration object
@@ -1462,7 +1520,7 @@ def renew(config, unused_plugins):
hooks.run_saved_post_hooks()
-def make_or_verify_needed_dirs(config):
+def make_or_verify_needed_dirs(config: configuration.NamespaceConfig) -> None:
"""Create or verify existence of config, work, and hook directories.
:param config: Configuration object
@@ -1514,7 +1572,7 @@ def make_displayer(config: configuration.NamespaceConfig
devnull.close()
-def main(cli_args=None):
+def main(cli_args: List[str] = None) -> Optional[Union[str, int]]:
"""Run Certbot.
:param cli_args: command line to Certbot, defaults to ``sys.argv[1:]``
diff --git a/certbot/certbot/_internal/renewal.py b/certbot/certbot/_internal/renewal.py
index 183e83ec0..91a38b11f 100644
--- a/certbot/certbot/_internal/renewal.py
+++ b/certbot/certbot/_internal/renewal.py
@@ -7,8 +7,13 @@ import random
import sys
import time
import traceback
+from typing import Any
+from typing import Dict
+from typing import Iterable
from typing import List
+from typing import Mapping
from typing import Optional
+from typing import Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
@@ -50,7 +55,8 @@ CONFIG_ITEMS = set(itertools.chain(
BOOL_CONFIG_ITEMS, INT_CONFIG_ITEMS, STR_CONFIG_ITEMS, ('pref_challs',)))
-def _reconstitute(config, full_path):
+def _reconstitute(config: configuration.NamespaceConfig,
+ full_path: str) -> Optional[storage.RenewableCert]:
"""Try to instantiate a RenewableCert, updating config with relevant items.
This is specifically for use in renewal and enforces several checks
@@ -108,7 +114,8 @@ def _reconstitute(config, full_path):
return renewal_candidate
-def _restore_webroot_config(config, renewalparams):
+def _restore_webroot_config(config: configuration.NamespaceConfig,
+ renewalparams: Mapping[str, Any]) -> None:
"""
webroot_map is, uniquely, a dict, and the general-purpose configuration
restoring logic is not able to correctly parse it from the serialized
@@ -125,7 +132,8 @@ def _restore_webroot_config(config, renewalparams):
config.webroot_path = wp
-def _restore_plugin_configs(config, renewalparams):
+def _restore_plugin_configs(config: configuration.NamespaceConfig,
+ renewalparams: Mapping[str, Any]) -> None:
"""Sets plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
@@ -168,7 +176,8 @@ def _restore_plugin_configs(config, renewalparams):
setattr(config, config_item, cast(config_value))
-def restore_required_config_elements(config, renewalparams):
+def restore_required_config_elements(config: configuration.NamespaceConfig,
+ renewalparams: Mapping[str, Any]) -> None:
"""Sets non-plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
@@ -189,7 +198,7 @@ def restore_required_config_elements(config, renewalparams):
setattr(config.namespace, item_name, value)
-def _remove_deprecated_config_elements(renewalparams):
+def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> Dict[str, Any]:
"""Removes deprecated config options from the parsed renewalparams.
:param dict renewalparams: list of parsed renewalparams
@@ -202,7 +211,7 @@ def _remove_deprecated_config_elements(renewalparams):
if option_name not in cli.DEPRECATED_OPTIONS}
-def _restore_pref_challs(unused_name, value):
+def _restore_pref_challs(unused_name: str, value: Union[List[str], str]) -> List[str]:
"""Restores preferred challenges from a renewal config file.
If value is a `str`, it should be a single challenge type.
@@ -224,7 +233,7 @@ def _restore_pref_challs(unused_name, value):
return cli.parse_preferred_challenges(value)
-def _restore_bool(name, value):
+def _restore_bool(name: str, value: str) -> bool:
"""Restores a boolean key-value pair from a renewal config file.
:param str name: option name
@@ -243,7 +252,7 @@ def _restore_bool(name, value):
return lowercase_value == "true"
-def _restore_int(name, value):
+def _restore_int(name: str, value: str) -> int:
"""Restores an integer key-value pair from a renewal config file.
:param str name: option name
@@ -265,7 +274,7 @@ def _restore_int(name, value):
raise errors.Error("Expected a numeric value for {0}".format(name))
-def _restore_str(name, value):
+def _restore_str(name: str, value: str) -> Optional[str]:
"""Restores a string key-value pair from a renewal config file.
:param str name: option name
@@ -290,7 +299,7 @@ def _restore_str(name, value):
return None if value == "None" else value
-def should_renew(config, lineage):
+def should_renew(config: configuration.NamespaceConfig, lineage: storage.RenewableCert) -> bool:
"""Return true if any of the circumstances for automatic renewal apply."""
if config.renew_by_default:
logger.debug("Auto-renewal forced with --force-renewal...")
@@ -305,7 +314,8 @@ def should_renew(config, lineage):
return False
-def _avoid_invalidating_lineage(config, lineage, original_server):
+def _avoid_invalidating_lineage(config: configuration.NamespaceConfig,
+ lineage: storage.RenewableCert, original_server: str) -> None:
"""Do not renew a valid cert with one from a staging server!"""
if util.is_staging(config.server):
if not util.is_staging(original_server):
@@ -344,7 +354,7 @@ def renew_cert(config: configuration.NamespaceConfig, domains: Optional[List[str
hooks.renew_hook(config, domains, lineage.live_dir)
-def report(msgs, category):
+def report(msgs: Iterable[str], category: str) -> str:
"""Format a results report for a category of renewal outcomes"""
lines = ("%s (%s)" % (m, category) for m in msgs)
return " " + "\n ".join(lines)
@@ -398,7 +408,7 @@ def _renew_describe_results(config: configuration.NamespaceConfig, renew_success
notify(display_obj.SIDE_FRAME)
-def handle_renewal_request(config):
+def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
"""Examine each lineage; renew if due and report results"""
# This is trivially False if config.domains is empty
@@ -448,7 +458,7 @@ def handle_renewal_request(config):
continue
try:
- if renewal_candidate is None:
+ if not renewal_candidate:
parse_failures.append(renewal_file)
else:
# This call is done only for retro-compatibility purposes.
@@ -490,7 +500,8 @@ def handle_renewal_request(config):
lineagename, e
)
logger.debug("Traceback was:\n%s", traceback.format_exc())
- renew_failures.append(renewal_candidate.fullchain)
+ if renewal_candidate:
+ renew_failures.append(renewal_candidate.fullchain)
# Describe all the results
_renew_describe_results(config, renew_successes, renew_failures,
diff --git a/certbot/certbot/_internal/reporter.py b/certbot/certbot/_internal/reporter.py
index 22275ff99..333fcca48 100644
--- a/certbot/certbot/_internal/reporter.py
+++ b/certbot/certbot/_internal/reporter.py
@@ -5,6 +5,7 @@ import queue
import sys
import textwrap
+from certbot import configuration
from certbot import util
logger = logging.getLogger(__name__)
@@ -27,11 +28,11 @@ class Reporter:
_msg_type = collections.namedtuple('_msg_type', 'priority text on_crash')
- def __init__(self, config):
+ def __init__(self, config: configuration.NamespaceConfig) -> None:
self.messages: queue.PriorityQueue[Reporter._msg_type] = queue.PriorityQueue()
self.config = config
- def add_message(self, msg, priority, on_crash=True):
+ def add_message(self, msg: str, priority: int, on_crash: bool = True) -> None:
"""Adds msg to the list of messages to be printed.
:param str msg: Message to be displayed to the user.
@@ -47,7 +48,7 @@ class Reporter:
self.messages.put(self._msg_type(priority, msg, on_crash))
logger.debug("Reporting to user: %s", msg)
- def print_messages(self):
+ def print_messages(self) -> None:
"""Prints messages to the user and clears the message queue.
If there is an unhandled exception, only messages for which
diff --git a/certbot/certbot/_internal/snap_config.py b/certbot/certbot/_internal/snap_config.py
index f7832cd55..3aad79912 100644
--- a/certbot/certbot/_internal/snap_config.py
+++ b/certbot/certbot/_internal/snap_config.py
@@ -1,7 +1,9 @@
"""Module configuring Certbot in a snap environment"""
import logging
import socket
+from typing import Iterable
from typing import List
+from typing import Optional
from requests import Session
from requests.adapters import HTTPAdapter
@@ -79,23 +81,24 @@ def prepare_env(cli_args: List[str]) -> List[str]:
class _SnapdConnection(HTTPConnection):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("localhost")
- self.sock = None
+ self.sock: Optional[socket.socket] = None
- def connect(self):
+ def connect(self) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect("/run/snapd.socket")
class _SnapdConnectionPool(HTTPConnectionPool):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__("localhost")
- def _new_conn(self):
+ def _new_conn(self) -> _SnapdConnection:
return _SnapdConnection()
class _SnapdAdapter(HTTPAdapter):
- def get_connection(self, url, proxies=None):
+ def get_connection(self, url: str,
+ proxies: Optional[Iterable[str]] = None) -> _SnapdConnectionPool:
return _SnapdConnectionPool()
diff --git a/certbot/certbot/_internal/storage.py b/certbot/certbot/_internal/storage.py
index 954241c0e..2b4f44fd1 100644
--- a/certbot/certbot/_internal/storage.py
+++ b/certbot/certbot/_internal/storage.py
@@ -5,8 +5,13 @@ import logging
import re
import shutil
import stat
-from typing import cast
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Mapping
from typing import Optional
+from typing import Tuple
import configobj
from cryptography.hazmat.backends import default_backend
@@ -39,7 +44,7 @@ CURRENT_VERSION = pkg_resources.parse_version(certbot.__version__)
BASE_PRIVKEY_MODE = 0o600
-def renewal_conf_files(config: configuration.NamespaceConfig):
+def renewal_conf_files(config: configuration.NamespaceConfig) -> List[str]:
"""Build a list of all renewal configuration files.
:param configuration.NamespaceConfig config: Configuration object
@@ -53,7 +58,7 @@ def renewal_conf_files(config: configuration.NamespaceConfig):
return result
-def renewal_file_for_certname(config, certname):
+def renewal_file_for_certname(config: configuration.NamespaceConfig, certname: str) -> str:
"""Return /path/to/certname.conf in the renewal conf directory"""
path = os.path.join(config.renewal_configs_dir, "{0}.conf".format(certname))
if not os.path.exists(path):
@@ -74,7 +79,8 @@ def cert_path_for_cert_name(config: configuration.NamespaceConfig, cert_name: st
cert_name_implied_conf, encoding='utf-8', default_encoding='utf-8')["fullchain"]
-def config_with_defaults(config=None):
+def config_with_defaults(config: Optional[configuration.NamespaceConfig] = None
+ ) -> configobj.ConfigObj:
"""Merge supplied config, if provided, on top of builtin defaults."""
defaults_copy = configobj.ConfigObj(
constants.RENEWER_DEFAULTS, encoding='utf-8', default_encoding='utf-8')
@@ -83,7 +89,9 @@ def config_with_defaults(config=None):
return defaults_copy
-def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
+def add_time_interval(base_time: datetime.datetime, interval: str,
+ textparser: parsedatetime.Calendar = parsedatetime.Calendar()
+ ) -> datetime.datetime:
"""Parse the time specified time interval, and add it to the base_time
The interval can be in the English-language format understood by
@@ -107,7 +115,9 @@ def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0]
-def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_data):
+def write_renewal_config(o_filename: str, n_filename: str, archive_dir: str,
+ target: Mapping[str, str],
+ relevant_data: Mapping[str, Any]) -> configobj.ConfigObj:
"""Writes a renewal config file with the specified name and values.
:param str o_filename: Absolute path to the previous version of config file
@@ -160,7 +170,8 @@ def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_d
return config
-def rename_renewal_config(prev_name, new_name, cli_config):
+def rename_renewal_config(prev_name: str, new_name: str,
+ cli_config: configuration.NamespaceConfig) -> None:
"""Renames cli_config.certname's config to cli_config.new_certname.
:param .NamespaceConfig cli_config: parsed command line
@@ -178,7 +189,8 @@ def rename_renewal_config(prev_name, new_name, cli_config):
"for the new certificate name.")
-def update_configuration(lineagename, archive_dir, target, cli_config):
+def update_configuration(lineagename: str, archive_dir: str, target: Mapping[str, str],
+ cli_config: configuration.NamespaceConfig) -> configobj.ConfigObj:
"""Modifies lineagename's config to contain the specified values.
:param str lineagename: Name of the lineage being modified
@@ -206,7 +218,7 @@ def update_configuration(lineagename, archive_dir, target, cli_config):
return configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8')
-def get_link_target(link):
+def get_link_target(link: str) -> str:
"""Get an absolute path to the target of link.
:param str link: Path to a symbolic link
@@ -228,7 +240,7 @@ def get_link_target(link):
return os.path.abspath(target)
-def _write_live_readme_to(readme_path, is_base_dir=False):
+def _write_live_readme_to(readme_path: str, is_base_dir: bool = False) -> None:
prefix = ""
if is_base_dir:
prefix = "[cert name]/"
@@ -249,7 +261,7 @@ def _write_live_readme_to(readme_path, is_base_dir=False):
"certificates.\n".format(prefix=prefix))
-def _relevant(namespaces, option):
+def _relevant(namespaces: Iterable[str], option: str) -> bool:
"""
Is this option one that could be restored for future renewal purposes?
@@ -265,7 +277,7 @@ def _relevant(namespaces, option):
any(option.startswith(namespace) for namespace in namespaces))
-def relevant_values(all_values):
+def relevant_values(all_values: Mapping[str, Any]) -> Dict[str, Any]:
"""Return a new dict containing only items relevant for renewal.
:param dict all_values: The original values.
@@ -287,7 +299,8 @@ def relevant_values(all_values):
rv["server"] = all_values["server"]
return rv
-def lineagename_for_filename(config_filename):
+
+def lineagename_for_filename(config_filename: str) -> str:
"""Returns the lineagename for a configuration filename.
"""
if not config_filename.endswith(".conf"):
@@ -295,16 +308,21 @@ def lineagename_for_filename(config_filename):
"renewal config file name must end in .conf")
return os.path.basename(config_filename[:-len(".conf")])
-def renewal_filename_for_lineagename(config, lineagename):
+
+def renewal_filename_for_lineagename(config: configuration.NamespaceConfig,
+ lineagename: str) -> str:
"""Returns the lineagename for a configuration filename.
"""
return os.path.join(config.renewal_configs_dir, lineagename) + ".conf"
-def _relpath_from_file(archive_dir, from_file):
+
+def _relpath_from_file(archive_dir: str, from_file: str) -> str:
"""Path to a directory from a file"""
return os.path.relpath(archive_dir, os.path.dirname(from_file))
-def full_archive_path(config_obj, cli_config, lineagename):
+
+def full_archive_path(config_obj: configobj.ConfigObj, cli_config: configuration.NamespaceConfig,
+ lineagename: str) -> str:
"""Returns the full archive path for a lineagename
Uses cli_config to determine archive path if not available from config_obj.
@@ -317,11 +335,13 @@ def full_archive_path(config_obj, cli_config, lineagename):
return config_obj["archive_dir"]
return os.path.join(cli_config.default_archive_dir, lineagename)
-def _full_live_path(cli_config, lineagename):
+
+def _full_live_path(cli_config: configuration.NamespaceConfig, lineagename: str) -> str:
"""Returns the full default live path for a lineagename"""
return os.path.join(cli_config.live_dir, lineagename)
-def delete_files(config, certname):
+
+def delete_files(config: configuration.NamespaceConfig, certname: str) -> None:
"""Delete all files related to the certificate.
If some files are not found, ignore them and continue.
@@ -423,7 +443,8 @@ class RenewableCert(interfaces.RenewableCert):
renewal configuration file and/or systemwide defaults.
"""
- def __init__(self, config_filename, cli_config, update_symlinks=False):
+ def __init__(self, config_filename: str, cli_config: configuration.NamespaceConfig,
+ update_symlinks: bool = False) -> None:
"""Instantiate a RenewableCert object from an existing lineage.
:param str config_filename: the path to the renewal config file
@@ -477,27 +498,27 @@ class RenewableCert(interfaces.RenewableCert):
self._check_symlinks()
@property
- def key_path(self):
+ def key_path(self) -> str:
"""Duck type for self.privkey"""
return self.privkey
@property
- def cert_path(self):
+ def cert_path(self) -> str:
"""Duck type for self.cert"""
return self.cert
@property
- def chain_path(self):
+ def chain_path(self) -> str:
"""Duck type for self.chain"""
return self.chain
@property
- def fullchain_path(self):
+ def fullchain_path(self) -> str:
"""Duck type for self.fullchain"""
return self.fullchain
@property
- def lineagename(self):
+ def lineagename(self) -> str:
"""Name given to the certificate lineage.
:rtype: str
@@ -506,21 +527,24 @@ class RenewableCert(interfaces.RenewableCert):
return self._lineagename
@property
- def target_expiry(self):
+ def target_expiry(self) -> datetime.datetime:
"""The current target certificate's expiration datetime
:returns: Expiration datetime of the current target certificate
:rtype: :class:`datetime.datetime`
"""
- return crypto_util.notAfter(self.current_target("cert"))
+ cert_path = self.current_target("cert")
+ if not cert_path:
+ raise errors.Error("Target certificate does not exist.")
+ return crypto_util.notAfter(cert_path)
@property
- def archive_dir(self):
+ def archive_dir(self) -> str:
"""Returns the default or specified archive directory"""
return full_archive_path(self.configuration,
self.cli_config, self.lineagename)
- def relative_archive_dir(self, from_file):
+ def relative_archive_dir(self, from_file: str) -> str:
"""Returns the default or specified archive directory as a relative path
Used for creating symbolic links.
@@ -539,7 +563,7 @@ class RenewableCert(interfaces.RenewableCert):
return util.is_staging(self.server)
return False
- def _check_symlinks(self):
+ def _check_symlinks(self) -> None:
"""Raises an exception if a symlink doesn't exist"""
for kind in ALL_FOUR:
link = getattr(self, kind)
@@ -551,7 +575,7 @@ class RenewableCert(interfaces.RenewableCert):
raise errors.CertStorageError("target {0} of symlink {1} does "
"not exist".format(target, link))
- def _update_symlinks(self):
+ def _update_symlinks(self) -> None:
"""Updates symlinks to use archive_dir"""
for kind in ALL_FOUR:
link = getattr(self, kind)
@@ -562,7 +586,7 @@ class RenewableCert(interfaces.RenewableCert):
os.unlink(link)
os.symlink(new_link, link)
- def _consistent(self):
+ def _consistent(self) -> bool:
"""Are the files associated with this lineage self-consistent?
:returns: Whether the files stored in connection with this
@@ -630,7 +654,7 @@ class RenewableCert(interfaces.RenewableCert):
# for x in ALL_FOUR))) == 1
return True
- def _fix(self):
+ def _fix(self) -> None:
"""Attempt to fix defects or inconsistencies in this lineage.
.. todo:: Currently unimplemented.
@@ -648,7 +672,7 @@ class RenewableCert(interfaces.RenewableCert):
# happen as a result of random tampering by a sysadmin, or
# filesystem errors, or crashes.)
- def _previous_symlinks(self):
+ def _previous_symlinks(self) -> List[Tuple[str, str]]:
"""Returns the kind and path of all symlinks used in recovery.
:returns: list of (kind, symlink) tuples
@@ -663,7 +687,7 @@ class RenewableCert(interfaces.RenewableCert):
return previous_symlinks
- def _fix_symlinks(self):
+ def _fix_symlinks(self) -> None:
"""Fixes symlinks in the event of an incomplete version update.
If there is no problem with the current symlinks, this function
@@ -682,7 +706,7 @@ class RenewableCert(interfaces.RenewableCert):
if os.path.exists(link):
os.unlink(link)
- def current_target(self, kind):
+ def current_target(self, kind: str) -> Optional[str]:
"""Returns full path to which the specified item currently points.
:param str kind: the lineage member item ("cert", "privkey",
@@ -702,7 +726,7 @@ class RenewableCert(interfaces.RenewableCert):
return None
return get_link_target(link)
- def current_version(self, kind):
+ def current_version(self, kind: str) -> Optional[int]:
"""Returns numerical version of the specified item.
For example, if kind is "chain" and the current chain link
@@ -729,7 +753,7 @@ class RenewableCert(interfaces.RenewableCert):
logger.debug("No matches for target %s.", kind)
return None
- def version(self, kind, version):
+ def version(self, kind: str, version: int) -> str:
"""The filename that corresponds to the specified version and kind.
.. warning:: The specified version may not exist in this
@@ -746,10 +770,13 @@ class RenewableCert(interfaces.RenewableCert):
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
- where = os.path.dirname(self.current_target(kind))
+ link = self.current_target(kind)
+ if not link:
+ raise errors.Error(f"Target {kind} does not exist!")
+ where = os.path.dirname(link)
return os.path.join(where, "{0}{1}.pem".format(kind, version))
- def available_versions(self, kind):
+ def available_versions(self, kind: str) -> List[int]:
"""Which alternative versions of the specified kind of item exist?
The archive directory where the current version is stored is
@@ -764,13 +791,16 @@ class RenewableCert(interfaces.RenewableCert):
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
- where = os.path.dirname(self.current_target(kind))
+ link = self.current_target(kind)
+ if not link:
+ raise errors.Error(f"Target {kind} does not exist!")
+ where = os.path.dirname(link)
files = os.listdir(where)
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
matches = [pattern.match(f) for f in files]
return sorted([int(m.groups()[0]) for m in matches if m])
- def newest_available_version(self, kind):
+ def newest_available_version(self, kind: str) -> int:
"""Newest available version of the specified kind of item?
:param str kind: the lineage member item (``cert``,
@@ -782,7 +812,7 @@ class RenewableCert(interfaces.RenewableCert):
"""
return max(self.available_versions(kind))
- def latest_common_version(self):
+ def latest_common_version(self) -> int:
"""Newest version for which all items are available?
:returns: the newest available version for which all members
@@ -797,7 +827,7 @@ class RenewableCert(interfaces.RenewableCert):
versions = [self.available_versions(x) for x in ALL_FOUR]
return max(n for n in versions[0] if all(n in v for v in versions[1:]))
- def next_free_version(self):
+ def next_free_version(self) -> int:
"""Smallest version newer than all full or partial versions?
:returns: the smallest version number that is larger than any
@@ -811,7 +841,7 @@ class RenewableCert(interfaces.RenewableCert):
# for the others.
return max(self.newest_available_version(x) for x in ALL_FOUR) + 1
- def ensure_deployed(self):
+ def ensure_deployed(self) -> bool:
"""Make sure we've deployed the latest version.
:returns: False if a change was needed, True otherwise
@@ -826,8 +856,7 @@ class RenewableCert(interfaces.RenewableCert):
return False
return True
-
- def has_pending_deployment(self):
+ def has_pending_deployment(self) -> bool:
"""Is there a later version of all of the managed items?
:returns: ``True`` if there is a complete version of this
@@ -836,12 +865,18 @@ class RenewableCert(interfaces.RenewableCert):
:rtype: bool
"""
+ all_versions: List[int] = []
+ for item in ALL_FOUR:
+ version = self.current_version(item)
+ if version is None:
+ raise errors.Error(f"{item} is required but missing for this certificate.")
+ all_versions.append(version)
# TODO: consider whether to assume consistency or treat
# inconsistent/consistent versions differently
- smallest_current = min(self.current_version(x) for x in ALL_FOUR)
+ smallest_current = min(all_versions)
return smallest_current < self.latest_common_version()
- def _update_link_to(self, kind, version):
+ def _update_link_to(self, kind: str, version: int) -> None:
"""Make the specified item point at the specified version.
(Note that this method doesn't verify that the specified version
@@ -867,7 +902,7 @@ class RenewableCert(interfaces.RenewableCert):
os.unlink(link)
os.symlink(os.path.join(target_directory, filename), link)
- def update_all_links_to(self, version):
+ def update_all_links_to(self, version: int) -> None:
"""Change all member objects to point to the specified version.
:param int version: the desired version
@@ -876,7 +911,10 @@ class RenewableCert(interfaces.RenewableCert):
with error_handler.ErrorHandler(self._fix_symlinks):
previous_links = self._previous_symlinks()
for kind, link in previous_links:
- os.symlink(self.current_target(kind), link)
+ target = self.current_target(kind)
+ if not target:
+ raise errors.Error(f"Target {kind} does not exist!")
+ os.symlink(target, link)
for kind in ALL_FOUR:
self._update_link_to(kind, version)
@@ -884,7 +922,7 @@ class RenewableCert(interfaces.RenewableCert):
for _, link in previous_links:
os.unlink(link)
- def names(self):
+ def names(self) -> List[str]:
"""What are the subject names of this certificate?
:returns: the subject names
@@ -895,11 +933,10 @@ class RenewableCert(interfaces.RenewableCert):
target = self.current_target("cert")
if target is None:
raise errors.CertStorageError("could not find the certificate file")
- with open(target) as f:
- # TODO: Remove the cast once certbot package is fully typed
- return crypto_util.get_names_from_cert(cast(bytes, f.read()))
+ with open(target, "rb") as f:
+ return crypto_util.get_names_from_cert(f.read())
- def ocsp_revoked(self, version):
+ def ocsp_revoked(self, version: int) -> bool:
"""Is the specified cert version revoked according to OCSP?
Also returns True if the cert version is declared as revoked
@@ -927,7 +964,7 @@ class RenewableCert(interfaces.RenewableCert):
logger.debug(str(e))
return False
- def autorenewal_is_enabled(self):
+ def autorenewal_is_enabled(self) -> bool:
"""Is automatic renewal enabled for this cert?
If autorenew is not specified, defaults to True.
@@ -939,7 +976,7 @@ class RenewableCert(interfaces.RenewableCert):
return ("autorenew" not in self.configuration["renewalparams"] or
self.configuration["renewalparams"].as_bool("autorenew"))
- def should_autorenew(self):
+ def should_autorenew(self) -> bool:
"""Should we now try to autorenew the most recent cert version?
This is a policy question and does not only depend on whether
@@ -977,7 +1014,8 @@ class RenewableCert(interfaces.RenewableCert):
return False
@classmethod
- def new_lineage(cls, lineagename, cert, privkey, chain, cli_config):
+ def new_lineage(cls, lineagename: str, cert: bytes, privkey: bytes, chain: bytes,
+ cli_config: configuration.NamespaceConfig) -> "RenewableCert":
"""Create a new certificate lineage.
Attempts to create a certificate lineage -- enrolled for
@@ -1072,7 +1110,7 @@ class RenewableCert(interfaces.RenewableCert):
return cls(new_config.filename, cli_config)
@property
- def private_key_type(self):
+ def private_key_type(self) -> str:
"""
:returns: The type of algorithm for the private, RSA or ECDSA
:rtype: str
@@ -1088,8 +1126,8 @@ class RenewableCert(interfaces.RenewableCert):
else:
return "ECDSA"
- def save_successor(self, prior_version, new_cert,
- new_privkey, new_chain, cli_config):
+ def save_successor(self, prior_version: int, new_cert: bytes, new_privkey: bytes,
+ new_chain: bytes, cli_config: configuration.NamespaceConfig) -> int:
"""Save new cert and chain as a successor of a prior version.
Returns the new version number that was created.
diff --git a/certbot/certbot/_internal/updater.py b/certbot/certbot/_internal/updater.py
index 6494f392e..214087550 100644
--- a/certbot/certbot/_internal/updater.py
+++ b/certbot/certbot/_internal/updater.py
@@ -1,14 +1,19 @@
"""Updaters run at renewal"""
import logging
+from certbot import configuration
from certbot import errors
from certbot import interfaces
+from certbot._internal import storage
+from certbot._internal.plugins import disco as plugin_disco
from certbot._internal.plugins import selection as plug_sel
from certbot.plugins import enhancements
logger = logging.getLogger(__name__)
-def run_generic_updaters(config, lineage, plugins):
+
+def run_generic_updaters(config: configuration.NamespaceConfig, lineage: storage.RenewableCert,
+ plugins: plugin_disco.PluginsRegistry) -> None:
"""Run updaters that the plugin supports
:param config: Configuration object
@@ -35,7 +40,9 @@ def run_generic_updaters(config, lineage, plugins):
_run_updaters(lineage, installer, config)
_run_enhancement_updaters(lineage, installer, config)
-def run_renewal_deployer(config, lineage, installer):
+
+def run_renewal_deployer(config: configuration.NamespaceConfig, lineage: storage.RenewableCert,
+ installer: interfaces.Installer) -> None:
"""Helper function to run deployer interface method if supported by the used
installer plugin.
@@ -60,7 +67,9 @@ def run_renewal_deployer(config, lineage, installer):
installer.renew_deploy(lineage)
_run_enhancement_deployers(lineage, installer, config)
-def _run_updaters(lineage, installer, config):
+
+def _run_updaters(lineage: storage.RenewableCert, installer: interfaces.Installer,
+ config: configuration.NamespaceConfig) -> None:
"""Helper function to run the updater interface methods if supported by the
used installer plugin.
@@ -77,7 +86,9 @@ def _run_updaters(lineage, installer, config):
if isinstance(installer, interfaces.GenericUpdater):
installer.generic_updates(lineage)
-def _run_enhancement_updaters(lineage, installer, config):
+
+def _run_enhancement_updaters(lineage: storage.RenewableCert, installer: interfaces.Installer,
+ config: configuration.NamespaceConfig) -> None:
"""Iterates through known enhancement interfaces. If the installer implements
an enhancement interface and the enhance interface has an updater method, the
updater method gets run.
@@ -99,7 +110,8 @@ def _run_enhancement_updaters(lineage, installer, config):
getattr(installer, enh["updater_function"])(lineage)
-def _run_enhancement_deployers(lineage, installer, config):
+def _run_enhancement_deployers(lineage: storage.RenewableCert, installer: interfaces.Installer,
+ config: configuration.NamespaceConfig) -> None:
"""Iterates through known enhancement interfaces. If the installer implements
an enhancement interface and the enhance interface has an deployer method, the
deployer method gets run.