Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdrien Ferrand <adferrand@users.noreply.github.com>2021-11-26 01:00:03 +0300
committerGitHub <noreply@github.com>2021-11-26 01:00:03 +0300
commit86406ab63aebf463ca4aa0381a55ddeb91231cd2 (patch)
tree4af373737d9c548dc1f9f7e9691b81c33f3297ed
parent7d3a344d4314a19f49c706f6c82959ba6ce93451 (diff)
Add type annotations to the certbot package (part 4) (#9087)
* Extract from #9084 * Cast/ignore types during the transition * Remove useless casts and type ignore directives * Fix lint * Fix a cast * Mandatory typing for certbot packages * Update certbot/certbot/_internal/plugins/disco.py Co-authored-by: alexzorin <alex@zor.io> * Remove unused type import * Fix iterator type * Fix type * Fix types in selection Co-authored-by: alexzorin <alex@zor.io>
-rw-r--r--acme/acme/messages.py2
-rw-r--r--certbot/certbot/_internal/cli/__init__.py16
-rw-r--r--certbot/certbot/_internal/cli/cli_utils.py83
-rw-r--r--certbot/certbot/_internal/cli/group_adder.py7
-rw-r--r--certbot/certbot/_internal/cli/helpful.py46
-rw-r--r--certbot/certbot/_internal/cli/paths_parser.py12
-rw-r--r--certbot/certbot/_internal/cli/plugins_parsing.py9
-rw-r--r--certbot/certbot/_internal/cli/subparsers.py7
-rw-r--r--certbot/certbot/_internal/client.py13
-rw-r--r--certbot/certbot/_internal/display/completer.py17
-rw-r--r--certbot/certbot/_internal/display/dummy_readline.py14
-rw-r--r--certbot/certbot/_internal/display/obj.py139
-rw-r--r--certbot/certbot/_internal/display/util.py21
-rw-r--r--certbot/certbot/_internal/plugins/disco.py92
-rw-r--r--certbot/certbot/_internal/plugins/manual.py33
-rw-r--r--certbot/certbot/_internal/plugins/null.py31
-rw-r--r--certbot/certbot/_internal/plugins/selection.py100
-rw-r--r--certbot/certbot/_internal/plugins/standalone.py49
-rw-r--r--certbot/certbot/_internal/plugins/webroot.py59
-rw-r--r--tox.ini4
20 files changed, 471 insertions, 283 deletions
diff --git a/acme/acme/messages.py b/acme/acme/messages.py
index 0dd579a98..80dacb674 100644
--- a/acme/acme/messages.py
+++ b/acme/acme/messages.py
@@ -335,7 +335,7 @@ class Registration(ResourceBody):
@classmethod
def from_data(cls, phone: Optional[str] = None, email: Optional[str] = None,
- external_account_binding: Optional[ExternalAccountBinding] = None,
+ external_account_binding: Optional[Dict[str, Any]] = None,
**kwargs: Any) -> 'Registration':
"""
Create registration resource from contact details.
diff --git a/certbot/certbot/_internal/cli/__init__.py b/certbot/certbot/_internal/cli/__init__.py
index e7a1de49b..63b0dd097 100644
--- a/certbot/certbot/_internal/cli/__init__.py
+++ b/certbot/certbot/_internal/cli/__init__.py
@@ -4,7 +4,10 @@ import argparse
import logging
import logging.handlers
import sys
+from typing import Any
+from typing import List
from typing import Optional
+from typing import Type
import certbot
from certbot._internal import constants
@@ -40,9 +43,9 @@ from certbot._internal.cli.plugins_parsing import _plugins_parsing
from certbot._internal.cli.subparsers import _create_subparsers
from certbot._internal.cli.verb_help import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP_MAP
-from certbot.plugins import enhancements
from certbot._internal.plugins import disco as plugins_disco
import certbot._internal.plugins.selection as plugin_selection
+from certbot.plugins import enhancements
logger = logging.getLogger(__name__)
@@ -51,7 +54,8 @@ logger = logging.getLogger(__name__)
helpful_parser: Optional[HelpfulArgumentParser] = None
-def prepare_and_parse_args(plugins, args, detect_defaults=False):
+def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str],
+ detect_defaults: bool = False) -> argparse.Namespace:
"""Returns parsed command line arguments.
:param .PluginsRegistry plugins: available plugins
@@ -443,7 +447,7 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
return helpful.parse_args()
-def set_by_cli(var):
+def set_by_cli(var: str) -> bool:
"""
Return True if a particular config variable has been set by the user
(CLI or config file) including if the user explicitly set it to the
@@ -487,7 +491,7 @@ def set_by_cli(var):
set_by_cli.detector = None # type: ignore
-def has_default_value(option, value):
+def has_default_value(option: str, value: Any) -> bool:
"""Does option have the default value?
If the default value of option is not known, False is returned.
@@ -505,7 +509,7 @@ def has_default_value(option, value):
return False
-def option_was_set(option, value):
+def option_was_set(option: str, value: Any) -> bool:
"""Was option set by the user or does it differ from the default?
:param str option: configuration variable being considered
@@ -521,7 +525,7 @@ def option_was_set(option, value):
return set_by_cli(option) or not has_default_value(option, value)
-def argparse_type(variable):
+def argparse_type(variable: Any) -> Type:
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
if helpful_parser is not None:
diff --git a/certbot/certbot/_internal/cli/cli_utils.py b/certbot/certbot/_internal/cli/cli_utils.py
index 7c859509c..5f3267eb0 100644
--- a/certbot/certbot/_internal/cli/cli_utils.py
+++ b/certbot/certbot/_internal/cli/cli_utils.py
@@ -2,6 +2,14 @@
import argparse
import copy
import inspect
+from typing import Any
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
from acme import challenges
from certbot import configuration
@@ -10,24 +18,27 @@ from certbot import util
from certbot._internal import constants
from certbot.compat import os
+if TYPE_CHECKING:
+ from certbot._internal.cli import helpful
+
class _Default:
"""A class to use as a default to detect if a value is set by a user"""
- def __bool__(self):
+ def __bool__(self) -> bool:
return False
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return isinstance(other, _Default)
- def __hash__(self):
+ def __hash__(self) -> int:
return id(_Default)
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
return self.__bool__()
-def read_file(filename, mode="rb"):
+def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]:
"""Returns the given file's contents.
:param str filename: path to file
@@ -48,7 +59,7 @@ def read_file(filename, mode="rb"):
raise argparse.ArgumentTypeError(exc.strerror)
-def flag_default(name):
+def flag_default(name: str) -> Any:
"""Default value for CLI flag."""
# XXX: this is an internal housekeeping notion of defaults before
# argparse has been set up; it is not accurate for all flags. Call it
@@ -57,7 +68,7 @@ def flag_default(name):
return copy.deepcopy(constants.CLI_DEFAULTS[name])
-def config_help(name, hidden=False):
+def config_help(name: str, hidden: bool = False) -> Optional[str]:
"""Extract the help message for a `configuration.NamespaceConfig` property docstring."""
if hidden:
return argparse.SUPPRESS
@@ -73,11 +84,11 @@ class HelpfulArgumentGroup:
HelpfulArgumentParser when necessary.
"""
- def __init__(self, helpful_arg_parser, topic):
+ def __init__(self, helpful_arg_parser: "helpful.HelpfulArgumentParser", topic: str) -> None:
self._parser = helpful_arg_parser
self._topic = topic
- def add_argument(self, *args, **kwargs):
+ def add_argument(self, *args: Any, **kwargs: Any) -> None:
"""Add a new command line argument to the argument group."""
self._parser.add(self._topic, *args, **kwargs)
@@ -88,12 +99,12 @@ class CustomHelpFormatter(argparse.HelpFormatter):
In particular we fix https://bugs.python.org/issue28742
"""
- def _get_help_string(self, action):
+ def _get_help_string(self, action: argparse.Action) -> Optional[str]:
helpstr = action.help
- if '%(default)' not in action.help and '(default:' not in action.help:
+ if action.help and '%(default)' not in action.help and '(default:' not in action.help:
if action.default != argparse.SUPPRESS:
defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
- if action.option_strings or action.nargs in defaulting_nargs:
+ if helpstr and (action.option_strings or action.nargs in defaulting_nargs):
helpstr += ' (default: %(default)s)'
return helpstr
@@ -101,12 +112,15 @@ class CustomHelpFormatter(argparse.HelpFormatter):
class _DomainsAction(argparse.Action):
"""Action class for parsing domains."""
- def __call__(self, parser, namespace, domain, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ domain: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
"""Just wrap add_domains in argparseese."""
- add_domains(namespace, domain)
+ add_domains(namespace, str(domain) if domain is not None else None)
-def add_domains(args_or_config, domains):
+def add_domains(args_or_config: Union[argparse.Namespace, configuration.NamespaceConfig],
+ domains: Optional[str]) -> List[str]:
"""Registers new domains to be used during the current client run.
Domains are not added to the list of requested domains if they have
@@ -121,7 +135,10 @@ def add_domains(args_or_config, domains):
:rtype: `list` of `str`
"""
- validated_domains = []
+ validated_domains: List[str] = []
+ if not domains:
+ return validated_domains
+
for domain in domains.split(","):
domain = util.enforce_domain_sanity(domain.strip())
validated_domains.append(domain)
@@ -137,11 +154,13 @@ class CaseInsensitiveList(list):
This class is passed to the `choices` argument of `argparse.add_arguments`
through the `helpful` wrapper. It is necessary due to special handling of
command line arguments by `set_by_cli` in which the `type_func` is not applied."""
- def __contains__(self, element):
+ def __contains__(self, element: object) -> bool:
+ if not isinstance(element, str):
+ return False
return super().__contains__(element.lower())
-def _user_agent_comment_type(value):
+def _user_agent_comment_type(value: str) -> str:
if "(" in value or ")" in value:
raise argparse.ArgumentTypeError("may not contain parentheses")
return value
@@ -150,13 +169,17 @@ def _user_agent_comment_type(value):
class _EncodeReasonAction(argparse.Action):
"""Action class for parsing revocation reason."""
- def __call__(self, parser, namespace, reason, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ reason: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
"""Encodes the reason for certificate revocation."""
- code = constants.REVOCATION_REASONS[reason.lower()]
+ if reason is None:
+ raise ValueError("Unexpected null reason.")
+ code = constants.REVOCATION_REASONS[str(reason).lower()]
setattr(namespace, self.dest, code)
-def parse_preferred_challenges(pref_challs):
+def parse_preferred_challenges(pref_challs: Iterable[str]) -> List[str]:
"""Translate and validate preferred challenges.
:param pref_challs: list of preferred challenge types
@@ -183,9 +206,13 @@ def parse_preferred_challenges(pref_challs):
class _PrefChallAction(argparse.Action):
"""Action class for parsing preferred challenges."""
- def __call__(self, parser, namespace, pref_challs, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ pref_challs: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
+ if pref_challs is None:
+ raise ValueError("Unexpected null pref_challs.")
try:
- challs = parse_preferred_challenges(pref_challs.split(","))
+ challs = parse_preferred_challenges(str(pref_challs).split(","))
except errors.Error as error:
raise argparse.ArgumentError(self, str(error))
namespace.pref_challs.extend(challs)
@@ -194,7 +221,9 @@ class _PrefChallAction(argparse.Action):
class _DeployHookAction(argparse.Action):
"""Action class for parsing deploy hooks."""
- def __call__(self, parser, namespace, values, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ values: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
renew_hook_set = namespace.deploy_hook != namespace.renew_hook
if renew_hook_set and namespace.renew_hook != values:
raise argparse.ArgumentError(
@@ -205,7 +234,9 @@ class _DeployHookAction(argparse.Action):
class _RenewHookAction(argparse.Action):
"""Action class for parsing renew hooks."""
- def __call__(self, parser, namespace, values, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ values: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
deploy_hook_set = namespace.deploy_hook is not None
if deploy_hook_set and namespace.deploy_hook != values:
raise argparse.ArgumentError(
@@ -213,7 +244,7 @@ class _RenewHookAction(argparse.Action):
namespace.renew_hook = values
-def nonnegative_int(value):
+def nonnegative_int(value: str) -> int:
"""Converts value to an int and checks that it is not negative.
This function should used as the type parameter for argparse
diff --git a/certbot/certbot/_internal/cli/group_adder.py b/certbot/certbot/_internal/cli/group_adder.py
index 0c54c9fe1..96d58824b 100644
--- a/certbot/certbot/_internal/cli/group_adder.py
+++ b/certbot/certbot/_internal/cli/group_adder.py
@@ -1,9 +1,14 @@
"""This module contains a function to add the groups of arguments for the help
display"""
+from typing import TYPE_CHECKING
+
from certbot._internal.cli.verb_help import VERB_HELP
+if TYPE_CHECKING:
+ from certbot._internal.cli import helpful
+
-def _add_all_groups(helpful):
+def _add_all_groups(helpful: "helpful.HelpfulArgumentParser") -> None:
helpful.add_group("automation", description="Flags for automating execution & other tweaks")
helpful.add_group("security", description="Security parameters & server settings")
helpful.add_group("testing",
diff --git a/certbot/certbot/_internal/cli/helpful.py b/certbot/certbot/_internal/cli/helpful.py
index 0848829c4..714c487a4 100644
--- a/certbot/certbot/_internal/cli/helpful.py
+++ b/certbot/certbot/_internal/cli/helpful.py
@@ -7,6 +7,10 @@ import glob
import sys
from typing import Any
from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Union
import configargparse
@@ -29,6 +33,7 @@ from certbot._internal.cli.cli_utils import HelpfulArgumentGroup
from certbot._internal.cli.verb_help import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP_MAP
from certbot._internal.display import obj as display_obj
+from certbot._internal.plugins import disco
from certbot.compat import os
@@ -40,7 +45,8 @@ class HelpfulArgumentParser:
'certbot --help security' for security options.
"""
- def __init__(self, args, plugins, detect_defaults=False):
+ def __init__(self, args: List[str], plugins: Iterable[str],
+ detect_defaults: bool = False) -> None:
from certbot._internal import main
self.VERBS = {
"auth": main.certonly,
@@ -80,6 +86,7 @@ class HelpfulArgumentParser:
self.determine_verb()
help1 = self.prescan_for_flag("-h", self.help_topics)
help2 = self.prescan_for_flag("--help", self.help_topics)
+ self.help_arg: Union[str, bool]
if isinstance(help1, bool) and isinstance(help2, bool):
self.help_arg = help1 or help2
else:
@@ -111,7 +118,7 @@ class HelpfulArgumentParser:
# Help that are synonyms for --help subcommands
COMMANDS_TOPICS = ["command", "commands", "subcommand", "subcommands", "verbs"]
- def _list_subcommands(self):
+ def _list_subcommands(self) -> str:
longest = max(len(v) for v in VERB_HELP_MAP)
text = "The full list of available SUBCOMMANDS is:\n\n"
@@ -122,7 +129,7 @@ class HelpfulArgumentParser:
text += "\nYou can get more help on a specific subcommand with --help SUBCOMMAND\n"
return text
- def _usage_string(self, plugins, help_arg):
+ def _usage_string(self, plugins: Iterable[str], help_arg: Union[str, bool]) -> str:
"""Make usage strings late so that plugins can be initialised late
:param plugins: all discovered plugins
@@ -150,13 +157,14 @@ class HelpfulArgumentParser:
# if we're doing --help all, the OVERVIEW is part of the SHORT_USAGE at
# the top; if we're doing --help someothertopic, it's OT so it's not
usage += COMMAND_OVERVIEW % (apache_doc, nginx_doc)
- else:
+ elif isinstance(help_arg, str):
custom = VERB_HELP_MAP.get(help_arg, {}).get("usage", None)
usage = custom if custom else usage
+ # Only remaining case is help_arg == False, which gives effectively usage == SHORT_USAGE.
return usage
- def remove_config_file_domains_for_renewal(self, parsed_args):
+ def remove_config_file_domains_for_renewal(self, parsed_args: argparse.Namespace) -> None:
"""Make "certbot renew" safe if domains are set in cli.ini."""
# Works around https://github.com/certbot/certbot/issues/4096
if self.verb == "renew":
@@ -164,7 +172,7 @@ class HelpfulArgumentParser:
if source.startswith("config_file") and "domains" in flags:
parsed_args.domains = _Default() if self.detect_defaults else []
- def parse_args(self):
+ def parse_args(self) -> argparse.Namespace:
"""Parses command line arguments and returns the result.
:returns: parsed command line arguments
@@ -224,7 +232,7 @@ class HelpfulArgumentParser:
return parsed_args
- def set_test_server(self, parsed_args):
+ def set_test_server(self, parsed_args: argparse.Namespace) -> None:
"""We have --staging/--dry-run; perform sanity check and set config.server"""
# Flag combinations should produce these results:
@@ -253,7 +261,7 @@ class HelpfulArgumentParser:
parsed_args.tos = True
parsed_args.register_unsafely_without_email = True
- def handle_csr(self, parsed_args):
+ def handle_csr(self, parsed_args: argparse.Namespace) -> None:
"""Process a --csr flag."""
if parsed_args.verb != "certonly":
raise errors.Error("Currently, a CSR file may only be specified "
@@ -287,7 +295,7 @@ class HelpfulArgumentParser:
.format(", ".join(csr_domains), ", ".join(config_domains)))
- def determine_verb(self):
+ def determine_verb(self) -> None:
"""Determines the verb/subcommand provided by the user.
This function works around some of the limitations of argparse.
@@ -311,7 +319,7 @@ class HelpfulArgumentParser:
self.verb = "run"
- def prescan_for_flag(self, flag, possible_arguments):
+ def prescan_for_flag(self, flag: str, possible_arguments: Iterable[str]) -> Union[str, bool]:
"""Checks cli input for flags.
Check for a flag, which accepts a fixed set of possible arguments, in
@@ -332,7 +340,8 @@ class HelpfulArgumentParser:
pass
return True
- def add(self, topics, *args, **kwargs):
+ def add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any,
+ **kwargs: Any) -> None:
"""Add a new command line argument.
:param topics: str or [str] help topic(s) this should be listed under,
@@ -367,7 +376,7 @@ class HelpfulArgumentParser:
if self.detect_defaults:
kwargs = self.modify_kwargs_for_default_detection(**kwargs)
- if self.visible_topics[topic]:
+ if isinstance(topic, str) and self.visible_topics[topic]:
if topic in self.groups:
group = self.groups[topic]
group.add_argument(*args, **kwargs)
@@ -377,7 +386,7 @@ class HelpfulArgumentParser:
kwargs["help"] = argparse.SUPPRESS
self.parser.add_argument(*args, **kwargs)
- def modify_kwargs_for_default_detection(self, **kwargs):
+ def modify_kwargs_for_default_detection(self, **kwargs: Any) -> Dict[str, Any]:
"""Modify an arg so we can check if it was set by the user.
Changes the parameters given to argparse when adding an argument
@@ -399,7 +408,7 @@ class HelpfulArgumentParser:
return kwargs
- def add_deprecated_argument(self, argument_name, num_args):
+ def add_deprecated_argument(self, argument_name: str, num_args: int) -> None:
"""Adds a deprecated argument with the name argument_name.
Deprecated arguments are not shown in the help. If they are used
@@ -407,7 +416,7 @@ class HelpfulArgumentParser:
argument is deprecated and no other action is taken.
:param str argument_name: Name of deprecated argument.
- :param int nargs: Number of arguments the option takes.
+ :param int num_args: Number of arguments the option takes.
"""
# certbot.util.add_deprecated_argument expects the normal add_argument
@@ -427,7 +436,8 @@ class HelpfulArgumentParser:
add_func = functools.partial(self.add, None)
util.add_deprecated_argument(add_func, argument_name, num_args)
- def add_group(self, topic, verbs=(), **kwargs):
+ def add_group(self, topic: str, verbs: Iterable[str] = (),
+ **kwargs: Any) -> HelpfulArgumentGroup:
"""Create a new argument group.
This method must be called once for every topic, however, calls
@@ -449,7 +459,7 @@ class HelpfulArgumentParser:
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])
return HelpfulArgumentGroup(self, topic)
- def add_plugin_args(self, plugins):
+ def add_plugin_args(self, plugins: disco.PluginsRegistry) -> None:
"""
Let each of the plugins add its own command line arguments, which
@@ -461,7 +471,7 @@ class HelpfulArgumentParser:
description=plugin_ep.long_description)
plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name)
- def determine_help_topics(self, chosen_topic):
+ def determine_help_topics(self, chosen_topic: Union[str, bool]) -> Dict[str, bool]:
"""
The user may have requested help on a topic, return a dict of which
diff --git a/certbot/certbot/_internal/cli/paths_parser.py b/certbot/certbot/_internal/cli/paths_parser.py
index 6197a4bf9..69a112cfa 100644
--- a/certbot/certbot/_internal/cli/paths_parser.py
+++ b/certbot/certbot/_internal/cli/paths_parser.py
@@ -1,13 +1,19 @@
"""This is a module that adds configuration to the argument parser regarding
paths for certificates"""
+from typing import TYPE_CHECKING
+from typing import Union
+
from certbot._internal.cli.cli_utils import config_help
from certbot._internal.cli.cli_utils import flag_default
from certbot.compat import os
+if TYPE_CHECKING:
+ from certbot._internal.cli import helpful
+
-def _paths_parser(helpful):
+def _paths_parser(helpful: "helpful.HelpfulArgumentParser") -> None:
add = helpful.add
- verb = helpful.verb
+ verb: Union[str, bool] = helpful.verb
if verb == "help":
verb = helpful.help_arg
@@ -21,7 +27,7 @@ def _paths_parser(helpful):
add(["paths", "install", "revoke", "certonly", "manage"], "--cert-path", **cpkwargs)
section = "paths"
- if verb in ("install", "revoke"):
+ if isinstance(verb, str) and verb in ("install", "revoke"):
section = verb
add(section, "--key-path", type=os.path.abspath,
help="Path to private key for certificate installation "
diff --git a/certbot/certbot/_internal/cli/plugins_parsing.py b/certbot/certbot/_internal/cli/plugins_parsing.py
index bbfdf22da..f0a976bf4 100644
--- a/certbot/certbot/_internal/cli/plugins_parsing.py
+++ b/certbot/certbot/_internal/cli/plugins_parsing.py
@@ -1,8 +1,15 @@
"""This is a module that handles parsing of plugins for the argument parser"""
+from typing import TYPE_CHECKING
+
from certbot._internal.cli.cli_utils import flag_default
+from certbot._internal.plugins import disco
+
+if TYPE_CHECKING:
+ from certbot._internal.cli import helpful
-def _plugins_parsing(helpful, plugins):
+def _plugins_parsing(helpful: "helpful.HelpfulArgumentParser",
+ plugins: disco.PluginsRegistry) -> None:
# It's nuts, but there are two "plugins" topics. Somehow this works
helpful.add_group(
"plugins", description="Plugin Selection: Certbot client supports an "
diff --git a/certbot/certbot/_internal/cli/subparsers.py b/certbot/certbot/_internal/cli/subparsers.py
index d872cf71a..5e3304ba1 100644
--- a/certbot/certbot/_internal/cli/subparsers.py
+++ b/certbot/certbot/_internal/cli/subparsers.py
@@ -1,4 +1,6 @@
"""This module creates subparsers for the argument parser"""
+from typing import TYPE_CHECKING
+
from certbot import interfaces
from certbot._internal import constants
from certbot._internal.cli.cli_utils import _EncodeReasonAction
@@ -7,8 +9,11 @@ from certbot._internal.cli.cli_utils import CaseInsensitiveList
from certbot._internal.cli.cli_utils import flag_default
from certbot._internal.cli.cli_utils import read_file
+if TYPE_CHECKING:
+ from certbot._internal.cli import helpful
+
-def _create_subparsers(helpful):
+def _create_subparsers(helpful: "helpful.HelpfulArgumentParser") -> None:
from certbot._internal.client import sample_user_agent # avoid import loops
helpful.add(
None, "--user-agent", default=flag_default("user_agent"),
diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py
index f5793c895..4c472df55 100644
--- a/certbot/certbot/_internal/client.py
+++ b/certbot/certbot/_internal/client.py
@@ -2,14 +2,13 @@
import datetime
import logging
import platform
+from typing import cast
from typing import Any
from typing import Callable
-from typing import cast
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
-from typing import Set
from typing import Tuple
from typing import Union
import warnings
@@ -241,10 +240,8 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names
raise errors.Error(msg)
try:
- # TODO: Remove the cast once certbot package is fully typed
newreg = messages.NewRegistration.from_data(
- email=config.email,
- external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab))
+ email=config.email, external_account_binding=eab)
# Until ACME v1 support is removed from Certbot, we actually need the provided
# ACME client to be a wrapper of type BackwardsCompatibleClientV2.
# TODO: Remove this cast and rewrite the logic when the client is actually a ClientV2
@@ -416,8 +413,7 @@ class Client:
elliptic_curve=elliptic_curve,
strict_permissions=self.config.strict_permissions,
)
- # TODO: Remove the cast once certbot package is fully typed
- csr = crypto_util.generate_csr(key, cast(Set[str], domains), self.config.csr_dir,
+ csr = crypto_util.generate_csr(key, domains, self.config.csr_dir,
self.config.must_staple, self.config.strict_permissions)
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
@@ -668,8 +664,7 @@ class Client:
with error_handler.ErrorHandler(self._recovery_routine_with_msg, None):
for dom in domains:
try:
- # TODO: Remove the cast once certbot package is fully typed
- self.installer.enhance(dom, enhancement, cast(Optional[List[str]], options))
+ self.installer.enhance(dom, enhancement, options)
except errors.PluginEnhancementAlreadyPresent:
logger.info("Enhancement %s was already set.", enh_label)
except errors.PluginError:
diff --git a/certbot/certbot/_internal/display/completer.py b/certbot/certbot/_internal/display/completer.py
index b43859b19..821aba780 100644
--- a/certbot/certbot/_internal/display/completer.py
+++ b/certbot/certbot/_internal/display/completer.py
@@ -1,8 +1,14 @@
"""Provides Tab completion when prompting users for a path."""
import glob
+from types import TracebackType
from typing import Callable
from typing import Iterator
from typing import Optional
+from typing import Type
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing_extensions import Literal
# readline module is not available on all systems
try:
@@ -28,12 +34,12 @@ class Completer:
"""
- def __init__(self):
+ def __init__(self) -> None:
self._iter: Iterator[str]
self._original_completer: Optional[Callable]
self._original_delims: str
- def complete(self, text, state):
+ def complete(self, text: str, state: int) -> Optional[str]:
"""Provides path completion for use with readline.
:param str text: text to offer completions for
@@ -48,7 +54,7 @@ class Completer:
self._iter = glob.iglob(text + '*')
return next(self._iter, None)
- def __enter__(self):
+ def __enter__(self) -> None:
self._original_completer = readline.get_completer()
self._original_delims = readline.get_completer_delims()
@@ -62,6 +68,9 @@ class Completer:
else:
readline.parse_and_bind('tab: complete')
- def __exit__(self, unused_type, unused_value, unused_traceback):
+ def __exit__(self, unused_type: Optional[Type[BaseException]],
+ unused_value: Optional[BaseException],
+ unused_traceback: Optional[TracebackType]) -> 'Literal[False]':
readline.set_completer_delims(self._original_delims)
readline.set_completer(self._original_completer)
+ return False
diff --git a/certbot/certbot/_internal/display/dummy_readline.py b/certbot/certbot/_internal/display/dummy_readline.py
index fb3d807bb..2b6e4c310 100644
--- a/certbot/certbot/_internal/display/dummy_readline.py
+++ b/certbot/certbot/_internal/display/dummy_readline.py
@@ -1,21 +1,25 @@
"""A dummy module with no effect for use on systems without readline."""
+from typing import Callable
+from typing import Iterable
+from typing import List
+from typing import Optional
-def get_completer():
+def get_completer() -> Optional[Callable[[], str]]:
"""An empty implementation of readline.get_completer."""
-def get_completer_delims():
+def get_completer_delims() -> List[str]:
"""An empty implementation of readline.get_completer_delims."""
-def parse_and_bind(unused_command):
+def parse_and_bind(unused_command: str) -> None:
"""An empty implementation of readline.parse_and_bind."""
-def set_completer(unused_function=None):
+def set_completer(unused_function: Optional[Callable[[], str]] = None) -> None:
"""An empty implementation of readline.set_completer."""
-def set_completer_delims(unused_delims):
+def set_completer_delims(unused_delims: Iterable[str]) -> None:
"""An empty implementation of readline.set_completer_delims."""
diff --git a/certbot/certbot/_internal/display/obj.py b/certbot/certbot/_internal/display/obj.py
index b30587b4e..39b737e80 100644
--- a/certbot/certbot/_internal/display/obj.py
+++ b/certbot/certbot/_internal/display/obj.py
@@ -1,7 +1,13 @@
"""This modules define the actual display implementations used in Certbot"""
import logging
import sys
+from typing import Any
+from typing import Iterable
+from typing import List
from typing import Optional
+from typing import TextIO
+from typing import Tuple
+from typing import TypeVar
from typing import Union
import zope.component
@@ -35,12 +41,14 @@ it as a heading)"""
# Adding a level of indirection causes the lookup of the global _DisplayService
# object to happen first avoiding this potential bug.
class _DisplayService:
- def __init__(self):
+ def __init__(self) -> None:
self.display: Optional[Union[FileDisplay, NoninteractiveDisplay]] = None
_SERVICE = _DisplayService()
+T = TypeVar("T")
+
# This use of IDisplay can be removed when this class is no longer accessible
# through the public API in certbot.display.util.
@@ -49,15 +57,14 @@ class FileDisplay:
"""File-based display."""
# see https://github.com/certbot/certbot/issues/3915
- def __init__(self, outfile, force_interactive):
+ def __init__(self, outfile: TextIO, force_interactive: bool) -> None:
super().__init__()
self.outfile = outfile
self.force_interactive = force_interactive
self.skipped_interaction = False
- def notification(self, message, pause=True,
- wrap=True, force_interactive=False,
- decorate=True):
+ def notification(self, message: str, pause: bool = True, wrap: bool = True,
+ force_interactive: bool = False, decorate: bool = True) -> None:
"""Displays a notification and waits for user acceptance.
:param str message: Message to display
@@ -89,9 +96,11 @@ class FileDisplay:
else:
logger.debug("Not pausing for user confirmation")
- def menu(self, message, choices, ok_label=None, cancel_label=None, # pylint: disable=unused-argument
- help_label=None, default=None, # pylint: disable=unused-argument
- cli_flag=None, force_interactive=False, **unused_kwargs):
+ def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
+ ok_label: Optional[str] = None, cancel_label: Optional[str] = None, # pylint: disable=unused-argument
+ help_label: Optional[str] = None, default: Optional[int] = None, # pylint: disable=unused-argument
+ cli_flag: Optional[str] = None, force_interactive: bool = False,
+ **unused_kwargs: Any) -> Tuple[str, int]:
"""Display a menu.
.. todo:: This doesn't enable the help label/button (I wasn't sold on
@@ -113,8 +122,9 @@ class FileDisplay:
:rtype: tuple
"""
- if self._return_default(message, default, cli_flag, force_interactive):
- return OK, default
+ return_default = self._return_default(message, default, cli_flag, force_interactive)
+ if return_default is not None:
+ return OK, return_default
self._print_menu(message, choices)
@@ -122,8 +132,8 @@ class FileDisplay:
return code, selection - 1
- def input(self, message, default=None,
- cli_flag=None, force_interactive=False, **unused_kwargs):
+ def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
+ force_interactive: bool = False, **unused_kwargs: Any) -> Tuple[str, str]:
"""Accept input from the user.
:param str message: message to display to the user
@@ -138,8 +148,9 @@ class FileDisplay:
:rtype: tuple
"""
- if self._return_default(message, default, cli_flag, force_interactive):
- return OK, default
+ return_default = self._return_default(message, default, cli_flag, force_interactive)
+ if return_default is not None:
+ return OK, return_default
# Trailing space must be added outside of util.wrap_lines to
# be preserved
@@ -150,8 +161,9 @@ class FileDisplay:
return CANCEL, "-1"
return OK, ans
- def yesno(self, message, yes_label="Yes", no_label="No", default=None,
- cli_flag=None, force_interactive=False, **unused_kwargs):
+ def yesno(self, message: str, yes_label: str = "Yes", no_label: str = "No",
+ default: Optional[bool] = None, cli_flag: Optional[str] = None,
+ force_interactive: bool = False, **unused_kwargs: Any) -> bool:
"""Query the user with a yes/no question.
Yes and No label must begin with different letters, and must contain at
@@ -169,8 +181,9 @@ class FileDisplay:
:rtype: bool
"""
- if self._return_default(message, default, cli_flag, force_interactive):
- return default
+ return_default = self._return_default(message, default, cli_flag, force_interactive)
+ if return_default is not None:
+ return return_default
message = util.wrap_lines(message)
@@ -192,8 +205,9 @@ class FileDisplay:
ans.startswith(no_label[0].upper())):
return False
- def checklist(self, message, tags, default=None,
- cli_flag=None, force_interactive=False, **unused_kwargs):
+ def checklist(self, message: str, tags: List[str], default: Optional[List[str]] = None,
+ cli_flag: Optional[str] = None, force_interactive: bool = False,
+ **unused_kwargs: Any) -> Tuple[str, List[str]]:
"""Display a checklist.
:param str message: Message to display to user
@@ -209,8 +223,9 @@ class FileDisplay:
:rtype: tuple
"""
- if self._return_default(message, default, cli_flag, force_interactive):
- return OK, default
+ return_default = self._return_default(message, default, cli_flag, force_interactive)
+ if return_default is not None:
+ return OK, return_default
while True:
self._print_menu(message, tags)
@@ -233,22 +248,23 @@ class FileDisplay:
else:
return code, []
- def _return_default(self, prompt, default, cli_flag, force_interactive):
+ def _return_default(self, prompt: str, default: Optional[T],
+ cli_flag: Optional[str], force_interactive: bool) -> Optional[T]:
"""Should we return the default instead of prompting the user?
:param str prompt: prompt for the user
- :param default: default answer to prompt
+ :param T default: default answer to prompt
:param str cli_flag: command line option for setting an answer
to this question
:param bool force_interactive: if interactivity is forced
- :returns: True if we should return the default without prompting
- :rtype: bool
+ :returns: The default value if we should return it else `None`
+ :rtype: T or `None`
"""
# assert_valid_call(prompt, default, cli_flag, force_interactive)
if self._can_interact(force_interactive):
- return False
+ return None
if default is None:
msg = "Unable to get an answer for the question:\n{0}".format(prompt)
if cli_flag:
@@ -259,9 +275,9 @@ class FileDisplay:
logger.debug(
"Falling back to default %s for the prompt:\n%s",
default, prompt)
- return True
+ return default
- def _can_interact(self, force_interactive):
+ def _can_interact(self, force_interactive: bool) -> bool:
"""Can we safely interact with the user?
:param bool force_interactive: if interactivity is forced
@@ -282,8 +298,9 @@ class FileDisplay:
self.skipped_interaction = True
return False
- def directory_select(self, message, default=None, cli_flag=None,
- force_interactive=False, **unused_kwargs):
+ def directory_select(self, message: str, default: Optional[str] = None,
+ cli_flag: Optional[str] = None, force_interactive: bool = False,
+ **unused_kwargs: Any) -> Tuple[str, str]:
"""Display a directory selection screen.
:param str message: prompt to give the user
@@ -300,7 +317,8 @@ class FileDisplay:
with completer.Completer():
return self.input(message, default, cli_flag, force_interactive)
- def _scrub_checklist_input(self, indices, tags):
+ def _scrub_checklist_input(self, indices: Iterable[Union[str, int]],
+ tags: List[str]) -> List[str]:
"""Validate input and transform indices to appropriate tags.
:param list indices: input
@@ -312,21 +330,22 @@ class FileDisplay:
"""
# They should all be of type int
try:
- indices = [int(index) for index in indices]
+ indices_int = [int(index) for index in indices]
except ValueError:
return []
# Remove duplicates
- indices = list(set(indices))
+ indices_int = list(set(indices_int))
# Check all input is within range
- for index in indices:
+ for index in indices_int:
if index < 1 or index > len(tags):
return []
- # Transform indices to appropriate tags
- return [tags[index - 1] for index in indices]
+ # Transform indices_int to appropriate tags
+ return [tags[index - 1] for index in indices_int]
- def _print_menu(self, message, choices):
+ def _print_menu(self, message: str,
+ choices: Union[List[Tuple[str, str]], List[str]]) -> None:
"""Print a menu on the screen.
:param str message: title of menu
@@ -355,7 +374,7 @@ class FileDisplay:
self.outfile.write(SIDE_FRAME + os.linesep)
self.outfile.flush()
- def _get_valid_int_ans(self, max_):
+ def _get_valid_int_ans(self, max_: int) -> Tuple[str, int]:
"""Get a numerical selection.
:param int max: The maximum entry (len of choices), must be positive
@@ -398,21 +417,23 @@ class FileDisplay:
class NoninteractiveDisplay:
"""A display utility implementation that never asks for interactive user input"""
- def __init__(self, outfile, *unused_args, **unused_kwargs):
+ def __init__(self, outfile: TextIO, *unused_args: Any, **unused_kwargs: Any) -> None:
super().__init__()
self.outfile = outfile
- def _interaction_fail(self, message, cli_flag, extra=""):
- """Error out in case of an attempt to interact in noninteractive mode"""
+ def _interaction_fail(self, message: str, cli_flag: Optional[str],
+ extra: str = "") -> errors.MissingCommandlineFlag:
+ """Return error to raise in case of an attempt to interact in noninteractive mode"""
msg = "Missing command line flag or config entry for this setting:\n"
msg += message
if extra:
msg += "\n" + extra
if cli_flag:
msg += "\n\n(You can set this with the {0} flag)".format(cli_flag)
- raise errors.MissingCommandlineFlag(msg)
+ return errors.MissingCommandlineFlag(msg)
- def notification(self, message, pause=False, wrap=True, decorate=True, **unused_kwargs): # pylint: disable=unused-argument
+ def notification(self, message: str, pause: bool = False, wrap: bool = True, # pylint: disable=unused-argument
+ decorate: bool = True, **unused_kwargs: Any) -> None:
"""Displays a notification without waiting for user acceptance.
:param str message: Message to display to stdout
@@ -434,8 +455,10 @@ class NoninteractiveDisplay:
)
self.outfile.flush()
- def menu(self, message, choices, ok_label=None, cancel_label=None,
- help_label=None, default=None, cli_flag=None, **unused_kwargs):
+ def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
+ ok_label: Optional[str] = None, cancel_label: Optional[str] = None,
+ help_label: Optional[str] = None, default: Optional[int] = None,
+ cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, int]:
# pylint: disable=unused-argument
"""Avoid displaying a menu.
@@ -454,11 +477,12 @@ class NoninteractiveDisplay:
"""
if default is None:
- self._interaction_fail(message, cli_flag, "Choices: " + repr(choices))
+ raise self._interaction_fail(message, cli_flag, "Choices: " + repr(choices))
return OK, default
- def input(self, message, default=None, cli_flag=None, **unused_kwargs):
+ def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
+ **unused_kwargs: Any) -> Tuple[str, str]:
"""Accept input from the user.
:param str message: message to display to the user
@@ -471,11 +495,12 @@ class NoninteractiveDisplay:
"""
if default is None:
- self._interaction_fail(message, cli_flag)
+ raise self._interaction_fail(message, cli_flag)
return OK, default
- def yesno(self, message, yes_label=None, no_label=None, # pylint: disable=unused-argument
- default=None, cli_flag=None, **unused_kwargs):
+ def yesno(self, message: str, yes_label: Optional[str] = None, no_label: Optional[str] = None, # pylint: disable=unused-argument
+ default: Optional[bool] = None, cli_flag: Optional[str] = None,
+ **unused_kwargs: Any) -> bool:
"""Decide Yes or No, without asking anybody
:param str message: question for the user
@@ -487,11 +512,11 @@ class NoninteractiveDisplay:
"""
if default is None:
- self._interaction_fail(message, cli_flag)
+ raise self._interaction_fail(message, cli_flag)
return default
- def checklist(self, message, tags, default=None,
- cli_flag=None, **unused_kwargs):
+ def checklist(self, message: str, tags: Iterable[str], default: Optional[List[str]] = None,
+ cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, List[str]]:
"""Display a checklist.
:param str message: Message to display to user
@@ -505,11 +530,11 @@ class NoninteractiveDisplay:
"""
if default is None:
- self._interaction_fail(message, cli_flag, "? ".join(tags) + "?")
+ raise self._interaction_fail(message, cli_flag, "? ".join(tags) + "?")
return OK, default
- def directory_select(self, message, default=None,
- cli_flag=None, **unused_kwargs):
+ def directory_select(self, message: str, default: Optional[str] = None,
+ cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, str]:
"""Simulate prompting the user for a directory.
This function returns default if it is not ``None``, otherwise,
diff --git a/certbot/certbot/_internal/display/util.py b/certbot/certbot/_internal/display/util.py
index b9aa132b6..d2115289e 100644
--- a/certbot/certbot/_internal/display/util.py
+++ b/certbot/certbot/_internal/display/util.py
@@ -1,12 +1,13 @@
"""Internal Certbot display utilities."""
-from typing import List
-import textwrap
import sys
+import textwrap
+from typing import List
+from typing import Optional
from certbot.compat import misc
-def wrap_lines(msg):
+def wrap_lines(msg: str) -> str:
"""Format lines nicely to 80 chars.
:param str msg: Original message
@@ -28,7 +29,7 @@ def wrap_lines(msg):
return '\n'.join(fixed_l)
-def parens_around_char(label):
+def parens_around_char(label: str) -> str:
"""Place parens around first character of label.
:param str label: Must contain at least one character
@@ -37,7 +38,7 @@ def parens_around_char(label):
return "({first}){rest}".format(first=label[0], rest=label[1:])
-def input_with_timeout(prompt=None, timeout=36000.0):
+def input_with_timeout(prompt: Optional[str] = None, timeout: float = 36000.0) -> str:
"""Get user input with a timeout.
Behaves the same as the builtin input, however, an error is raised if
@@ -67,7 +68,7 @@ def input_with_timeout(prompt=None, timeout=36000.0):
return line.rstrip('\n')
-def separate_list_input(input_):
+def separate_list_input(input_: str) -> List[str]:
"""Separate a comma or space separated list.
:param str input_: input from the user
@@ -97,10 +98,10 @@ def summarize_domain_list(domains: List[str]) -> str:
if not domains:
return ""
- l = len(domains)
- if l == 1:
+ length = len(domains)
+ if length == 1:
return domains[0]
- elif l == 2:
+ elif length == 2:
return " and ".join(domains)
else:
- return "{0} and {1} more domains".format(domains[0], l-1)
+ return "{0} and {1} more domains".format(domains[0], length-1)
diff --git a/certbot/certbot/_internal/plugins/disco.py b/certbot/certbot/_internal/plugins/disco.py
index 1a8cf22c7..30409aff0 100644
--- a/certbot/certbot/_internal/plugins/disco.py
+++ b/certbot/certbot/_internal/plugins/disco.py
@@ -1,9 +1,14 @@
"""Utilities for plugins discovery and selection."""
-from collections.abc import Mapping
import itertools
import logging
import sys
+from typing import Callable
+from typing import cast
from typing import Dict
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Mapping
from typing import Optional
from typing import Type
from typing import Union
@@ -13,6 +18,7 @@ import pkg_resources
import zope.interface
import zope.interface.verify
+from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot._internal import constants
@@ -49,7 +55,7 @@ class PluginEntryPoint:
# this object is mutable, don't allow it to be hashed!
__hash__ = None # type: ignore
- def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix=False):
+ def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix: bool = False) -> None:
self.name = self.entry_point_to_plugin_name(entry_point, with_prefix)
self.plugin_cls: Type[interfaces.Plugin] = entry_point.load()
self.entry_point = entry_point
@@ -59,7 +65,7 @@ class PluginEntryPoint:
self._hidden = False
self._long_description: Optional[str] = None
- def check_name(self, name):
+ def check_name(self, name: Optional[str]) -> bool:
"""Check if the name refers to this plugin."""
if name == self.name:
if self.warning_message:
@@ -68,43 +74,46 @@ class PluginEntryPoint:
return False
@classmethod
- def entry_point_to_plugin_name(cls, entry_point, with_prefix):
+ def entry_point_to_plugin_name(cls, entry_point: pkg_resources.EntryPoint,
+ with_prefix: bool) -> str:
"""Unique plugin name for an ``entry_point``"""
if with_prefix:
+ if not entry_point.dist:
+ raise errors.Error(f"Entrypoint {entry_point.name} has no distribution!")
return entry_point.dist.key + ":" + entry_point.name
return entry_point.name
@property
- def description(self):
+ def description(self) -> str:
"""Description of the plugin."""
return self.plugin_cls.description
@property
- def description_with_name(self):
+ def description_with_name(self) -> str:
"""Description with name. Handy for UI."""
return "{0} ({1})".format(self.description, self.name)
@property
- def long_description(self):
+ def long_description(self) -> str:
"""Long description of the plugin."""
if self._long_description:
return self._long_description
return getattr(self.plugin_cls, "long_description", self.description)
@long_description.setter
- def long_description(self, description):
+ def long_description(self, description: str) -> None:
self._long_description = description
@property
- def hidden(self):
+ def hidden(self) -> bool:
"""Should this plugin be hidden from UI?"""
return self._hidden or getattr(self.plugin_cls, "hidden", False)
@hidden.setter
- def hidden(self, hide):
+ def hidden(self, hide: bool) -> None:
self._hidden = hide
- def ifaces(self, *ifaces_groups):
+ def ifaces(self, *ifaces_groups: Iterable[Type]) -> bool:
"""Does plugin implements specified interface groups?"""
return not ifaces_groups or any(
all(_implements(self.plugin_cls, iface)
@@ -112,20 +121,20 @@ class PluginEntryPoint:
for ifaces in ifaces_groups)
@property
- def initialized(self):
+ def initialized(self) -> bool:
"""Has the plugin been initialized already?"""
return self._initialized is not None
- def init(self, config=None):
+ def init(self, config: Optional[configuration.NamespaceConfig] = None) -> interfaces.Plugin:
"""Memoized plugin initialization."""
- if not self.initialized:
+ if not self._initialized:
self.entry_point.require() # fetch extras!
# For plugins implementing ABCs Plugin, Authenticator or Installer, the following
# line will raise an exception if some implementations of abstract methods are missing.
self._initialized = self.plugin_cls(config, self.name)
return self._initialized
- def verify(self, ifaces):
+ def verify(self, ifaces: Iterable[Type]) -> bool:
"""Verify that the plugin conforms to the specified interfaces."""
if not self.initialized:
raise ValueError("Plugin is not initialized.")
@@ -136,13 +145,13 @@ class PluginEntryPoint:
return True
@property
- def prepared(self):
+ def prepared(self) -> bool:
"""Has the plugin been prepared already?"""
if not self.initialized:
logger.debug(".prepared called on uninitialized %r", self)
return self._prepared is not None
- def prepare(self):
+ def prepare(self) -> Union[bool, Error]:
"""Memoized plugin preparation."""
if self._initialized is None:
raise ValueError("Plugin is not initialized.")
@@ -161,29 +170,30 @@ class PluginEntryPoint:
self._prepared = error
else:
self._prepared = True
- return self._prepared
+ # Mypy seems to fail to understand the actual type here, let's help it.
+ return cast(Union[bool, Error], self._prepared)
@property
- def misconfigured(self):
+ def misconfigured(self) -> bool:
"""Is plugin misconfigured?"""
return isinstance(self._prepared, errors.MisconfigurationError)
@property
- def problem(self):
+ def problem(self) -> Optional[Exception]:
"""Return the Exception raised during plugin setup, or None if all is well"""
if isinstance(self._prepared, Exception):
return self._prepared
return None
@property
- def available(self):
+ def available(self) -> bool:
"""Is plugin available, i.e. prepared or misconfigured?"""
return self._prepared is True or self.misconfigured
- def __repr__(self):
+ def __repr__(self) -> str:
return "PluginEntryPoint#{0}".format(self.name)
- def __str__(self):
+ def __str__(self) -> str:
lines = [
"* {0}".format(self.name),
"Description: {0}".format(self.plugin_cls.description),
@@ -205,7 +215,7 @@ class PluginEntryPoint:
class PluginsRegistry(Mapping):
"""Plugins registry."""
- def __init__(self, plugins):
+ def __init__(self, plugins: Mapping[str, PluginEntryPoint]) -> None:
# plugins are sorted so the same order is used between runs.
# This prevents deadlock caused by plugins acquiring a lock
# and ensures at least one concurrent Certbot instance will run
@@ -213,7 +223,7 @@ class PluginsRegistry(Mapping):
self._plugins = dict(sorted(plugins.items()))
@classmethod
- def find_all(cls):
+ def find_all(cls) -> 'PluginsRegistry':
"""Find plugins using setuptools entry points."""
plugins: Dict[str, PluginEntryPoint] = {}
plugin_paths_string = os.getenv('CERTBOT_PLUGIN_PATH')
@@ -245,7 +255,9 @@ class PluginsRegistry(Mapping):
return cls(plugins)
@classmethod
- def _load_entry_point(cls, entry_point, plugins, with_prefix):
+ def _load_entry_point(cls, entry_point: pkg_resources.EntryPoint,
+ plugins: Dict[str, PluginEntryPoint],
+ with_prefix: bool) -> PluginEntryPoint:
plugin_ep = PluginEntryPoint(entry_point, with_prefix)
if plugin_ep.name in plugins:
other_ep = plugins[plugin_ep.name]
@@ -261,47 +273,47 @@ class PluginsRegistry(Mapping):
return plugin_ep
- def __getitem__(self, name):
+ def __getitem__(self, name: str) -> PluginEntryPoint:
return self._plugins[name]
- def __iter__(self):
+ def __iter__(self) -> Iterator[str]:
return iter(self._plugins)
- def __len__(self):
+ def __len__(self) -> int:
return len(self._plugins)
- def init(self, config):
+ def init(self, config: configuration.NamespaceConfig) -> List[interfaces.Plugin]:
"""Initialize all plugins in the registry."""
return [plugin_ep.init(config) for plugin_ep
in self._plugins.values()]
- def filter(self, pred):
+ def filter(self, pred: Callable[[PluginEntryPoint], bool]) -> "PluginsRegistry":
"""Filter plugins based on predicate."""
return type(self)({name: plugin_ep for name, plugin_ep
- in self._plugins.items() if pred(plugin_ep)})
+ in self._plugins.items() if pred(plugin_ep)})
- def visible(self):
+ def visible(self) -> "PluginsRegistry":
"""Filter plugins based on visibility."""
return self.filter(lambda plugin_ep: not plugin_ep.hidden)
- def ifaces(self, *ifaces_groups):
+ def ifaces(self, *ifaces_groups: Iterable[Type]) -> "PluginsRegistry":
"""Filter plugins based on interfaces."""
return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))
- def verify(self, ifaces):
+ def verify(self, ifaces: Iterable[Type]) -> "PluginsRegistry":
"""Filter plugins based on verification."""
return self.filter(lambda p_ep: p_ep.verify(ifaces))
- def prepare(self):
+ def prepare(self) -> List[Union[bool, Error]]:
"""Prepare all plugins in the registry."""
return [plugin_ep.prepare() for plugin_ep in self._plugins.values()]
- def available(self):
+ def available(self) -> "PluginsRegistry":
"""Filter plugins based on availability."""
return self.filter(lambda p_ep: p_ep.available)
# successfully prepared + misconfigured
- def find_init(self, plugin):
+ def find_init(self, plugin: interfaces.Plugin) -> Optional[PluginEntryPoint]:
"""Find an initialized plugin.
This is particularly useful for finding a name for the plugin::
@@ -321,12 +333,12 @@ class PluginsRegistry(Mapping):
return candidates[0]
return None
- def __repr__(self):
+ def __repr__(self) -> str:
return "{0}({1})".format(
self.__class__.__name__, ','.join(
repr(p_ep) for p_ep in self._plugins.values()))
- def __str__(self):
+ def __str__(self) -> str:
if not self._plugins:
return "No plugins"
return "\n\n".join(str(p_ep) for p_ep in self._plugins.values())
diff --git a/certbot/certbot/_internal/plugins/manual.py b/certbot/certbot/_internal/plugins/manual.py
index d2372e7dd..dc45ae271 100644
--- a/certbot/certbot/_internal/plugins/manual.py
+++ b/certbot/certbot/_internal/plugins/manual.py
@@ -1,6 +1,12 @@
"""Manual authenticator plugin"""
import logging
+from typing import Any
+from typing import Callable
from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Tuple
+from typing import Type
from acme import challenges
from certbot import achallenges
@@ -88,7 +94,7 @@ asked to create multiple distinct TXT records with the same name. This is
permitted by DNS standards.)
"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.reverter = reverter.Reverter(self.config)
self.reverter.recovery_routine()
@@ -97,14 +103,14 @@ permitted by DNS standards.)
self.subsequent_any_challenge = False
@classmethod
- def add_parser_arguments(cls, add):
+ def add_parser_arguments(cls, add: Callable[..., None]) -> None:
add('auth-hook',
help='Path or command to execute for the authentication script')
add('cleanup-hook',
help='Path or command to execute for the cleanup script')
util.add_deprecated_argument(add, 'public-ip-logging-ok', 0)
- def prepare(self): # pylint: disable=missing-function-docstring
+ def prepare(self) -> None: # pylint: disable=missing-function-docstring
if self.config.noninteractive_mode and not self.conf('auth-hook'):
raise errors.PluginError(
'An authentication script must be provided with --{0} when '
@@ -112,7 +118,7 @@ permitted by DNS standards.)
self.option_name('auth-hook')))
self._validate_hooks()
- def _validate_hooks(self):
+ def _validate_hooks(self) -> None:
if self.config.validate_hooks:
for name in ('auth-hook', 'cleanup-hook'):
hook = self.conf(name)
@@ -120,13 +126,13 @@ permitted by DNS standards.)
hook_prefix = self.option_name(name)[:-len('-hook')]
hooks.validate_hook(hook, hook_prefix)
- def more_info(self): # pylint: disable=missing-function-docstring
+ def more_info(self) -> str: # pylint: disable=missing-function-docstring
return (
'This plugin allows the user to customize setup for domain '
'validation challenges either through shell scripts provided by '
'the user or by performing the setup manually.')
- def auth_hint(self, failed_achalls):
+ def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str:
has_chall = lambda cls: any(isinstance(achall.chall, cls) for achall in failed_achalls)
has_dns = has_chall(challenges.DNS01)
@@ -162,11 +168,12 @@ permitted by DNS standards.)
)
)
- def get_chall_pref(self, domain):
+ def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01, challenges.DNS01]
- def perform(self, achalls): # pylint: disable=missing-function-docstring
+ def perform(self, achalls: List[achallenges.AnnotatedChallenge]
+ ) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
responses = []
last_dns_achall = 0
for i, achall in enumerate(achalls):
@@ -180,7 +187,8 @@ permitted by DNS standards.)
responses.append(achall.response(achall.account_key))
return responses
- def _perform_achall_with_script(self, achall, achalls):
+ def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge,
+ achalls: List[achallenges.AnnotatedChallenge]) -> None:
env = dict(CERTBOT_DOMAIN=achall.domain,
CERTBOT_VALIDATION=achall.validation(achall.account_key),
CERTBOT_ALL_DOMAINS=','.join(one_achall.domain for one_achall in achalls),
@@ -194,7 +202,8 @@ permitted by DNS standards.)
env['CERTBOT_AUTH_OUTPUT'] = out.strip()
self.env[achall] = env
- def _perform_achall_manually(self, achall, last_dns_achall=False):
+ def _perform_achall_manually(self, achall: achallenges.AnnotatedChallenge,
+ last_dns_achall: bool = False) -> None:
validation = achall.validation(achall.account_key)
if isinstance(achall.chall, challenges.HTTP01):
msg = self._HTTP_INSTRUCTIONS.format(
@@ -225,7 +234,7 @@ permitted by DNS standards.)
display_util.notification(msg, wrap=False, force_interactive=True)
self.subsequent_any_challenge = True
- def cleanup(self, achalls): # pylint: disable=missing-function-docstring
+ def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
if self.conf('cleanup-hook'):
for achall in achalls:
env = self.env.pop(achall)
@@ -235,7 +244,7 @@ permitted by DNS standards.)
self._execute_hook('cleanup-hook', achall.domain)
self.reverter.recovery_routine()
- def _execute_hook(self, hook_name, achall_domain):
+ def _execute_hook(self, hook_name: str, achall_domain: str) -> Tuple[str, str]:
returncode, err, out = misc.execute_command_status(
self.option_name(hook_name), self.conf(hook_name),
env=util.env_no_snap_for_external_calls()
diff --git a/certbot/certbot/_internal/plugins/null.py b/certbot/certbot/_internal/plugins/null.py
index b800c5c39..e79a88bb9 100644
--- a/certbot/certbot/_internal/plugins/null.py
+++ b/certbot/certbot/_internal/plugins/null.py
@@ -1,5 +1,9 @@
"""Null plugin."""
import logging
+from typing import Callable
+from typing import List
+from typing import Optional
+from typing import Union
from certbot import interfaces
from certbot.plugins import common
@@ -14,41 +18,42 @@ class Installer(common.Plugin, interfaces.Installer):
hidden = True
@classmethod
- def add_parser_arguments(cls, add):
+ def add_parser_arguments(cls, add: Callable[..., None]) -> None:
pass
# pylint: disable=missing-function-docstring
- def prepare(self):
+ def prepare(self) -> None:
pass # pragma: no cover
- def more_info(self):
+ def more_info(self) -> str:
return "Installer that doesn't do anything (for testing)."
- def get_all_names(self):
+ def get_all_names(self) -> List[str]:
return []
- def deploy_cert(self, domain, cert_path, key_path,
- chain_path=None, fullchain_path=None):
+ def deploy_cert(self, domain: str, cert_path: str, key_path: str,
+ chain_path: str, fullchain_path: str) -> None:
pass # pragma: no cover
- def enhance(self, domain, enhancement, options=None):
+ def enhance(self, domain: str, enhancement: str,
+ options: Optional[Union[List[str], str]] = None) -> None:
pass # pragma: no cover
- def supported_enhancements(self):
+ def supported_enhancements(self) -> List[str]:
return []
- def save(self, title=None, temporary=False):
+ def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
pass # pragma: no cover
- def rollback_checkpoints(self, rollback=1):
+ def rollback_checkpoints(self, rollback: int = 1) -> None:
pass # pragma: no cover
- def recovery_routine(self):
+ def recovery_routine(self) -> None:
pass # pragma: no cover
- def config_test(self):
+ def config_test(self) -> None:
pass # pragma: no cover
- def restart(self):
+ def restart(self) -> None:
pass # pragma: no cover
diff --git a/certbot/certbot/_internal/plugins/selection.py b/certbot/certbot/_internal/plugins/selection.py
index 0e88e1324..f62db4089 100644
--- a/certbot/certbot/_internal/plugins/selection.py
+++ b/certbot/certbot/_internal/plugins/selection.py
@@ -1,8 +1,13 @@
"""Decide which plugins to use for authentication & installation"""
import logging
+from typing import cast
+from typing import Iterable
+from typing import List
from typing import Optional
from typing import Tuple
+from typing import Type
+from typing import TypeVar
from certbot import configuration
from certbot import errors
@@ -14,32 +19,36 @@ from certbot.display import util as display_util
logger = logging.getLogger(__name__)
-def pick_configurator(
- config, default, plugins,
- question="How would you like to authenticate and install "
- "certificates?"):
+def pick_configurator(config: configuration.NamespaceConfig, default: Optional[str],
+ plugins: disco.PluginsRegistry,
+ question: str = "How would you like to authenticate and install "
+ "certificates?") -> Optional[interfaces.Plugin]:
"""Pick configurator plugin."""
return pick_plugin(
config, default, plugins, question,
(interfaces.Authenticator, interfaces.Installer))
-def pick_installer(config, default, plugins,
- question="How would you like to install certificates?"):
+def pick_installer(config: configuration.NamespaceConfig, default: Optional[str],
+ plugins: disco.PluginsRegistry,
+ question: str = "How would you like to install certificates?"
+ ) -> Optional[interfaces.Installer]:
"""Pick installer plugin."""
- return pick_plugin(
- config, default, plugins, question, (interfaces.Installer,))
+ return pick_plugin(config, default, plugins, question, (interfaces.Installer,))
-def pick_authenticator(
- config, default, plugins, question="How would you "
- "like to authenticate with the ACME CA?"):
+def pick_authenticator(config: configuration.NamespaceConfig, default: Optional[str],
+ plugins: disco.PluginsRegistry,
+ question: str = "How would you "
+ "like to authenticate with the ACME CA?"
+ ) -> Optional[interfaces.Authenticator]:
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.Authenticator,))
-def get_unprepared_installer(config, plugins):
+def get_unprepared_installer(config: configuration.NamespaceConfig,
+ plugins: disco.PluginsRegistry) -> Optional[interfaces.Installer]:
"""
Get an unprepared interfaces.Installer object.
@@ -69,10 +78,15 @@ def get_unprepared_installer(config, plugins):
"Could not select or initialize the requested installer %s." % req_inst)
-def pick_plugin(config, default, plugins, question, ifaces):
+P = TypeVar('P', bound=interfaces.Plugin)
+
+
+def pick_plugin(config: configuration.NamespaceConfig, default: Optional[str],
+ plugins: disco.PluginsRegistry, question: str,
+ ifaces: Iterable[Type]) -> Optional[P]:
"""Pick plugin.
- :param certbot.configuration.NamespaceConfig: Configuration
+ :param certbot.configuration.NamespaceConfig config: Configuration
:param str default: Plugin name supplied by user or ``None``.
:param certbot._internal.plugins.disco.PluginsRegistry plugins:
All plugins registered as entry points.
@@ -108,22 +122,23 @@ def pick_plugin(config, default, plugins, question, ifaces):
if len(prepared) > 1:
logger.debug("Multiple candidate plugins: %s", prepared)
- plugin_ep = choose_plugin(list(prepared.values()), question)
- if plugin_ep is None:
+ plugin_ep1 = choose_plugin(list(prepared.values()), question)
+ if plugin_ep1 is None:
return None
- return plugin_ep.init()
+ return cast(P, plugin_ep1.init())
elif len(prepared) == 1:
- plugin_ep = list(prepared.values())[0]
- logger.debug("Single candidate plugin: %s", plugin_ep)
- if plugin_ep.misconfigured:
+ plugin_ep2 = list(prepared.values())[0]
+ logger.debug("Single candidate plugin: %s", plugin_ep2)
+ if plugin_ep2.misconfigured:
return None
- return plugin_ep.init()
+ return plugin_ep2.init()
else:
logger.debug("No candidate plugin")
return None
-def choose_plugin(prepared, question):
+def choose_plugin(prepared: List[disco.PluginEntryPoint],
+ question: str) -> Optional[disco.PluginEntryPoint]:
"""Allow the user to choose their plugin.
:param list prepared: List of `~.PluginEntryPoint`.
@@ -152,17 +167,29 @@ def choose_plugin(prepared, question):
else:
return None
+
noninstaller_plugins = ["webroot", "manual", "standalone", "dns-cloudflare", "dns-cloudxns",
"dns-digitalocean", "dns-dnsimple", "dns-dnsmadeeasy", "dns-gehirn",
"dns-google", "dns-linode", "dns-luadns", "dns-nsone", "dns-ovh",
"dns-rfc2136", "dns-route53", "dns-sakuracloud"]
-def record_chosen_plugins(config, plugins, auth, inst):
- "Update the config entries to reflect the plugins we actually selected."
- config.authenticator = plugins.find_init(auth).name if auth else None
- config.installer = plugins.find_init(inst).name if inst else None
+
+def record_chosen_plugins(config: configuration.NamespaceConfig, plugins: disco.PluginsRegistry,
+ auth: Optional[interfaces.Authenticator],
+ inst: Optional[interfaces.Installer]) -> None:
+ """Update the config entries to reflect the plugins we actually selected."""
+ config.authenticator = None
+ if auth:
+ auth_ep = plugins.find_init(auth)
+ if auth_ep:
+ config.authenticator = auth_ep.name
+ config.installer = None
+ if inst:
+ inst_ep = plugins.find_init(inst)
+ if inst_ep:
+ config.installer = inst_ep.name
logger.info("Plugins selected: Authenticator %s, Installer %s",
- config.authenticator, config.installer)
+ config.authenticator, config.installer)
def choose_configurator_plugins(config: configuration.NamespaceConfig,
@@ -181,7 +208,7 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
"""
req_auth, req_inst = cli_plugin_requests(config)
- installer_question = None
+ installer_question = ""
if verb == "enhance":
installer_question = ("Which installer would you like to use to "
@@ -209,11 +236,14 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
logger.warning("Specifying an authenticator doesn't make sense when "
"running Certbot with verb \"%s\"", verb)
# Try to meet the user's request and/or ask them to pick plugins
- authenticator = installer = None
+ authenticator: Optional[interfaces.Authenticator] = None
+ installer: Optional[interfaces.Installer] = None
if verb == "run" and req_auth == req_inst:
# Unless the user has explicitly asked for different auth/install,
# only consider offering a single choice
- authenticator = installer = pick_configurator(config, req_inst, plugins)
+ configurator = pick_configurator(config, req_inst, plugins)
+ authenticator = cast(Optional[interfaces.Authenticator], configurator)
+ installer = cast(Optional[interfaces.Installer], configurator)
else:
if need_inst or req_inst:
installer = pick_installer(config, req_inst, plugins, installer_question)
@@ -231,11 +261,11 @@ def choose_configurator_plugins(config: configuration.NamespaceConfig,
return installer, authenticator
-def set_configurator(previously, now):
+def set_configurator(previously: Optional[str], now: Optional[str]) -> Optional[str]:
"""
Setting configurators multiple ways is okay, as long as they all agree
:param str previously: previously identified request for the installer/authenticator
- :param str requested: the request currently being processed
+ :param str now: the request currently being processed
"""
if not now:
# we're not actually setting anything
@@ -247,7 +277,8 @@ def set_configurator(previously, now):
return now
-def cli_plugin_requests(config):
+def cli_plugin_requests(config: configuration.NamespaceConfig
+ ) -> Tuple[Optional[str], Optional[str]]:
"""
Figure out which plugins the user requested with CLI and config options
@@ -302,7 +333,8 @@ def cli_plugin_requests(config):
return req_auth, req_inst
-def diagnose_configurator_problem(cfg_type, requested, plugins):
+def diagnose_configurator_problem(cfg_type: str, requested: Optional[str],
+ plugins: disco.PluginsRegistry) -> None:
"""
Raise the most helpful error message about a plugin being unavailable
diff --git a/certbot/certbot/_internal/plugins/standalone.py b/certbot/certbot/_internal/plugins/standalone.py
index 45c801256..f70d2ed9e 100644
--- a/certbot/certbot/_internal/plugins/standalone.py
+++ b/certbot/certbot/_internal/plugins/standalone.py
@@ -3,14 +3,19 @@ import collections
import errno
import logging
import socket
+from typing import Any
+from typing import Callable
from typing import DefaultDict
from typing import Dict
+from typing import Iterable
from typing import List
+from typing import Mapping
from typing import Set
from typing import Tuple
+from typing import Type
from typing import TYPE_CHECKING
-import OpenSSL
+from OpenSSL import crypto
from acme import challenges
from acme import standalone as acme_standalone
@@ -42,12 +47,15 @@ class ServerManager:
will serve the same URLs!
"""
- def __init__(self, certs, http_01_resources):
- self._instances: Dict[int, acme_standalone.BaseDualNetworkedServers] = {}
+ def __init__(self, certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]],
+ http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
+ ) -> None:
+ self._instances: Dict[int, acme_standalone.HTTP01DualNetworkedServers] = {}
self.certs = certs
self.http_01_resources = http_01_resources
- def run(self, port, challenge_type, listenaddr=""):
+ def run(self, port: int, challenge_type: Type[challenges.Challenge],
+ listenaddr: str = "") -> acme_standalone.HTTP01DualNetworkedServers:
"""Run ACME server on specified ``port``.
This method is idempotent, i.e. all calls with the same pair of
@@ -81,7 +89,7 @@ class ServerManager:
self._instances[real_port] = servers
return servers
- def stop(self, port):
+ def stop(self, port: int) -> None:
"""Stop ACME server running on the specified ``port``.
:param int port:
@@ -94,7 +102,7 @@ class ServerManager:
instance.shutdown_and_server_close()
del self._instances[port]
- def running(self):
+ def running(self) -> Dict[int, acme_standalone.HTTP01DualNetworkedServers]:
"""Return all running instances.
Once the server is stopped using `stop`, it will not be
@@ -118,7 +126,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
description = "Spin up a temporary webserver"
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.served: ServedType = collections.defaultdict(set)
@@ -127,44 +135,49 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
# values, main thread writes). Due to the nature of CPython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
- self.certs: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]] = {}
+ self.certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]] = {}
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set()
self.servers = ServerManager(self.certs, self.http_01_resources)
@classmethod
- def add_parser_arguments(cls, add):
+ def add_parser_arguments(cls, add: Callable[..., None]) -> None:
pass # No additional argument for the standalone plugin parser
- def more_info(self): # pylint: disable=missing-function-docstring
+ def more_info(self) -> str: # pylint: disable=missing-function-docstring
return("This authenticator creates its own ephemeral TCP listener "
"on the necessary port in order to respond to incoming "
"http-01 challenges from the certificate authority. Therefore, "
"it does not rely on any existing server program.")
- def prepare(self): # pylint: disable=missing-function-docstring
+ def prepare(self) -> None: # pylint: disable=missing-function-docstring
pass
- def get_chall_pref(self, domain):
+ def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01]
- def perform(self, achalls): # pylint: disable=missing-function-docstring
+ def perform(self, achalls: Iterable[achallenges.AnnotatedChallenge]
+ ) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
return [self._try_perform_single(achall) for achall in achalls]
- def _try_perform_single(self, achall):
+ def _try_perform_single(self,
+ achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
while True:
try:
return self._perform_single(achall)
except errors.StandaloneBindError as error:
_handle_perform_error(error)
- def _perform_single(self, achall):
+ def _perform_single(self,
+ achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
servers, response = self._perform_http_01(achall)
self.served[servers].add(achall)
return response
- def _perform_http_01(self, achall):
+ def _perform_http_01(self, achall: achallenges.AnnotatedChallenge
+ ) -> Tuple[acme_standalone.HTTP01DualNetworkedServers,
+ challenges.ChallengeResponse]:
port = self.config.http01_port
addr = self.config.http01_address
servers = self.servers.run(port, challenges.HTTP01, listenaddr=addr)
@@ -174,7 +187,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
self.http_01_resources.add(resource)
return servers, response
- def cleanup(self, achalls): # pylint: disable=missing-function-docstring
+ def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
# reduce self.served and close servers if no challenges are served
for unused_servers, server_achalls in self.served.items():
for achall in achalls:
@@ -193,7 +206,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
"accept inbound connections from the internet.")
-def _handle_perform_error(error):
+def _handle_perform_error(error: errors.StandaloneBindError) -> None:
if error.socket_error.errno == errno.EACCES:
raise errors.PluginError(
"Could not bind TCP port {0} because you don't have "
diff --git a/certbot/certbot/_internal/plugins/webroot.py b/certbot/certbot/_internal/plugins/webroot.py
index 1a4892ac9..bb61d8220 100644
--- a/certbot/certbot/_internal/plugins/webroot.py
+++ b/certbot/certbot/_internal/plugins/webroot.py
@@ -3,10 +3,17 @@ import argparse
import collections
import json
import logging
+from typing import Any
+from typing import Callable
from typing import DefaultDict
from typing import Dict
+from typing import Iterable
from typing import List
+from typing import Optional
+from typing import Sequence
from typing import Set
+from typing import Type
+from typing import Union
from acme import challenges
from certbot import crypto_util
@@ -57,11 +64,11 @@ necessary validation resources to appropriate paths on the file
system. It expects that there is some other HTTP server configured
to serve all files under specified web root ({0})."""
- def more_info(self): # pylint: disable=missing-function-docstring
+ def more_info(self) -> str: # pylint: disable=missing-function-docstring
return self.MORE_INFO.format(self.conf("path"))
@classmethod
- def add_parser_arguments(cls, add):
+ def add_parser_arguments(cls, add: Callable[..., None]) -> None:
add("path", "-w", default=[], action=_WebrootPathAction,
help="public_html / webroot path. This can be specified multiple "
"times to handle different domains; each domain will have "
@@ -78,34 +85,34 @@ to serve all files under specified web root ({0})."""
"file, it needs to be on a single line, like: webroot-map = "
'{"example.com":"/var/www"}.')
- def auth_hint(self, failed_achalls): # pragma: no cover
+ def auth_hint(self, failed_achalls: Iterable[AnnotatedChallenge]) -> str: # pragma: no cover
return ("The Certificate Authority failed to download the temporary challenge files "
"created by Certbot. Ensure that the listed domains serve their content from "
"the provided --webroot-path/-w and that files created there can be downloaded "
"from the internet.")
- def get_chall_pref(self, domain): # pragma: no cover
+ def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.full_roots: Dict[str, str] = {}
self.performed: DefaultDict[str, Set[AnnotatedChallenge]] = collections.defaultdict(set)
# stack of dirs successfully created by this authenticator
self._created_dirs: List[str] = []
- def prepare(self): # pylint: disable=missing-function-docstring
+ def prepare(self) -> None: # pylint: disable=missing-function-docstring
pass
- def perform(self, achalls): # pylint: disable=missing-function-docstring
+ def perform(self, achalls: Iterable[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
self._set_webroots(achalls)
self._create_challenge_dirs()
return [self._perform_single(achall) for achall in achalls]
- def _set_webroots(self, achalls):
+ def _set_webroots(self, achalls: Iterable[AnnotatedChallenge]) -> None:
if self.conf("path"):
webroot_path = self.conf("path")[-1]
logger.info("Using the webroot path %s for all unmatched domains.",
@@ -127,7 +134,7 @@ to serve all files under specified web root ({0})."""
known_webroots.insert(0, new_webroot)
self.conf("map")[achall.domain] = new_webroot
- def _prompt_for_webroot(self, domain, known_webroots):
+ def _prompt_for_webroot(self, domain: str, known_webroots: List[str]) -> Optional[str]:
webroot = None
while webroot is None:
@@ -142,7 +149,8 @@ to serve all files under specified web root ({0})."""
return webroot
- def _prompt_with_webroot_list(self, domain, known_webroots):
+ def _prompt_with_webroot_list(self, domain: str,
+ known_webroots: List[str]) -> Optional[str]:
path_flag = "--" + self.option_name("path")
while True:
@@ -156,7 +164,7 @@ to serve all files under specified web root ({0})."""
"webroot when using the webroot plugin.")
return None if index == 0 else known_webroots[index - 1] # code == display_util.OK
- def _prompt_for_new_webroot(self, domain, allowraise=False):
+ def _prompt_for_new_webroot(self, domain: str, allowraise: bool = False) -> Optional[str]:
code, webroot = ops.validated_directory(
_validate_webroot,
"Input the webroot for {0}:".format(domain),
@@ -169,7 +177,7 @@ to serve all files under specified web root ({0})."""
"webroot when using the webroot plugin.")
return _validate_webroot(webroot) # code == display_util.OK
- def _create_challenge_dirs(self):
+ def _create_challenge_dirs(self) -> None:
path_map = self.conf("map")
if not path_map:
raise errors.PluginError(
@@ -227,10 +235,10 @@ to serve all files under specified web root ({0})."""
with safe_open(web_config_path, mode="w", chmod=0o644) as web_config:
web_config.write(_WEB_CONFIG_CONTENT)
- def _get_validation_path(self, root_path, achall):
+ def _get_validation_path(self, root_path: str, achall: AnnotatedChallenge) -> str:
return os.path.join(root_path, achall.chall.encode("token"))
- def _perform_single(self, achall):
+ def _perform_single(self, achall: AnnotatedChallenge) -> challenges.ChallengeResponse:
response, validation = achall.response_and_validation()
root_path = self.full_roots[achall.domain]
@@ -249,7 +257,7 @@ to serve all files under specified web root ({0})."""
self.performed[root_path].add(achall)
return response
- def cleanup(self, achalls): # pylint: disable=missing-function-docstring
+ def cleanup(self, achalls: Iterable[AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
for achall in achalls:
root_path = self.full_roots.get(achall.domain, None)
if root_path is not None:
@@ -270,7 +278,6 @@ to serve all files under specified web root ({0})."""
logger.info("Not cleaning up the web.config file in %s "
"because it is not generated by Certbot.", root_path)
-
not_removed: List[str] = []
while self._created_dirs:
path = self._created_dirs.pop()
@@ -287,8 +294,12 @@ to serve all files under specified web root ({0})."""
class _WebrootMapAction(argparse.Action):
"""Action class for parsing webroot_map."""
- def __call__(self, parser, namespace, webroot_map, option_string=None):
- for domains, webroot_path in json.loads(webroot_map).items():
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ webroot_map: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
+ if webroot_map is None:
+ return
+ for domains, webroot_path in json.loads(str(webroot_map)).items():
webroot_path = _validate_webroot(webroot_path)
namespace.webroot_map.update(
(d, webroot_path) for d in cli.add_domains(namespace, domains))
@@ -297,11 +308,15 @@ class _WebrootMapAction(argparse.Action):
class _WebrootPathAction(argparse.Action):
"""Action class for parsing webroot_path."""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._domain_before_webroot = False
- def __call__(self, parser, namespace, webroot_path, option_string=None):
+ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
+ webroot_path: Union[str, Sequence[Any], None],
+ option_string: Optional[str] = None) -> None:
+ if webroot_path is None:
+ return
if self._domain_before_webroot:
raise errors.PluginError(
"If you specify multiple webroot paths, "
@@ -316,10 +331,10 @@ class _WebrootPathAction(argparse.Action):
elif namespace.domains:
self._domain_before_webroot = True
- namespace.webroot_path.append(_validate_webroot(webroot_path))
+ namespace.webroot_path.append(_validate_webroot(str(webroot_path)))
-def _validate_webroot(webroot_path):
+def _validate_webroot(webroot_path: str) -> str:
"""Validates and returns the absolute path of webroot_path.
:param str webroot_path: path to the webroot directory
diff --git a/tox.ini b/tox.ini
index 554ff5b68..ee3e4d7d2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py
dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud
win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx
all_packages = {[base]win_all_packages} certbot-apache
-fully_typed_source_paths = acme/acme
-partially_typed_source_paths = certbot/certbot certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py
+fully_typed_source_paths = acme/acme certbot/certbot
+partially_typed_source_paths = certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py
[testenv]
passenv =