Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMads Jensen <mje@inducks.org>2022-01-21 12:15:48 +0300
committerGitHub <noreply@github.com>2022-01-21 12:15:48 +0300
commit7d9e9a49005de7961e84d2a7c608db57dbab3046 (patch)
tree1cab86eb579eb6644a9daf1733f8a5b3007fc630
parentafc5be5abeeee60560113c5eda0abfeaf1e04be8 (diff)
Add typing to certbot.apache (#9071)
* Add typing to certbot.apache Co-authored-by: Adrien Ferrand <ferrand.ad@gmail.com>
-rw-r--r--certbot-apache/certbot_apache/_internal/apache_util.py43
-rw-r--r--certbot-apache/certbot_apache/_internal/apacheparser.py88
-rw-r--r--certbot-apache/certbot_apache/_internal/assertions.py20
-rw-r--r--certbot-apache/certbot_apache/_internal/augeasparser.py155
-rw-r--r--certbot-apache/certbot_apache/_internal/configurator.py430
-rw-r--r--certbot-apache/certbot_apache/_internal/constants.py30
-rw-r--r--certbot-apache/certbot_apache/_internal/display_ops.py16
-rw-r--r--certbot-apache/certbot_apache/_internal/dualparser.py74
-rw-r--r--certbot-apache/certbot_apache/_internal/entrypoint.py5
-rw-r--r--certbot-apache/certbot_apache/_internal/http_01.py28
-rw-r--r--certbot-apache/certbot_apache/_internal/interfaces.py87
-rw-r--r--certbot-apache/certbot_apache/_internal/obj.py40
-rw-r--r--certbot-apache/certbot_apache/_internal/override_centos.py34
-rw-r--r--certbot-apache/certbot_apache/_internal/override_debian.py10
-rw-r--r--certbot-apache/certbot_apache/_internal/override_fedora.py17
-rw-r--r--certbot-apache/certbot_apache/_internal/override_gentoo.py17
-rw-r--r--certbot-apache/certbot_apache/_internal/parser.py137
-rw-r--r--certbot-apache/certbot_apache/_internal/parsernode_util.py15
-rw-r--r--certbot-apache/tests/parser_test.py20
-rw-r--r--certbot-apache/tests/util.py2
-rw-r--r--certbot-compatibility-test/certbot_compatibility_test/configurators/common.py2
-rw-r--r--certbot/certbot/_internal/client.py6
-rw-r--r--certbot/certbot/interfaces.py2
-rw-r--r--certbot/certbot/plugins/common.py9
-rw-r--r--certbot/certbot/tests/util.py2
25 files changed, 730 insertions, 559 deletions
diff --git a/certbot-apache/certbot_apache/_internal/apache_util.py b/certbot-apache/certbot_apache/_internal/apache_util.py
index 39329a97e..850bdb7f5 100644
--- a/certbot-apache/certbot_apache/_internal/apache_util.py
+++ b/certbot-apache/certbot_apache/_internal/apache_util.py
@@ -4,6 +4,13 @@ import fnmatch
import logging
import re
import subprocess
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
import pkg_resources
@@ -14,7 +21,7 @@ from certbot.compat import os
logger = logging.getLogger(__name__)
-def get_mod_deps(mod_name):
+def get_mod_deps(mod_name: str) -> Any:
"""Get known module dependencies.
.. note:: This does not need to be accurate in order for the client to
@@ -33,7 +40,7 @@ def get_mod_deps(mod_name):
return deps.get(mod_name, [])
-def get_file_path(vhost_path):
+def get_file_path(vhost_path: str) -> Optional[str]:
"""Get file path from augeas_vhost_path.
Takes in Augeas path and returns the file name
@@ -50,7 +57,7 @@ def get_file_path(vhost_path):
return _split_aug_path(vhost_path)[0]
-def get_internal_aug_path(vhost_path):
+def get_internal_aug_path(vhost_path: str) -> str:
"""Get the Augeas path for a vhost with the file path removed.
:param str vhost_path: Augeas virtual host path
@@ -62,7 +69,7 @@ def get_internal_aug_path(vhost_path):
return _split_aug_path(vhost_path)[1]
-def _split_aug_path(vhost_path):
+def _split_aug_path(vhost_path: str) -> Tuple[str, str]:
"""Splits an Augeas path into a file path and an internal path.
After removing "/files", this function splits vhost_path into the
@@ -76,7 +83,7 @@ def _split_aug_path(vhost_path):
"""
# Strip off /files
file_path = vhost_path[6:]
- internal_path = []
+ internal_path: List[str] = []
# Remove components from the end of file_path until it becomes valid
while not os.path.exists(file_path):
@@ -86,7 +93,7 @@ def _split_aug_path(vhost_path):
return file_path, "/".join(reversed(internal_path))
-def parse_define_file(filepath, varname):
+def parse_define_file(filepath: str, varname: str) -> Dict[str, str]:
""" Parses Defines from a variable in configuration file
:param str filepath: Path of file to parse
@@ -96,7 +103,7 @@ def parse_define_file(filepath, varname):
:rtype: `dict`
"""
- return_vars = {}
+ return_vars: Dict[str, str] = {}
# Get list of words in the variable
a_opts = util.get_var_from_file(varname, filepath).split()
for i, v in enumerate(a_opts):
@@ -111,19 +118,19 @@ def parse_define_file(filepath, varname):
return return_vars
-def unique_id():
+def unique_id() -> str:
""" Returns an unique id to be used as a VirtualHost identifier"""
return binascii.hexlify(os.urandom(16)).decode("utf-8")
-def included_in_paths(filepath, paths):
+def included_in_paths(filepath: str, paths: Iterable[str]) -> bool:
"""
Returns true if the filepath is included in the list of paths
that may contain full paths or wildcard paths that need to be
expanded.
:param str filepath: Filepath to check
- :params list paths: List of paths to check against
+ :param list paths: List of paths to check against
:returns: True if included
:rtype: bool
@@ -132,7 +139,7 @@ def included_in_paths(filepath, paths):
return any(fnmatch.fnmatch(filepath, path) for path in paths)
-def parse_defines(apachectl):
+def parse_defines(apachectl: str) -> Dict[str, str]:
"""
Gets Defines from httpd process and returns a dictionary of
the defined variables.
@@ -143,7 +150,7 @@ def parse_defines(apachectl):
:rtype: dict
"""
- variables = {}
+ variables: Dict[str, str] = {}
define_cmd = [apachectl, "-t", "-D",
"DUMP_RUN_CFG"]
matches = parse_from_subprocess(define_cmd, r"Define: ([^ \n]*)")
@@ -161,7 +168,7 @@ def parse_defines(apachectl):
return variables
-def parse_includes(apachectl):
+def parse_includes(apachectl: str) -> List[str]:
"""
Gets Include directives from httpd process and returns a list of
their values.
@@ -172,11 +179,11 @@ def parse_includes(apachectl):
:rtype: list of str
"""
- inc_cmd = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
+ inc_cmd: List[str] = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
return parse_from_subprocess(inc_cmd, r"\(.*\) (.*)")
-def parse_modules(apachectl):
+def parse_modules(apachectl: str) -> List[str]:
"""
Get loaded modules from httpd process, and return the list
of loaded module names.
@@ -191,7 +198,7 @@ def parse_modules(apachectl):
return parse_from_subprocess(mod_cmd, r"(.*)_module")
-def parse_from_subprocess(command, regexp):
+def parse_from_subprocess(command: List[str], regexp: str) -> List[str]:
"""Get values from stdout of subprocess command
:param list command: Command to run
@@ -205,7 +212,7 @@ def parse_from_subprocess(command, regexp):
return re.compile(regexp).findall(stdout)
-def _get_runtime_cfg(command):
+def _get_runtime_cfg(command: Sequence[str]) -> str:
"""
Get runtime configuration info.
@@ -241,7 +248,7 @@ def _get_runtime_cfg(command):
return stdout
-def find_ssl_apache_conf(prefix):
+def find_ssl_apache_conf(prefix: str) -> str:
"""
Find a TLS Apache config file in the dedicated storage.
:param str prefix: prefix of the TLS Apache config file to find
diff --git a/certbot-apache/certbot_apache/_internal/apacheparser.py b/certbot-apache/certbot_apache/_internal/apacheparser.py
index 04fe931c6..3b68eca02 100644
--- a/certbot-apache/certbot_apache/_internal/apacheparser.py
+++ b/certbot-apache/certbot_apache/_internal/apacheparser.py
@@ -1,4 +1,7 @@
""" apacheconfig implementation of the ParserNode interfaces """
+from typing import Any
+from typing import List
+from typing import Optional
from typing import Tuple
from certbot_apache._internal import assertions
@@ -13,20 +16,21 @@ class ApacheParserNode(interfaces.ParserNode):
by parsing the equivalent configuration text using the apacheconfig library.
"""
- def __init__(self, **kwargs):
- ancestor, dirty, filepath, metadata = util.parsernode_kwargs(
- kwargs) # pylint: disable=unused-variable
+ def __init__(self, **kwargs: Any):
+ # pylint: disable=unused-variable
+ ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
super().__init__(**kwargs)
- self.ancestor = ancestor
- self.filepath = filepath
- self.dirty = dirty
- self.metadata = metadata
- self._raw = self.metadata["ac_ast"]
+ self.ancestor: str = ancestor
+ self.filepath: str = filepath
+ self.dirty: bool = dirty
+ self.metadata: Any = metadata
+ self._raw: Any = self.metadata["ac_ast"]
- def save(self, msg): # pragma: no cover
- pass
+ def save(self, msg: str) -> None:
+ pass # pragma: no cover
- def find_ancestors(self, name): # pylint: disable=unused-variable
+ # pylint: disable=unused-variable
+ def find_ancestors(self, name: str) -> List["ApacheBlockNode"]:
"""Find ancestor BlockNodes with a given name"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -38,33 +42,33 @@ class ApacheParserNode(interfaces.ParserNode):
class ApacheCommentNode(ApacheParserNode):
""" apacheconfig implementation of CommentNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.comment = comment
- def __eq__(self, other): # pragma: no cover
+ def __eq__(self, other: Any):
if isinstance(other, self.__class__):
return (self.comment == other.comment and
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata and
self.filepath == other.filepath)
- return False
+ return False # pragma: no cover
class ApacheDirectiveNode(ApacheParserNode):
""" apacheconfig implementation of DirectiveNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
super().__init__(**kwargs)
- self.name = name
- self.parameters = parameters
- self.enabled = enabled
- self.include = None
+ self.name: str = name
+ self.parameters: List[str] = parameters
+ self.enabled: bool = enabled
+ self.include: Optional[str] = None
- def __eq__(self, other): # pragma: no cover
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -73,21 +77,21 @@ class ApacheDirectiveNode(ApacheParserNode):
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata)
- return False
+ return False # pragma: no cover
- def set_parameters(self, _parameters): # pragma: no cover
+ def set_parameters(self, _parameters):
"""Sets the parameters for DirectiveNode"""
- return
+ return # pragma: no cover
class ApacheBlockNode(ApacheDirectiveNode):
""" apacheconfig implementation of BlockNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
self.children: Tuple[ApacheParserNode, ...] = ()
- def __eq__(self, other): # pragma: no cover
+ def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -97,10 +101,12 @@ class ApacheBlockNode(ApacheDirectiveNode):
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata)
- return False
+ return False # pragma: no cover
# pylint: disable=unused-argument
- def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
+ def add_child_block(
+ self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
+ ) -> "ApacheBlockNode": # pragma: no cover
"""Adds a new BlockNode to the sequence of children"""
new_block = ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -111,7 +117,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
return new_block
# pylint: disable=unused-argument
- def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
+ def add_child_directive(
+ self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
+ ) -> ApacheDirectiveNode: # pragma: no cover
"""Adds a new DirectiveNode to the sequence of children"""
new_dir = ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -122,7 +130,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
return new_dir
# pylint: disable=unused-argument
- def add_child_comment(self, comment="", position=None): # pragma: no cover
+ def add_child_comment(
+ self, name: str, parameters: Optional[int] = None, position: Optional[int] = None
+ ) -> ApacheCommentNode: # pragma: no cover
"""Adds a new CommentNode to the sequence of children"""
new_comment = ApacheCommentNode(comment=assertions.PASS,
@@ -132,7 +142,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
self.children += (new_comment,)
return new_comment
- def find_blocks(self, name, exclude=True): # pylint: disable=unused-argument
+ # pylint: disable=unused-argument
+ def find_blocks(self, name, exclude: bool = True) -> List["ApacheBlockNode"]:
"""Recursive search of BlockNodes from the sequence of children"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -140,7 +151,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
filepath=assertions.PASS,
metadata=self.metadata)]
- def find_directives(self, name, exclude=True): # pylint: disable=unused-argument
+ # pylint: disable=unused-argument
+ def find_directives(self, name: str, exclude: bool = True) -> List[ApacheDirectiveNode]:
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -149,22 +161,22 @@ class ApacheBlockNode(ApacheDirectiveNode):
metadata=self.metadata)]
# pylint: disable=unused-argument
- def find_comments(self, comment, exact=False): # pragma: no cover
+ def find_comments(self, comment: str, exact: bool = False) -> List[ApacheCommentNode]:
"""Recursive search of DirectiveNodes from the sequence of children"""
- return [ApacheCommentNode(comment=assertions.PASS,
+ return [ApacheCommentNode(comment=assertions.PASS, # pragma: no cover
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
- def delete_child(self, child): # pragma: no cover
+ def delete_child(self, child: "ApacheBlockNode") -> None:
"""Deletes a ParserNode from the sequence of children"""
- return
+ return # pragma: no cover
- def unsaved_files(self): # pragma: no cover
+ def unsaved_files(self) -> List[str]:
"""Returns a list of unsaved filepaths"""
- return [assertions.PASS]
+ return [assertions.PASS] # pragma: no cover
- def parsed_paths(self): # pragma: no cover
+ def parsed_paths(self) -> List[str]:
"""Returns a list of parsed configuration file paths"""
return [assertions.PASS]
diff --git a/certbot-apache/certbot_apache/_internal/assertions.py b/certbot-apache/certbot_apache/_internal/assertions.py
index ea625d020..9c83444a4 100644
--- a/certbot-apache/certbot_apache/_internal/assertions.py
+++ b/certbot-apache/certbot_apache/_internal/assertions.py
@@ -1,12 +1,13 @@
"""Dual parser node assertions"""
import fnmatch
+from typing import Any
from certbot_apache._internal import interfaces
PASS = "CERTBOT_PASS_ASSERT"
-def assertEqual(first, second):
+def assertEqual(first: Any, second: Any) -> None:
""" Equality assertion """
if isinstance(first, interfaces.CommentNode):
@@ -30,7 +31,8 @@ def assertEqual(first, second):
assert first.filepath == second.filepath
-def assertEqualComment(first, second): # pragma: no cover
+# pragma: no cover
+def assertEqualComment(first: Any, second: Any) -> None:
""" Equality assertion for CommentNode """
assert isinstance(first, interfaces.CommentNode)
@@ -40,7 +42,7 @@ def assertEqualComment(first, second): # pragma: no cover
assert first.comment == second.comment # type: ignore
-def _assertEqualDirectiveComponents(first, second): # pragma: no cover
+def _assertEqualDirectiveComponents(first: Any, second: Any) -> None: # pragma: no cover
""" Handles assertion for instance variables for DirectiveNode and BlockNode"""
# Enabled value cannot be asserted, because Augeas implementation
@@ -53,7 +55,7 @@ def _assertEqualDirectiveComponents(first, second): # pragma: no cover
assert first.parameters == second.parameters
-def assertEqualDirective(first, second):
+def assertEqualDirective(first: Any, second: Any) -> None:
""" Equality assertion for DirectiveNode """
assert isinstance(first, interfaces.DirectiveNode)
@@ -61,7 +63,7 @@ def assertEqualDirective(first, second):
_assertEqualDirectiveComponents(first, second)
-def isPass(value): # pragma: no cover
+def isPass(value) -> bool: # pragma: no cover
"""Checks if the value is set to PASS"""
if isinstance(value, bool):
return True
@@ -73,9 +75,9 @@ def isPassDirective(block):
if isPass(block.name):
return True
- if isPass(block.parameters): # pragma: no cover
+ if isPass(block.parameters): # pragma: no cover
return True
- if isPass(block.filepath): # pragma: no cover
+ if isPass(block.filepath): # pragma: no cover
return True
return False
@@ -90,7 +92,7 @@ def isPassComment(comment):
return False
-def isPassNodeList(nodelist): # pragma: no cover
+def isPassNodeList(nodelist): # pragma: no cover
""" Checks if a ParserNode in the nodelist should pass the assertion,
this function is used for results of find_* methods. Unimplemented find_*
methods should return a sequence containing a single ParserNode instance
@@ -115,7 +117,7 @@ def assertEqualSimple(first, second):
assert first == second
-def isEqualVirtualHost(first, second):
+def isEqualVirtualHost(first, second) -> bool:
"""
Checks that two VirtualHost objects are similar. There are some built
in differences with the implementations: VirtualHost created by ParserNode
diff --git a/certbot-apache/certbot_apache/_internal/augeasparser.py b/certbot-apache/certbot_apache/_internal/augeasparser.py
index 6361d8fc9..d0589b84a 100644
--- a/certbot-apache/certbot_apache/_internal/augeasparser.py
+++ b/certbot-apache/certbot_apache/_internal/augeasparser.py
@@ -64,7 +64,15 @@ Translates over to:
"/files/etc/apache2/apache2.conf/bLoCk[1]",
]
"""
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
from typing import Set
+from typing import Tuple
+from typing import Union
from certbot import errors
from certbot.compat import os
@@ -78,14 +86,16 @@ from certbot_apache._internal import parsernode_util as util
class AugeasParserNode(interfaces.ParserNode):
""" Augeas implementation of ParserNode interface """
- def __init__(self, **kwargs):
- ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs) # pylint: disable=unused-variable
+ def __init__(self, **kwargs: Any):
+ # pylint: disable=unused-variable
+ ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
super().__init__(**kwargs)
- self.ancestor = ancestor
- self.filepath = filepath
- self.dirty = dirty
- self.metadata = metadata
- self.parser = self.metadata.get("augeasparser")
+ self.ancestor: str = ancestor
+ self.filepath: str = filepath
+ self.dirty: bool = dirty
+ self.metadata: Dict[str, Any] = metadata
+ self.parser: parser.ApacheParser = cast(parser.ApacheParser,
+ self.metadata.get("augeasparser"))
try:
if self.metadata["augeaspath"].endswith("/"):
raise errors.PluginError(
@@ -96,10 +106,10 @@ class AugeasParserNode(interfaces.ParserNode):
except KeyError:
raise errors.PluginError("Augeas path is required")
- def save(self, msg):
+ def save(self, msg: Iterable[str]) -> None:
self.parser.save(msg)
- def find_ancestors(self, name):
+ def find_ancestors(self, name: str) -> List["AugeasBlockNode"]:
"""
Searches for ancestor BlockNodes with a given name.
@@ -109,7 +119,7 @@ class AugeasParserNode(interfaces.ParserNode):
:rtype: list of AugeasBlockNode
"""
- ancestors = []
+ ancestors: List[AugeasBlockNode] = []
parent = self.metadata["augeaspath"]
while True:
@@ -124,7 +134,7 @@ class AugeasParserNode(interfaces.ParserNode):
return ancestors
- def _create_blocknode(self, path):
+ def _create_blocknode(self, path: str) -> "AugeasBlockNode":
"""
Helper function to create a BlockNode from Augeas path. This is used by
AugeasParserNode.find_ancestors and AugeasBlockNode.
@@ -132,21 +142,25 @@ class AugeasParserNode(interfaces.ParserNode):
"""
- name = self._aug_get_name(path)
- metadata = {"augeasparser": self.parser, "augeaspath": path}
+ name: str = self._aug_get_name(path)
+ metadata: Dict[str, Union[parser.ApacheParser, str]] = {
+ "augeasparser": self.parser, "augeaspath": path
+ }
# Check if the file was included from the root config or initial state
- enabled = self.parser.parsed_in_original(
- apache_util.get_file_path(path)
- )
+ file_path = apache_util.get_file_path(path)
+ if file_path is None:
+ raise ValueError(f"No file path found for vhost: {path}.") # pragma: no cover
+
+ enabled = self.parser.parsed_in_original(file_path)
return AugeasBlockNode(name=name,
enabled=enabled,
ancestor=assertions.PASS,
- filepath=apache_util.get_file_path(path),
+ filepath=file_path,
metadata=metadata)
- def _aug_get_name(self, path):
+ def _aug_get_name(self, path: str) -> str:
"""
Helper function to get name of a configuration block or variable from path.
"""
@@ -166,12 +180,12 @@ class AugeasParserNode(interfaces.ParserNode):
class AugeasCommentNode(AugeasParserNode):
""" Augeas implementation of CommentNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.comment = comment
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.comment == other.comment and
self.filepath == other.filepath and
@@ -184,15 +198,15 @@ class AugeasCommentNode(AugeasParserNode):
class AugeasDirectiveNode(AugeasParserNode):
""" Augeas implementation of DirectiveNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
super().__init__(**kwargs)
- self.name = name
- self.enabled = enabled
+ self.name: str = name
+ self.enabled: bool = enabled
if parameters:
self.set_parameters(parameters)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -203,7 +217,7 @@ class AugeasDirectiveNode(AugeasParserNode):
self.metadata == other.metadata)
return False
- def set_parameters(self, parameters):
+ def set_parameters(self, parameters: List[str]):
"""
Sets parameters of a DirectiveNode or BlockNode object.
@@ -222,7 +236,7 @@ class AugeasDirectiveNode(AugeasParserNode):
self.parser.aug.set(param_path, param)
@property
- def parameters(self):
+ def parameters(self) -> Tuple[Optional[str], ...]:
"""
Fetches the parameters from Augeas tree, ensuring that the sequence always
represents the current state
@@ -232,7 +246,7 @@ class AugeasDirectiveNode(AugeasParserNode):
"""
return tuple(self._aug_get_params(self.metadata["augeaspath"]))
- def _aug_get_params(self, path):
+ def _aug_get_params(self, path: str) -> List[Optional[str]]:
"""Helper function to get parameters for DirectiveNodes and BlockNodes"""
arg_paths = self.parser.aug.match(path + "/arg")
@@ -242,11 +256,11 @@ class AugeasDirectiveNode(AugeasParserNode):
class AugeasBlockNode(AugeasDirectiveNode):
""" Augeas implementation of BlockNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
- self.children = ()
+ self.children: Tuple["AugeasBlockNode", ...] = ()
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -259,21 +273,24 @@ class AugeasBlockNode(AugeasDirectiveNode):
return False
# pylint: disable=unused-argument
- def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
+ def add_child_block(
+ self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
+ ) -> "AugeasBlockNode": # pragma: no cover
"""Adds a new BlockNode to the sequence of children"""
insertpath, realpath, before = self._aug_resolve_child_position(
name,
position
)
- new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
+ new_metadata: Dict[str, Any] = {"augeasparser": self.parser, "augeaspath": realpath}
# Create the new block
self.parser.aug.insert(insertpath, name, before)
# Check if the file was included from the root config or initial state
- enabled = self.parser.parsed_in_original(
- apache_util.get_file_path(realpath)
- )
+ file_path = apache_util.get_file_path(realpath)
+ if file_path is None:
+ raise errors.Error(f"No file path found for vhost: {realpath}")
+ enabled = self.parser.parsed_in_original(file_path)
# Parameters will be set at the initialization of the new object
return AugeasBlockNode(
@@ -281,12 +298,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
- filepath=apache_util.get_file_path(realpath),
+ filepath=file_path,
metadata=new_metadata,
)
# pylint: disable=unused-argument
- def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
+ def add_child_directive(
+ self, name: str, parameters=None, position=None
+ ) -> "AugeasDirectiveNode": # pragma: no cover
"""Adds a new DirectiveNode to the sequence of children"""
if not parameters:
@@ -303,27 +322,32 @@ class AugeasBlockNode(AugeasDirectiveNode):
# Set the directive key
self.parser.aug.set(realpath, name)
# Check if the file was included from the root config or initial state
- enabled = self.parser.parsed_in_original(
- apache_util.get_file_path(realpath)
- )
+ file_path = apache_util.get_file_path(realpath)
+ if file_path is None:
+ raise errors.Error(f"No file path found for vhost: {realpath}")
+ enabled = self.parser.parsed_in_original(file_path)
return AugeasDirectiveNode(
name=name,
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
- filepath=apache_util.get_file_path(realpath),
+ filepath=file_path,
metadata=new_metadata,
)
- def add_child_comment(self, comment="", position=None):
+ def add_child_comment(
+ self, comment: str = "", position: Optional[int] = None
+ ) -> "AugeasCommentNode":
"""Adds a new CommentNode to the sequence of children"""
insertpath, realpath, before = self._aug_resolve_child_position(
"#comment",
position
)
- new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
+ new_metadata: Dict[str, Any] = {
+ "augeasparser": self.parser, "augeaspath": realpath,
+ }
# Create the new comment
self.parser.aug.insert(insertpath, "#comment", before)
@@ -337,11 +361,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
metadata=new_metadata,
)
- def find_blocks(self, name, exclude=True):
+ def find_blocks(self, name: str, exclude: bool = True) -> List["AugeasBlockNode"]:
"""Recursive search of BlockNodes from the sequence of children"""
- nodes = []
- paths = self._aug_find_blocks(name)
+ nodes: List["AugeasBlockNode"] = []
+ paths: Iterable[str] = self._aug_find_blocks(name)
if exclude:
paths = self.parser.exclude_dirs(paths)
for path in paths:
@@ -349,7 +373,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
- def find_directives(self, name, exclude=True):
+ def find_directives(self, name: str, exclude: bool = True) -> List["AugeasDirectiveNode"]:
"""Recursive search of DirectiveNodes from the sequence of children"""
nodes = []
@@ -368,14 +392,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
- def find_comments(self, comment):
+ def find_comments(self, comment: str) -> List["AugeasCommentNode"]:
"""
Recursive search of DirectiveNodes from the sequence of children.
:param str comment: Comment content to search for.
"""
- nodes = []
+ nodes: List["AugeasCommentNode"] = []
ownpath = self.metadata.get("augeaspath")
comments = self.parser.find_comments(comment, start=ownpath)
@@ -384,7 +408,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
- def delete_child(self, child):
+ def delete_child(self, child: "AugeasParserNode") -> None:
"""
Deletes a ParserNode from the sequence of children, and raises an
exception if it's unable to do so.
@@ -397,11 +421,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
"seem to exist.").format(child.metadata["augeaspath"])
)
- def unsaved_files(self):
+ def unsaved_files(self) -> Set[str]:
"""Returns a list of unsaved filepaths"""
return self.parser.unsaved_files()
- def parsed_paths(self):
+ def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for
@@ -412,7 +436,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
:returns: list of file paths of files that have been parsed
"""
- res_paths = []
+ res_paths: List[str] = []
paths = self.parser.existing_paths
for directory in paths:
@@ -421,7 +445,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return res_paths
- def _create_commentnode(self, path):
+ def _create_commentnode(self, path: str) -> "AugeasCommentNode":
"""Helper function to create a CommentNode from Augeas path"""
comment = self.parser.aug.get(path)
@@ -434,14 +458,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
filepath=apache_util.get_file_path(path),
metadata=metadata)
- def _create_directivenode(self, path):
+ def _create_directivenode(self, path: str) -> "AugeasDirectiveNode":
"""Helper function to create a DirectiveNode from Augeas path"""
name = self.parser.get_arg(path)
- metadata = {"augeasparser": self.parser, "augeaspath": path}
+ metadata: Dict[str, Union[parser.ApacheParser, str]] = {
+ "augeasparser": self.parser, "augeaspath": path,
+ }
# Check if the file was included from the root config or initial state
- enabled = self.parser.parsed_in_original(
+ enabled: bool = self.parser.parsed_in_original(
apache_util.get_file_path(path)
)
return AugeasDirectiveNode(name=name,
@@ -450,12 +476,12 @@ class AugeasBlockNode(AugeasDirectiveNode):
filepath=apache_util.get_file_path(path),
metadata=metadata)
- def _aug_find_blocks(self, name):
+ def _aug_find_blocks(self, name: str) -> Set[str]:
"""Helper function to perform a search to Augeas DOM tree to search
configuration blocks with a given name"""
# The code here is modified from configurator.get_virtual_hosts()
- blk_paths = set()
+ blk_paths: Set[str] = set()
for vhost_path in list(self.parser.parser_paths):
paths = self.parser.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
@@ -464,7 +490,8 @@ class AugeasBlockNode(AugeasDirectiveNode):
name.lower() in os.path.basename(path).lower()])
return blk_paths
- def _aug_resolve_child_position(self, name, position):
+ def _aug_resolve_child_position(
+ self, name: str, position: Optional[int]) -> Tuple[str, str, bool]:
"""
Helper function that iterates through the immediate children and figures
out the insertion path for a new AugeasParserNode.
@@ -489,16 +516,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
"""
# Default to appending
- before = False
+ before: bool = False
- all_children = self.parser.aug.match("{}/*".format(
+ all_children: str = self.parser.aug.match("{}/*".format(
self.metadata["augeaspath"])
)
# Calculate resulting_path
# Augeas indices start at 1. We use counter to calculate the index to
# be used in resulting_path.
- counter = 1
+ counter: int = 1
for i, child in enumerate(all_children):
if position is not None and i >= position:
# We're not going to insert the new node to an index after this
@@ -507,7 +534,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
if name == childname:
counter += 1
- resulting_path = "{}/{}[{}]".format(
+ resulting_path: str = "{}/{}[{}]".format(
self.metadata["augeaspath"],
name,
counter
diff --git a/certbot-apache/certbot_apache/_internal/configurator.py b/certbot-apache/certbot_apache/_internal/configurator.py
index 1decdc1a0..73b6a1e1b 100644
--- a/certbot-apache/certbot_apache/_internal/configurator.py
+++ b/certbot-apache/certbot_apache/_internal/configurator.py
@@ -1,26 +1,36 @@
"""Apache Configurator."""
-# pylint: disable=too-many-lines
-from collections import defaultdict
import copy
import fnmatch
import logging
import re
import socket
import time
+# pylint: disable=too-many-lines
+from collections import defaultdict
+from typing import Any
+from typing import Callable
+from typing import cast
from typing import DefaultDict
from typing import Dict
+from typing import Iterable
from typing import List
from typing import Optional
+from typing import Sequence
from typing import Set
+from typing import Tuple
+from typing import Type
from typing import Union
from acme import challenges
+from acme.challenges import Challenge
+from certbot import achallenges
from certbot import errors
from certbot import util
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
from certbot.compat import filesystem
from certbot.compat import os
from certbot.display import util as display_util
+from certbot.interfaces import RenewableCert
from certbot.plugins import common
from certbot.plugins.enhancements import AutoHSTSEnhancement
from certbot.plugins.util import path_surgery
@@ -32,9 +42,6 @@ from certbot_apache._internal import dualparser
from certbot_apache._internal import http_01
from certbot_apache._internal import obj
from certbot_apache._internal import parser
-from certbot_apache._internal.dualparser import DualBlockNode
-from certbot_apache._internal.obj import VirtualHost
-from certbot_apache._internal.parser import ApacheParser
try:
import apacheconfig
@@ -52,21 +59,21 @@ class OsOptions:
that the Apache configurator needs to be aware to operate properly.
"""
def __init__(self,
- server_root="/etc/apache2",
- vhost_root="/etc/apache2/sites-available",
- vhost_files="*",
- logs_root="/var/log/apache2",
- ctl="apache2ctl",
+ server_root: str = "/etc/apache2",
+ vhost_root: str = "/etc/apache2/sites-available",
+ vhost_files: str = "*",
+ logs_root: str = "/var/log/apache2",
+ ctl: str = "apache2ctl",
version_cmd: Optional[List[str]] = None,
restart_cmd: Optional[List[str]] = None,
restart_cmd_alt: Optional[List[str]] = None,
conftest_cmd: Optional[List[str]] = None,
enmod: Optional[str] = None,
dismod: Optional[str] = None,
- le_vhost_ext="-le-ssl.conf",
- handle_modules=False,
- handle_sites=False,
- challenge_location="/etc/apache2",
+ le_vhost_ext: str = "-le-ssl.conf",
+ handle_modules: bool = False,
+ handle_sites: bool = False,
+ challenge_location: str = "/etc/apache2",
apache_bin: Optional[str] = None,
):
self.server_root = server_root
@@ -132,16 +139,16 @@ class ApacheConfigurator(common.Configurator):
"""
- description = "Apache Web Server plugin"
+ description: str = "Apache Web Server plugin"
if os.environ.get("CERTBOT_DOCS") == "1":
description += ( # pragma: no cover
" (Please note that the default values of the Apache plugin options"
" change depending on the operating system Certbot is run on.)"
)
- OS_DEFAULTS = OsOptions()
+ OS_DEFAULTS: OsOptions = OsOptions()
- def pick_apache_config(self, warn_on_no_mod_ssl=True):
+ def pick_apache_config(self, warn_on_no_mod_ssl: bool = True) -> str:
"""
Pick the appropriate TLS Apache configuration file for current version of Apache and OS.
@@ -159,7 +166,7 @@ class ApacheConfigurator(common.Configurator):
return apache_util.find_ssl_apache_conf("old")
return apache_util.find_ssl_apache_conf("current")
- def _prepare_options(self):
+ def _prepare_options(self) -> None:
"""
Set the values possibly changed by command line parameters to
OS_DEFAULTS constant dictionary
@@ -180,7 +187,7 @@ class ApacheConfigurator(common.Configurator):
self.options.conftest_cmd[0] = self.options.ctl
@classmethod
- def add_parser_arguments(cls, add):
+ def add_parser_arguments(cls, add: Callable):
# When adding, modifying or deleting command line arguments, be sure to
# include the changes in the list used in method _prepare_options() to
# ensure consistent behavior.
@@ -218,7 +225,7 @@ class ApacheConfigurator(common.Configurator):
add("bin", default=DEFAULTS.bin,
help="Full path to apache2/httpd binary")
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any):
"""Initialize an Apache Configurator.
:param tup version: version of Apache as a tuple (2, 4, 7)
@@ -242,35 +249,37 @@ class ApacheConfigurator(common.Configurator):
# Temporary state for AutoHSTS enhancement
self._autohsts: Dict[str, Dict[str, Union[int, float]]] = {}
# Reverter save notes
- self.save_notes = ""
+ self.save_notes: str = ""
# Should we use ParserNode implementation instead of the old behavior
self.USE_PARSERNODE = use_parsernode
# Saves the list of file paths that were parsed initially, and
# not added to parser tree by self.conf("vhost-root") for example.
self.parsed_paths: List[str] = []
# These will be set in the prepare function
- self._prepared = False
- self.parser: ApacheParser
- self.parser_root: Optional[DualBlockNode] = None
+ self._prepared: bool = False
+ self.parser: parser.ApacheParser
+ self.parser_root: Optional[dualparser.DualBlockNode] = None
self.version = version
self._openssl_version = openssl_version
- self.vhosts: List[VirtualHost]
+ self.vhosts: List[obj.VirtualHost]
self.options = copy.deepcopy(self.OS_DEFAULTS)
- self._enhance_func = {"redirect": self._enable_redirect,
- "ensure-http-header": self._set_http_header,
- "staple-ocsp": self._enable_ocsp_stapling}
+ self._enhance_func: Dict[str, Callable] = {
+ "redirect": self._enable_redirect,
+ "ensure-http-header": self._set_http_header,
+ "staple-ocsp": self._enable_ocsp_stapling,
+ }
@property
- def mod_ssl_conf(self):
+ def mod_ssl_conf(self) -> str:
"""Full absolute path to SSL configuration file."""
return os.path.join(self.config.config_dir, constants.MOD_SSL_CONF_DEST)
@property
- def updated_mod_ssl_conf_digest(self):
+ def updated_mod_ssl_conf_digest(self) -> str:
"""Full absolute path to digest of updated SSL configuration file."""
return os.path.join(self.config.config_dir, constants.UPDATED_MOD_SSL_CONF_DIGEST)
- def _open_module_file(self, ssl_module_location):
+ def _open_module_file(self, ssl_module_location: str) -> Optional[bytes]:
"""Extract the open lines of openssl_version for testing purposes"""
try:
with open(ssl_module_location, mode="rb") as f:
@@ -280,7 +289,7 @@ class ApacheConfigurator(common.Configurator):
return None
return contents
- def openssl_version(self, warn_on_no_mod_ssl=True):
+ def openssl_version(self, warn_on_no_mod_ssl: bool = True) -> Optional[str]:
"""Lazily retrieve openssl version
:param bool warn_on_no_mod_ssl: `True` if we should warn if mod_ssl is not found. Set to
@@ -323,7 +332,7 @@ class ApacheConfigurator(common.Configurator):
self._openssl_version = matches[0].decode('UTF-8')
return self._openssl_version
- def prepare(self):
+ def prepare(self) -> None:
"""Prepare the authenticator/installer.
:raises .errors.NoInstallationError: If Apache configs cannot be found
@@ -391,7 +400,7 @@ class ApacheConfigurator(common.Configurator):
" Apache configuration?".format(self.options.server_root))
self._prepared = True
- def save(self, title=None, temporary=False):
+ def save(self, title: Optional[str] = None, temporary: bool = False):
"""Saves all changes to the configuration files.
This function first checks for save errors, if none are found,
@@ -416,7 +425,7 @@ class ApacheConfigurator(common.Configurator):
if title and not temporary:
self.finalize_checkpoint(title)
- def recovery_routine(self):
+ def recovery_routine(self) -> None:
"""Revert all previously modified files.
Reverts all modified files that have not been saved as a checkpoint
@@ -431,7 +440,7 @@ class ApacheConfigurator(common.Configurator):
# TODO: wrap into non-implementation specific parser interface
self.parser.aug.load()
- def revert_challenge_config(self):
+ def revert_challenge_config(self) -> None:
"""Used to cleanup challenge configurations.
:raises .errors.PluginError: If unable to revert the challenge config.
@@ -440,7 +449,7 @@ class ApacheConfigurator(common.Configurator):
self.revert_temporary_config()
self.parser.aug.load()
- def rollback_checkpoints(self, rollback=1):
+ def rollback_checkpoints(self, rollback: int = 1):
"""Rollback saved checkpoints.
:param int rollback: Number of checkpoints to revert
@@ -452,21 +461,20 @@ class ApacheConfigurator(common.Configurator):
super().rollback_checkpoints(rollback)
self.parser.aug.load()
- def _verify_exe_availability(self, exe):
+ def _verify_exe_availability(self, exe: str) -> None:
"""Checks availability of Apache executable"""
if not util.exe_exists(exe):
if not path_surgery(exe):
raise errors.NoInstallationError(
'Cannot find Apache executable {0}'.format(exe))
- def get_parser(self):
+ def get_parser(self) -> parser.ApacheParser:
"""Initializes the ApacheParser"""
# If user provided vhost_root value in command line, use it
return parser.ApacheParser(
- self.options.server_root, self.conf("vhost-root"),
- self.version, configurator=self)
+ self.options.server_root, self, self.conf("vhost-root"), version=self.version)
- def get_parsernode_root(self, metadata):
+ def get_parsernode_root(self, metadata: Dict[str, Any]) -> dualparser.DualBlockNode:
"""Initializes the ParserNode parser root instance."""
if HAS_APACHECONFIG:
@@ -489,8 +497,10 @@ class ApacheConfigurator(common.Configurator):
metadata=metadata,
)
- def deploy_cert(self, domain, cert_path, key_path,
- chain_path=None, fullchain_path=None):
+ def deploy_cert(
+ self, domain: str, cert_path: str, key_path: str,
+ chain_path: Optional[str] = None, fullchain_path: Optional[str] = None
+ ) -> None:
"""Deploys certificate to specified virtual host.
Currently tries to find the last directives to deploy the certificate
@@ -513,7 +523,7 @@ class ApacheConfigurator(common.Configurator):
display_util.notify("Successfully deployed certificate for {} to {}"
.format(domain, vhost.filep))
- def choose_vhosts(self, domain, create_if_no_ssl=True):
+ def choose_vhosts(self, domain: str, create_if_no_ssl: bool = True) -> List[obj.VirtualHost]:
"""
Finds VirtualHosts that can be used with the provided domain
@@ -535,14 +545,14 @@ class ApacheConfigurator(common.Configurator):
else:
return [self.choose_vhost(domain, create_if_no_ssl)]
- def _vhosts_for_wildcard(self, domain):
+ def _vhosts_for_wildcard(self, domain: str) -> List[obj.VirtualHost]:
"""
Get VHost objects for every VirtualHost that the user wants to handle
with the wildcard certificate.
"""
# Collect all vhosts that match the name
- matched = set()
+ matched: Set[obj.VirtualHost] = set()
for vhost in self.vhosts:
for name in vhost.get_names():
if self._in_wildcard_scope(name, domain):
@@ -550,20 +560,20 @@ class ApacheConfigurator(common.Configurator):
return list(matched)
- def _raise_no_suitable_vhost_error(self, target_name: str):
+ def _generate_no_suitable_vhost_error(self, target_name: str) -> errors.PluginError:
"""
Notifies the user that Certbot could not find a vhost to secure
and raises an error.
:param str target_name: The server name that could not be mapped
:raises errors.PluginError: Raised unconditionally
"""
- raise errors.PluginError(
+ return errors.PluginError(
"Certbot could not find a VirtualHost for {0} in the Apache "
"configuration. Please create a VirtualHost with a ServerName "
"matching {0} and try again.".format(target_name)
)
- def _in_wildcard_scope(self, name, domain):
+ def _in_wildcard_scope(self, name: str, domain: str) -> Optional[bool]:
"""
Helper method for _vhosts_for_wildcard() that makes sure that the domain
is in the scope of wildcard domain.
@@ -575,11 +585,12 @@ class ApacheConfigurator(common.Configurator):
return fnmatch.fnmatch(name, domain)
return None
- def _choose_vhosts_wildcard(self, domain, create_ssl=True):
+ def _choose_vhosts_wildcard(self, domain: str, create_ssl: bool = True
+ ) -> List[obj.VirtualHost]:
"""Prompts user to choose vhosts to install a wildcard certificate for"""
# Get all vhosts that are covered by the wildcard domain
- vhosts = self._vhosts_for_wildcard(domain)
+ vhosts: List = self._vhosts_for_wildcard(domain)
# Go through the vhosts, making sure that we cover all the names
# present, but preferring the SSL vhosts
@@ -600,7 +611,7 @@ class ApacheConfigurator(common.Configurator):
dialog_output = display_ops.select_vhost_multiple(list(dialog_input))
if not dialog_output:
- self._raise_no_suitable_vhost_error(domain)
+ raise self._generate_no_suitable_vhost_error(domain)
# Make sure we create SSL vhosts for the ones that are HTTP only
# if requested.
@@ -614,7 +625,10 @@ class ApacheConfigurator(common.Configurator):
self._wildcard_vhosts[domain] = return_vhosts
return return_vhosts
- def _deploy_cert(self, vhost, cert_path, key_path, chain_path, fullchain_path):
+ def _deploy_cert(
+ self, vhost: obj.VirtualHost, cert_path: str, key_path: str,
+ chain_path: Optional[str] = None, fullchain_path: Optional[str] = None,
+ ) -> None:
"""
Helper function for deploy_cert() that handles the actual deployment
this exists because we might want to do multiple deployments per
@@ -634,10 +648,10 @@ class ApacheConfigurator(common.Configurator):
self._add_dummy_ssl_directives(vhost.path)
self._clean_vhost(vhost)
- path = {"cert_path": self.parser.find_dir("SSLCertificateFile",
- None, vhost.path),
- "cert_key": self.parser.find_dir("SSLCertificateKeyFile",
- None, vhost.path)}
+ path: Dict[str, Any] = {
+ "cert_path": self.parser.find_dir("SSLCertificateFile", None, vhost.path),
+ "cert_key": self.parser.find_dir("SSLCertificateKeyFile", None, vhost.path),
+ }
# Only include if a certificate chain is specified
if chain_path is not None:
@@ -680,7 +694,7 @@ class ApacheConfigurator(common.Configurator):
if chain_path is not None:
self.save_notes += "\tSSLCertificateChainFile %s\n" % chain_path
- def choose_vhost(self, target_name, create_if_no_ssl=True):
+ def choose_vhost(self, target_name: str, create_if_no_ssl: bool = True) -> obj.VirtualHost:
"""Chooses a virtual host based on the given domain name.
If there is no clear virtual host to be selected, the user is prompted
@@ -720,32 +734,31 @@ class ApacheConfigurator(common.Configurator):
# to get created if a non-ssl vhost is selected.
return self._choose_vhost_from_list(target_name, temp=not create_if_no_ssl)
- def _choose_vhost_from_list(self, target_name, temp=False):
+ def _choose_vhost_from_list(self, target_name: str,
+ temp: bool = False) -> obj.VirtualHost:
# Select a vhost from a list
vhost = display_ops.select_vhost(target_name, self.vhosts)
if vhost is None:
- self._raise_no_suitable_vhost_error(target_name)
+ raise self._generate_no_suitable_vhost_error(target_name)
if temp:
return vhost
if not vhost.ssl:
addrs = self._get_proposed_addrs(vhost, "443")
# TODO: Conflicts is too conservative
- if not any(vhost.enabled and vhost.conflicts(addrs) for
- vhost in self.vhosts):
+ if not any(vhost.enabled and vhost.conflicts(addrs) for vhost in self.vhosts):
vhost = self.make_vhost_ssl(vhost)
else:
logger.error(
"The selected vhost would conflict with other HTTPS "
"VirtualHosts within Apache. Please select another "
"vhost or add ServerNames to your configuration.")
- raise errors.PluginError(
- "VirtualHost not able to be selected.")
+ raise errors.PluginError("VirtualHost not able to be selected.")
self._add_servername_alias(target_name, vhost)
self.assoc[target_name] = vhost
return vhost
- def domain_in_names(self, names, target_name):
+ def domain_in_names(self, names: Iterable[str], target_name: str) -> bool:
"""Checks if target domain is covered by one or more of the provided
names. The target name is matched by wildcard as well as exact match.
@@ -768,7 +781,9 @@ class ApacheConfigurator(common.Configurator):
return True
return False
- def find_best_http_vhost(self, target, filter_defaults, port="80"):
+ def find_best_http_vhost(
+ self, target: str, filter_defaults: bool, port: str = "80"
+ ) -> Optional[obj.VirtualHost]:
"""Returns non-HTTPS vhost objects found from the Apache config
:param str target: Domain name of the desired VirtualHost
@@ -785,7 +800,10 @@ class ApacheConfigurator(common.Configurator):
filtered_vhosts.append(vhost)
return self._find_best_vhost(target, filtered_vhosts, filter_defaults)
- def _find_best_vhost(self, target_name, vhosts=None, filter_defaults=True):
+ def _find_best_vhost(
+ self, target_name: str, vhosts: List[obj.VirtualHost] = None,
+ filter_defaults: bool = True
+ ) -> Optional[obj.VirtualHost]:
"""Finds the best vhost for a target_name.
This does not upgrade a vhost to HTTPS... it only finds the most
@@ -806,8 +824,8 @@ class ApacheConfigurator(common.Configurator):
# Points 3 - Servername no SSL
# Points 2 - Wildcard no SSL
# Points 1 - Address name with no SSL
- best_candidate = None
- best_points = 0
+ best_candidate: Optional[obj.VirtualHost] = None
+ best_points: int = 0
if vhosts is None:
vhosts = self.vhosts
@@ -846,13 +864,13 @@ class ApacheConfigurator(common.Configurator):
return best_candidate
- def _non_default_vhosts(self, vhosts):
+ def _non_default_vhosts(self, vhosts: List[obj.VirtualHost]) -> List[obj.VirtualHost]:
"""Return all non _default_ only vhosts."""
return [vh for vh in vhosts if not all(
addr.get_addr() == "_default_" for addr in vh.addrs
)]
- def get_all_names(self):
+ def get_all_names(self) -> Set[str]:
"""Returns all names found in the Apache Configuration.
:returns: All ServerNames, ServerAliases, and reverse DNS entries for
@@ -885,7 +903,7 @@ class ApacheConfigurator(common.Configurator):
return util.get_filtered_names(all_names)
- def get_name_from_ip(self, addr):
+ def get_name_from_ip(self, addr: obj.Addr) -> str:
"""Returns a reverse dns name if available.
:param addr: IP Address
@@ -905,7 +923,7 @@ class ApacheConfigurator(common.Configurator):
return ""
- def _get_vhost_names(self, path):
+ def _get_vhost_names(self, path: Optional[str]) -> Tuple[Optional[str], List[Optional[str]]]:
"""Helper method for getting the ServerName and
ServerAlias values from vhost in path
@@ -931,7 +949,7 @@ class ApacheConfigurator(common.Configurator):
return servername, serveraliases
- def _add_servernames(self, host):
+ def _add_servernames(self, host: obj.VirtualHost) -> None:
"""Helper function for get_virtual_hosts().
:param host: In progress vhost whose names will be added
@@ -942,13 +960,13 @@ class ApacheConfigurator(common.Configurator):
servername, serveraliases = self._get_vhost_names(host.path)
for alias in serveraliases:
- if not host.modmacro:
+ if not host.modmacro and alias is not None:
host.aliases.add(alias)
if not host.modmacro:
host.name = servername
- def _create_vhost(self, path):
+ def _create_vhost(self, path: str) -> Optional[obj.VirtualHost]:
"""Used by get_virtual_hosts to create vhost objects
:param str path: Augeas path to virtual host
@@ -957,16 +975,18 @@ class ApacheConfigurator(common.Configurator):
:rtype: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
- addrs = set()
+ addrs: Set[obj.Addr] = set()
try:
args = self.parser.aug.match(path + "/arg")
except RuntimeError:
logger.warning("Encountered a problem while parsing file: %s, skipping", path)
return None
for arg in args:
- addr = obj.Addr.fromstring(self.parser.get_arg(arg))
- if addr:
- addrs.add(addr)
+ arg_value = self.parser.get_arg(arg)
+ if arg_value is not None:
+ addr = obj.Addr.fromstring(arg_value)
+ if addr is not None:
+ addrs.add(addr)
is_ssl = False
if self.parser.find_dir("SSLEngine", "on", start=path, exclude=False):
@@ -994,7 +1014,7 @@ class ApacheConfigurator(common.Configurator):
self._add_servernames(vhost)
return vhost
- def get_virtual_hosts(self):
+ def get_virtual_hosts(self) -> List[obj.VirtualHost]:
"""
Temporary wrapper for legacy and ParserNode version for
get_virtual_hosts. This should be replaced with the ParserNode
@@ -1017,7 +1037,7 @@ class ApacheConfigurator(common.Configurator):
return v2_vhosts
return v1_vhosts
- def get_virtual_hosts_v1(self):
+ def get_virtual_hosts_v1(self) -> List[obj.VirtualHost]:
"""Returns list of virtual hosts found in the Apache configuration.
:returns: List of :class:`~certbot_apache._internal.obj.VirtualHost`
@@ -1070,7 +1090,7 @@ class ApacheConfigurator(common.Configurator):
vhs.append(new_vhost)
return vhs
- def get_virtual_hosts_v2(self):
+ def get_virtual_hosts_v2(self) -> List[obj.VirtualHost]:
"""Returns list of virtual hosts found in the Apache configuration using
ParserNode interface.
:returns: List of :class:`~certbot_apache.obj.VirtualHost`
@@ -1087,10 +1107,10 @@ class ApacheConfigurator(common.Configurator):
vhs.append(self._create_vhost_v2(vhblock))
return vhs
- def _create_vhost_v2(self, node):
+ def _create_vhost_v2(self, node) -> obj.VirtualHost:
"""Used by get_virtual_hosts_v2 to create vhost objects using ParserNode
interfaces.
- :param interfaces.BlockNode node: The BlockNode object of VirtualHost block
+ :param ApacheBlockNode node: The BlockNode object of VirtualHost block
:returns: newly created vhost
:rtype: :class:`~certbot_apache.obj.VirtualHost`
"""
@@ -1121,13 +1141,16 @@ class ApacheConfigurator(common.Configurator):
# Check if the VirtualHost is contained in a mod_macro block
if node.find_ancestors("Macro"):
macro = True
+ # VirtualHost V2 is part of migration to the pure-Python Apache parser project. It is not
+ # used on production as of now.
+ # TODO: Use a meaning full value for augeas path instead of an empty string
vhost = obj.VirtualHost(
- node.filepath, None, addrs, is_ssl, enabled, modmacro=macro, node=node
+ node.filepath, "", addrs, is_ssl, enabled, modmacro=macro, node=node
)
self._populate_vhost_names_v2(vhost)
return vhost
- def _populate_vhost_names_v2(self, vhost):
+ def _populate_vhost_names_v2(self, vhost: obj.VirtualHost):
"""Helper function that populates the VirtualHost names.
:param host: In progress vhost whose names will be added
:type host: :class:`~certbot_apache.obj.VirtualHost`
@@ -1146,7 +1169,7 @@ class ApacheConfigurator(common.Configurator):
vhost.aliases.add(serveralias)
vhost.name = servername
- def is_name_vhost(self, target_addr):
+ def is_name_vhost(self, target_addr: obj.Addr):
"""Returns if vhost is a name based vhost
NameVirtualHost was deprecated in Apache 2.4 as all VirtualHosts are
@@ -1167,7 +1190,7 @@ class ApacheConfigurator(common.Configurator):
return (self.version >= (2, 4) or
self.parser.find_dir("NameVirtualHost", str(target_addr)))
- def add_name_vhost(self, addr):
+ def add_name_vhost(self, addr: obj.Addr):
"""Adds NameVirtualHost directive for given address.
:param addr: Address that will be added as NameVirtualHost directive
@@ -1186,20 +1209,20 @@ class ApacheConfigurator(common.Configurator):
logger.debug(msg)
self.save_notes += msg
- def prepare_server_https(self, port, temp=False):
+ def prepare_server_https(self, port: str, temp: bool = False) -> None:
"""Prepare the server for HTTPS.
Make sure that the ssl_module is loaded and that the server
is appropriately listening on port.
:param str port: Port to listen on
-
+ :param bool temp: If this is just temporary
"""
self.prepare_https_modules(temp)
self.ensure_listen(port, https=True)
- def ensure_listen(self, port, https=False):
+ def ensure_listen(self, port: str, https: bool = False) -> None:
"""Make sure that Apache is listening on the port. Checks if the
Listen statement for the port already exists, and adds it to the
configuration if necessary.
@@ -1218,8 +1241,8 @@ class ApacheConfigurator(common.Configurator):
# Check for Listen <port>
# Note: This could be made to also look for ip:443 combo
- listens = [self.parser.get_arg(x).split()[0] for
- x in self.parser.find_dir("Listen")]
+ directives = [self.parser.get_arg(x) for x in self.parser.find_dir("Listen")]
+ listens = [directive.split()[0] for directive in directives if directive]
# Listen already in place
if self._has_port_already(listens, port):
@@ -1249,7 +1272,7 @@ class ApacheConfigurator(common.Configurator):
else:
self._add_listens_http(listen_dirs, listens, port)
- def _add_listens_http(self, listens, listens_orig, port):
+ def _add_listens_http(self, listens: Set[str], listens_orig: List[str], port: str) -> None:
"""Helper method for ensure_listen to figure out which new
listen statements need adding for listening HTTP on port
@@ -1274,7 +1297,7 @@ class ApacheConfigurator(common.Configurator):
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
- def _add_listens_https(self, listens, listens_orig, port):
+ def _add_listens_https(self, listens: Set[str], listens_orig: List[str], port: str):
"""Helper method for ensure_listen to figure out which new
listen statements need adding for listening HTTPS on port
@@ -1307,7 +1330,7 @@ class ApacheConfigurator(common.Configurator):
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
- def _has_port_already(self, listens, port):
+ def _has_port_already(self, listens: List[str], port: str) -> Optional[bool]:
"""Helper method for prepare_server_https to find out if user
already has an active Listen statement for the port we need
@@ -1325,7 +1348,7 @@ class ApacheConfigurator(common.Configurator):
return True
return None
- def prepare_https_modules(self, temp):
+ def prepare_https_modules(self, temp: bool) -> None:
"""Helper method for prepare_server_https, taking care of enabling
needed modules
@@ -1347,7 +1370,7 @@ class ApacheConfigurator(common.Configurator):
self.updated_mod_ssl_conf_digest,
warn_on_no_mod_ssl=True)
- def make_vhost_ssl(self, nonssl_vhost):
+ def make_vhost_ssl(self, nonssl_vhost: obj.VirtualHost) -> obj.VirtualHost:
"""Makes an ssl_vhost version of a nonssl_vhost.
Duplicates vhost and adds default ssl options
@@ -1398,7 +1421,6 @@ class ApacheConfigurator(common.Configurator):
raise errors.PluginError(
"Could not reverse map the HTTPS VirtualHost to the original")
-
# Update Addresses
self._update_ssl_vhosts_addrs(vh_p)
@@ -1409,7 +1431,10 @@ class ApacheConfigurator(common.Configurator):
# We know the length is one because of the assertion above
# Create the Vhost object
- ssl_vhost = self._create_vhost(vh_p)
+ vhost = self._create_vhost(vh_p)
+ if not vhost:
+ raise errors.Error("Could not create a vhost") # pragma: no cover
+ ssl_vhost: obj.VirtualHost = vhost
ssl_vhost.ancestor = nonssl_vhost
self.vhosts.append(ssl_vhost)
@@ -1425,7 +1450,7 @@ class ApacheConfigurator(common.Configurator):
return ssl_vhost
- def _get_new_vh_path(self, orig_matches, new_matches):
+ def _get_new_vh_path(self, orig_matches: List[str], new_matches: List[str]) -> Optional[str]:
""" Helper method for make_vhost_ssl for matching augeas paths. Returns
VirtualHost path from new_matches that's not present in orig_matches.
@@ -1439,7 +1464,7 @@ class ApacheConfigurator(common.Configurator):
return match
return None
- def _get_ssl_vhost_path(self, non_ssl_vh_fp):
+ def _get_ssl_vhost_path(self, non_ssl_vh_fp: str) -> str:
""" Get a file path for SSL vhost, uses user defined path as priority,
but if the value is invalid or not defined, will fall back to non-ssl
vhost filepath.
@@ -1461,7 +1486,7 @@ class ApacheConfigurator(common.Configurator):
return fp[:-(len(".conf"))] + self.options.le_vhost_ext
return fp + self.options.le_vhost_ext
- def _sift_rewrite_rule(self, line):
+ def _sift_rewrite_rule(self, line: str) -> bool:
"""Decides whether a line should be copied to a SSL vhost.
A canonical example of when sifting a line is required:
@@ -1494,7 +1519,7 @@ class ApacheConfigurator(common.Configurator):
# Sift line if it redirects the request to a HTTPS site
return target.startswith("https://")
- def _copy_create_ssl_vhost_skeleton(self, vhost, ssl_fp):
+ def _copy_create_ssl_vhost_skeleton(self, vhost: obj.VirtualHost, ssl_fp: str) -> None:
"""Copies over existing Vhost with IfModule mod_ssl.c> skeleton.
:param obj.VirtualHost vhost: Original VirtualHost object
@@ -1512,7 +1537,7 @@ class ApacheConfigurator(common.Configurator):
self.reverter.add_to_checkpoint(files, notes)
else:
self.reverter.register_file_creation(False, ssl_fp)
- sift = False
+ sift: bool
try:
orig_contents = self._get_vhost_block(vhost)
@@ -1541,12 +1566,12 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.set("/augeas/files%s/mtime" % (self._escape(ssl_fp)), "0")
self.parser.aug.set("/augeas/files%s/mtime" % (self._escape(vhost.filep)), "0")
- def _sift_rewrite_rules(self, contents):
+ def _sift_rewrite_rules(self, contents: Iterable[str]) -> Tuple[List[str], bool]:
""" Helper function for _copy_create_ssl_vhost_skeleton to prepare the
new HTTPS VirtualHost contents. Currently disabling the rewrites """
- result = []
- sift = False
+ result: List[str] = []
+ sift: bool = False
contents = iter(contents)
comment = ("# Some rewrite rules in this file were "
@@ -1602,7 +1627,7 @@ class ApacheConfigurator(common.Configurator):
result.append('\n'.join(chunk))
return result, sift
- def _get_vhost_block(self, vhost):
+ def _get_vhost_block(self, vhost: obj.VirtualHost) -> List[str]:
""" Helper method to get VirtualHost contents from the original file.
This is done with help of augeas span, which returns the span start and
end positions
@@ -1625,7 +1650,7 @@ class ApacheConfigurator(common.Configurator):
self._remove_closing_vhost_tag(vh_contents)
return vh_contents
- def _remove_closing_vhost_tag(self, vh_contents):
+ def _remove_closing_vhost_tag(self, vh_contents: List[str]) -> None:
"""Removes the closing VirtualHost tag if it exists.
This method modifies vh_contents directly to remove the closing
@@ -1644,9 +1669,9 @@ class ApacheConfigurator(common.Configurator):
vh_contents[content_index] = line[:line_index]
break
- def _update_ssl_vhosts_addrs(self, vh_path):
- ssl_addrs = set()
- ssl_addr_p = self.parser.aug.match(vh_path + "/arg")
+ def _update_ssl_vhosts_addrs(self, vh_path: str) -> Set[obj.Addr]:
+ ssl_addrs: Set[obj.Addr] = set()
+ ssl_addr_p: List[str] = self.parser.aug.match(vh_path + "/arg")
for addr in ssl_addr_p:
old_addr = obj.Addr.fromstring(
@@ -1658,7 +1683,7 @@ class ApacheConfigurator(common.Configurator):
return ssl_addrs
- def _clean_vhost(self, vhost):
+ def _clean_vhost(self, vhost: obj.VirtualHost):
# remove duplicated or conflicting ssl directives
self._deduplicate_directives(vhost.path,
["SSLCertificateFile",
@@ -1666,7 +1691,7 @@ class ApacheConfigurator(common.Configurator):
# remove all problematic directives
self._remove_directives(vhost.path, ["SSLCertificateChainFile"])
- def _deduplicate_directives(self, vh_path, directives):
+ def _deduplicate_directives(self, vh_path: Optional[str], directives: List[str]):
for directive in directives:
while len(self.parser.find_dir(directive, None,
vh_path, False)) > 1:
@@ -1674,14 +1699,14 @@ class ApacheConfigurator(common.Configurator):
vh_path, False)
self.parser.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
- def _remove_directives(self, vh_path, directives):
+ def _remove_directives(self, vh_path: Optional[str], directives: List[str]):
for directive in directives:
while self.parser.find_dir(directive, None, vh_path, False):
directive_path = self.parser.find_dir(directive, None,
vh_path, False)
self.parser.aug.remove(re.sub(r"/\w*$", "", directive_path[0]))
- def _add_dummy_ssl_directives(self, vh_path):
+ def _add_dummy_ssl_directives(self, vh_path: str) -> None:
self.parser.add_dir(vh_path, "SSLCertificateFile",
"insert_cert_file_path")
self.parser.add_dir(vh_path, "SSLCertificateKeyFile",
@@ -1691,7 +1716,7 @@ class ApacheConfigurator(common.Configurator):
if not existing_inc:
self.parser.add_dir(vh_path, "Include", self.mod_ssl_conf)
- def _add_servername_alias(self, target_name, vhost):
+ def _add_servername_alias(self, target_name: str, vhost: obj.VirtualHost) -> None:
vh_path = vhost.path
sname, saliases = self._get_vhost_names(vh_path)
if target_name == sname or target_name in saliases:
@@ -1705,7 +1730,7 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir(vh_path, "ServerAlias", target_name)
self._add_servernames(vhost)
- def _has_matching_wildcard(self, vh_path, target_name):
+ def _has_matching_wildcard(self, vh_path: str, target_name: str) -> bool:
"""Is target_name already included in a wildcard in the vhost?
:param str vh_path: Augeas path to the vhost
@@ -1721,7 +1746,7 @@ class ApacheConfigurator(common.Configurator):
aliases = (self.parser.aug.get(match) for match in matches)
return self.domain_in_names(aliases, target_name)
- def _add_name_vhost_if_necessary(self, vhost):
+ def _add_name_vhost_if_necessary(self, vhost: obj.VirtualHost) -> None:
"""Add NameVirtualHost Directives if necessary for new vhost.
NameVirtualHosts was a directive in Apache < 2.4
@@ -1731,7 +1756,7 @@ class ApacheConfigurator(common.Configurator):
:type vhost: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
- need_to_save = False
+ need_to_save: bool = False
# See if the exact address appears in any other vhost
# Remember 1.1.1.1:* == 1.1.1.1 -> hence any()
@@ -1755,14 +1780,14 @@ class ApacheConfigurator(common.Configurator):
if need_to_save:
self.save()
- def find_vhost_by_id(self, id_str):
+ def find_vhost_by_id(self, id_str: str) -> obj.VirtualHost:
"""
Searches through VirtualHosts and tries to match the id in a comment
:param str id_str: Id string for matching
- :returns: The matched VirtualHost or None
- :rtype: :class:`~certbot_apache._internal.obj.VirtualHost` or None
+ :returns: The matched VirtualHost
+ :rtype: :class:`~certbot_apache._internal.obj.VirtualHost`
:raises .errors.PluginError: If no VirtualHost is found
"""
@@ -1774,7 +1799,7 @@ class ApacheConfigurator(common.Configurator):
logger.warning(msg)
raise errors.PluginError(msg)
- def _find_vhost_id(self, vhost):
+ def _find_vhost_id(self, vhost: obj.VirtualHost) -> Optional[str]:
"""Tries to find the unique ID from the VirtualHost comments. This is
used for keeping track of VirtualHost directive over time.
@@ -1792,10 +1817,11 @@ class ApacheConfigurator(common.Configurator):
if id_comment:
# Use the first value, multiple ones shouldn't exist
comment = self.parser.get_arg(id_comment[0])
- return comment.split(" ")[-1]
+ if comment is not None:
+ return comment.split(" ")[-1]
return None
- def add_vhost_id(self, vhost):
+ def add_vhost_id(self, vhost: obj.VirtualHost) -> Optional[str]:
"""Adds an unique ID to the VirtualHost as a comment for mapping back
to it on later invocations, as the config file order might have changed.
If ID already exists, returns that instead.
@@ -1812,11 +1838,11 @@ class ApacheConfigurator(common.Configurator):
return vh_id
id_string = apache_util.unique_id()
- comment = constants.MANAGED_COMMENT_ID.format(id_string)
+ comment: str = constants.MANAGED_COMMENT_ID.format(id_string)
self.parser.add_comment(vhost.path, comment)
return id_string
- def _escape(self, fp):
+ def _escape(self, fp: str) -> str:
fp = fp.replace(",", "\\,")
fp = fp.replace("[", "\\[")
fp = fp.replace("]", "\\]")
@@ -1830,11 +1856,12 @@ class ApacheConfigurator(common.Configurator):
######################################################################
# Enhancements
######################################################################
- def supported_enhancements(self):
+ def supported_enhancements(self) -> List[str]:
"""Returns currently supported enhancements."""
return ["redirect", "ensure-http-header", "staple-ocsp"]
- def enhance(self, domain, enhancement, options=None):
+ def enhance(self, domain: str, enhancement: str,
+ options: Optional[Union[List[str], str]] = None) -> None:
"""Enhance configuration.
:param str domain: domain to enhance
@@ -1864,7 +1891,7 @@ class ApacheConfigurator(common.Configurator):
"enhancement was not configured.")
msg_enhancement = enhancement
if options:
- msg_enhancement += ": " + options
+ msg_enhancement += ": " + str(options)
msg = msg_tmpl.format(domain, msg_enhancement)
logger.error(msg)
raise errors.PluginError(msg)
@@ -1875,7 +1902,9 @@ class ApacheConfigurator(common.Configurator):
logger.error("Failed %s for %s", enhancement, domain)
raise
- def _autohsts_increase(self, vhost, id_str, nextstep):
+ def _autohsts_increase(
+ self, vhost: obj.VirtualHost, id_str: str, nextstep: int
+ ) -> None:
"""Increase the AutoHSTS max-age value
:param vhost: Virtual host object to modify
@@ -1890,7 +1919,7 @@ class ApacheConfigurator(common.Configurator):
self._autohsts_write(vhost, nextstep_value)
self._autohsts[id_str] = {"laststep": nextstep, "timestamp": time.time()}
- def _autohsts_write(self, vhost, nextstep_value):
+ def _autohsts_write(self, vhost: obj.VirtualHost, nextstep_value: float) -> None:
"""
Write the new HSTS max-age value to the VirtualHost file
"""
@@ -1921,7 +1950,7 @@ class ApacheConfigurator(common.Configurator):
self.save_notes += note_msg
self.save(note_msg)
- def _autohsts_fetch_state(self):
+ def _autohsts_fetch_state(self) -> None:
"""
Populates the AutoHSTS state from the pluginstorage
"""
@@ -1930,21 +1959,23 @@ class ApacheConfigurator(common.Configurator):
except KeyError:
self._autohsts = {}
- def _autohsts_save_state(self):
+ def _autohsts_save_state(self) -> None:
"""
Saves the state of AutoHSTS object to pluginstorage
"""
self.storage.put("autohsts", self._autohsts)
self.storage.save()
- def _autohsts_vhost_in_lineage(self, vhost, lineage):
+ def _autohsts_vhost_in_lineage(
+ self, vhost: obj.VirtualHost, lineage: RenewableCert
+ ) -> bool:
"""
Searches AutoHSTS managed VirtualHosts that belong to the lineage.
Matches the private key path.
"""
return bool(self.parser.find_dir("SSLCertificateKeyFile", lineage.key_path, vhost.path))
- def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
+ def _enable_ocsp_stapling(self, ssl_vhost: obj.VirtualHost, unused_options: Any) -> None:
"""Enables OCSP Stapling
In OCSP, each client (e.g. browser) would have to query the
@@ -1964,10 +1995,6 @@ class ApacheConfigurator(common.Configurator):
:param unused_options: Not currently used
:type unused_options: Not Available
-
- :returns: Success, general_vhost (HTTP vhost)
- :rtype: (bool, :class:`~certbot_apache._internal.obj.VirtualHost`)
-
"""
min_apache_ver = (2, 3, 3)
if self.get_version() < min_apache_ver:
@@ -2006,7 +2033,7 @@ class ApacheConfigurator(common.Configurator):
self.save()
logger.info(msg)
- def _set_http_header(self, ssl_vhost, header_substring):
+ def _set_http_header(self, ssl_vhost: obj.VirtualHost, header_substring: str):
"""Enables header that is identified by header_substring on ssl_vhost.
If the header identified by header_substring is not already set,
@@ -2022,9 +2049,6 @@ class ApacheConfigurator(common.Configurator):
e.g: Strict-Transport-Security, Upgrade-Insecure-Requests.
:type str
- :returns: Success, general_vhost (HTTP vhost)
- :rtype: (bool, :class:`~certbot_apache._internal.obj.VirtualHost`)
-
:raises .errors.PluginError: If no viable HTTP host can be created or
set with header header_substring.
@@ -2046,7 +2070,9 @@ class ApacheConfigurator(common.Configurator):
logger.info("Adding %s header to ssl vhost in %s", header_substring,
ssl_vhost.filep)
- def _verify_no_matching_http_header(self, ssl_vhost, header_substring):
+ def _verify_no_matching_http_header(
+ self, ssl_vhost: obj.VirtualHost, header_substring: str
+ ) -> None:
"""Checks to see if there is an existing Header directive that
contains the string header_substring.
@@ -2074,7 +2100,7 @@ class ApacheConfigurator(common.Configurator):
raise errors.PluginEnhancementAlreadyPresent(
"Existing %s header" % header_substring)
- def _enable_redirect(self, ssl_vhost, unused_options):
+ def _enable_redirect(self, ssl_vhost: obj.VirtualHost, unused_options: Any) -> None:
"""Redirect all equivalent HTTP traffic to ssl_vhost.
.. todo:: This enhancement should be rewritten and will
@@ -2155,15 +2181,13 @@ class ApacheConfigurator(common.Configurator):
logger.info("Redirecting vhost in %s to ssl vhost in %s",
general_vh.filep, ssl_vhost.filep)
- def _set_https_redirection_rewrite_rule(self, vhost):
+ def _set_https_redirection_rewrite_rule(self, vhost: obj.VirtualHost) -> None:
if self.get_version() >= (2, 3, 9):
- self.parser.add_dir(vhost.path, "RewriteRule",
- constants.REWRITE_HTTPS_ARGS_WITH_END)
+ self.parser.add_dir(vhost.path, "RewriteRule", constants.REWRITE_HTTPS_ARGS_WITH_END)
else:
- self.parser.add_dir(vhost.path, "RewriteRule",
- constants.REWRITE_HTTPS_ARGS)
+ self.parser.add_dir(vhost.path, "RewriteRule", constants.REWRITE_HTTPS_ARGS)
- def _verify_no_certbot_redirect(self, vhost):
+ def _verify_no_certbot_redirect(self, vhost: obj.VirtualHost) -> None:
"""Checks to see if a redirect was already installed by certbot.
Checks to see if virtualhost already contains a rewrite rule that is
@@ -2211,7 +2235,7 @@ class ApacheConfigurator(common.Configurator):
raise errors.PluginEnhancementAlreadyPresent(
"Certbot has already enabled redirection")
- def _is_rewrite_exists(self, vhost):
+ def _is_rewrite_exists(self, vhost: obj.VirtualHost) -> bool:
"""Checks if there exists a RewriteRule directive in vhost
:param vhost: vhost to check
@@ -2225,15 +2249,14 @@ class ApacheConfigurator(common.Configurator):
"RewriteRule", None, start=vhost.path)
return bool(rewrite_path)
- def _is_rewrite_engine_on(self, vhost):
+ def _is_rewrite_engine_on(self, vhost: obj.VirtualHost) -> Optional[Union[str, bool]]:
"""Checks if a RewriteEngine directive is on
:param vhost: vhost to check
:type vhost: :class:`~certbot_apache._internal.obj.VirtualHost`
"""
- rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on",
- start=vhost.path)
+ rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on", start=vhost.path)
if rewrite_engine_path_list:
for re_path in rewrite_engine_path_list:
# A RewriteEngine directive may also be included in per
@@ -2242,16 +2265,11 @@ class ApacheConfigurator(common.Configurator):
return self.parser.get_arg(re_path)
return False
- def _create_redirect_vhost(self, ssl_vhost):
+ def _create_redirect_vhost(self, ssl_vhost: obj.VirtualHost) -> None:
"""Creates an http_vhost specifically to redirect for the ssl_vhost.
:param ssl_vhost: ssl vhost
:type ssl_vhost: :class:`~certbot_apache._internal.obj.VirtualHost`
-
- :returns: tuple of the form
- (`success`, :class:`~certbot_apache._internal.obj.VirtualHost`)
- :rtype: tuple
-
"""
text = self._get_redirect_config_str(ssl_vhost)
@@ -2259,7 +2277,11 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.load()
# Make a new vhost data structure and add it to the lists
- new_vhost = self._create_vhost(parser.get_aug_path(self._escape(redirect_filepath)))
+ new_vhost = self._create_vhost(parser.get_aug_path(
+ self._escape(redirect_filepath))
+ )
+ if new_vhost is None:
+ raise errors.Error("Could not create a new vhost") # pragma: no cover
self.vhosts.append(new_vhost)
self._enhanced_vhosts["redirect"].add(new_vhost)
@@ -2268,7 +2290,7 @@ class ApacheConfigurator(common.Configurator):
"ssl vhost %s\n" %
(new_vhost.filep, ssl_vhost.filep))
- def _get_redirect_config_str(self, ssl_vhost):
+ def _get_redirect_config_str(self, ssl_vhost: obj.VirtualHost) -> str:
# get servernames and serveraliases
serveralias = ""
servername = ""
@@ -2278,7 +2300,7 @@ class ApacheConfigurator(common.Configurator):
if ssl_vhost.aliases:
serveralias = "ServerAlias " + " ".join(ssl_vhost.aliases)
- rewrite_rule_args: List[str] = []
+ rewrite_rule_args: List[str]
if self.get_version() >= (2, 3, 9):
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS_WITH_END
else:
@@ -2298,7 +2320,7 @@ class ApacheConfigurator(common.Configurator):
f"</VirtualHost>\n"
)
- def _write_out_redirect(self, ssl_vhost, text):
+ def _write_out_redirect(self, ssl_vhost: obj.VirtualHost, text: str) -> str:
# This is the default name
redirect_filename = "le-redirect.conf"
@@ -2328,7 +2350,7 @@ class ApacheConfigurator(common.Configurator):
return redirect_filepath
- def _get_http_vhost(self, ssl_vhost):
+ def _get_http_vhost(self, ssl_vhost: obj.VirtualHost) -> Optional[obj.VirtualHost]:
"""Find appropriate HTTP vhost for ssl_vhost."""
# First candidate vhosts filter
if ssl_vhost.ancestor:
@@ -2348,7 +2370,7 @@ class ApacheConfigurator(common.Configurator):
return None
- def _get_proposed_addrs(self, vhost, port="80"):
+ def _get_proposed_addrs(self, vhost: obj.VirtualHost, port: str = "80") -> Iterable[obj.Addr]:
"""Return all addrs of vhost with the port replaced with the specified.
:param obj.VirtualHost ssl_vhost: Original Vhost
@@ -2363,7 +2385,7 @@ class ApacheConfigurator(common.Configurator):
return redirects
- def enable_site(self, vhost):
+ def enable_site(self, vhost: obj.VirtualHost) -> None:
"""Enables an available site, Apache reload required.
.. note:: Does not make sure that the site correctly works or that all
@@ -2390,7 +2412,8 @@ class ApacheConfigurator(common.Configurator):
vhost.enabled = True
return
- def enable_mod(self, mod_name, temp=False): # pylint: disable=unused-argument
+ # pylint: disable=unused-argument
+ def enable_mod(self, mod_name: str, temp: bool = False) -> None:
"""Enables module in Apache.
Both enables and reloads Apache so module is active.
@@ -2413,7 +2436,7 @@ class ApacheConfigurator(common.Configurator):
)
raise errors.MisconfigurationError(mod_message)
- def restart(self):
+ def restart(self) -> None:
"""Runs a config test and reloads the Apache server.
:raises .errors.MisconfigurationError: If either the config test
@@ -2423,7 +2446,7 @@ class ApacheConfigurator(common.Configurator):
self.config_test()
self._reload()
- def _reload(self):
+ def _reload(self) -> None:
"""Reloads the Apache server.
:raises .errors.MisconfigurationError: If reload fails
@@ -2448,7 +2471,7 @@ class ApacheConfigurator(common.Configurator):
error = str(err)
raise errors.MisconfigurationError(error)
- def config_test(self):
+ def config_test(self) -> None:
"""Check the configuration of Apache for errors.
:raises .errors.MisconfigurationError: If config_test fails
@@ -2459,7 +2482,7 @@ class ApacheConfigurator(common.Configurator):
except errors.SubprocessError as err:
raise errors.MisconfigurationError(str(err))
- def get_version(self):
+ def get_version(self) -> Tuple[int, ...]:
"""Return version of Apache Server.
Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7))
@@ -2485,7 +2508,7 @@ class ApacheConfigurator(common.Configurator):
return tuple(int(i) for i in matches[0].split("."))
- def more_info(self):
+ def more_info(self) -> str:
"""Human-readable string to help understand the module"""
return (
"Configures Apache to authenticate and install HTTPS.{0}"
@@ -2495,7 +2518,9 @@ class ApacheConfigurator(common.Configurator):
version=".".join(str(i) for i in self.version))
)
- def auth_hint(self, failed_achalls): # pragma: no cover
+ def auth_hint(
+ self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]
+ ) -> str: # pragma: no cover
return ("The Certificate Authority failed to verify the temporary Apache configuration "
"changes made by Certbot. Ensure that the listed domains point to this Apache "
"server and that it is accessible from the internet.")
@@ -2503,11 +2528,13 @@ class ApacheConfigurator(common.Configurator):
###########################################################################
# Challenges Section
###########################################################################
- def get_chall_pref(self, unused_domain):
+ def get_chall_pref(self, unused_domain: str) -> Sequence[Type[challenges.HTTP01]]:
"""Return list of challenge preferences."""
return [challenges.HTTP01]
- def perform(self, achalls):
+ def perform(
+ self, achalls: List[KeyAuthorizationAnnotatedChallenge]
+ ) -> List[Challenge]:
"""Perform the configuration related challenge.
This function currently assumes all challenges will be fulfilled.
@@ -2516,7 +2543,7 @@ class ApacheConfigurator(common.Configurator):
"""
self._chall_out.update(achalls)
- responses = [None] * len(achalls)
+ responses: List[Optional[Challenge]] = [None] * len(achalls)
http_doer = http_01.ApacheHttp01(self)
for i, achall in enumerate(achalls):
@@ -2538,16 +2565,22 @@ class ApacheConfigurator(common.Configurator):
self._update_responses(responses, http_response, http_doer)
- return responses
+ # We assume all challenges has been fulfilled as described in the function documentation.
+ return cast(List[Challenge], responses)
- def _update_responses(self, responses, chall_response, chall_doer):
+ def _update_responses(
+ self,
+ responses: List[Optional[challenges.HTTP01Response]],
+ chall_response: List[Challenge],
+ chall_doer: http_01.ApacheHttp01
+ ) -> None:
# Go through all of the challenges and assign them to the proper
# place in the responses return value. All responses must be in the
# same order as the original challenges.
for i, resp in enumerate(chall_response):
responses[chall_doer.indices[i]] = resp
- def cleanup(self, achalls):
+ def cleanup(self, achalls: List[KeyAuthorizationAnnotatedChallenge]) -> None:
"""Revert all challenges."""
self._chall_out.difference_update(achalls)
@@ -2557,7 +2590,9 @@ class ApacheConfigurator(common.Configurator):
self.restart()
self.parser.reset_modules()
- def install_ssl_options_conf(self, options_ssl, options_ssl_digest, warn_on_no_mod_ssl=True):
+ def install_ssl_options_conf(
+ self, options_ssl: str, options_ssl_digest: str, warn_on_no_mod_ssl: bool = True
+ ) -> None:
"""Copy Certbot's SSL options file into the system's config dir if required.
:param bool warn_on_no_mod_ssl: True if we should warn if mod_ssl is not found.
@@ -2571,7 +2606,7 @@ class ApacheConfigurator(common.Configurator):
return common.install_version_controlled_file(
options_ssl, options_ssl_digest, apache_config_path, constants.ALL_SSL_OPTIONS_HASHES)
- def enable_autohsts(self, _unused_lineage, domains):
+ def enable_autohsts(self, _unused_lineage: RenewableCert, domains: List[str]) -> None:
"""
Enable the AutoHSTS enhancement for defined domains
@@ -2583,7 +2618,7 @@ class ApacheConfigurator(common.Configurator):
"""
self._autohsts_fetch_state()
- _enhanced_vhosts = []
+ _enhanced_vhosts: List[obj.VirtualHost] = []
for d in domains:
matched_vhosts = self.choose_vhosts(d, create_if_no_ssl=False)
# We should be handling only SSL vhosts for AutoHSTS
@@ -2615,7 +2650,7 @@ class ApacheConfigurator(common.Configurator):
# Save the current state to pluginstorage
self._autohsts_save_state()
- def _enable_autohsts_domain(self, ssl_vhost):
+ def _enable_autohsts_domain(self, ssl_vhost: obj.VirtualHost) -> None:
"""Do the initial AutoHSTS deployment to a vhost
:param ssl_vhost: The VirtualHost object to deploy the AutoHSTS
@@ -2625,8 +2660,7 @@ class ApacheConfigurator(common.Configurator):
"""
# This raises the exception
- self._verify_no_matching_http_header(ssl_vhost,
- "Strict-Transport-Security")
+ self._verify_no_matching_http_header(ssl_vhost, "Strict-Transport-Security")
if "headers_module" not in self.parser.modules:
self.enable_mod("headers")
@@ -2637,6 +2671,8 @@ class ApacheConfigurator(common.Configurator):
# Add ID to the VirtualHost for mapping back to it later
uniq_id = self.add_vhost_id(ssl_vhost)
+ if uniq_id is None:
+ raise errors.Error("Could not generate a unique id") # pragma: no cover
self.save_notes += "Adding unique ID {0} to VirtualHost in {1}\n".format(
uniq_id, ssl_vhost.filep)
# Add the actual HSTS header
@@ -2649,7 +2685,7 @@ class ApacheConfigurator(common.Configurator):
# Save the current state to pluginstorage
self._autohsts[uniq_id] = {"laststep": 0, "timestamp": time.time()}
- def update_autohsts(self, _unused_domain):
+ def update_autohsts(self, _unused_domain: str) -> None:
"""
Increase the AutoHSTS values of VirtualHosts that the user has enabled
this enhancement for.
@@ -2668,7 +2704,7 @@ class ApacheConfigurator(common.Configurator):
if config["timestamp"] + constants.AUTOHSTS_FREQ > curtime:
# Skip if last increase was < AUTOHSTS_FREQ ago
continue
- nextstep = config["laststep"] + 1
+ nextstep = cast(int, config["laststep"]) + 1
if nextstep < len(constants.AUTOHSTS_STEPS):
# If installer hasn't been prepared yet, do it now
if not self._prepared:
@@ -2695,7 +2731,7 @@ class ApacheConfigurator(common.Configurator):
self._autohsts_save_state()
- def deploy_autohsts(self, lineage):
+ def deploy_autohsts(self, lineage: RenewableCert) -> None:
"""
Checks if autohsts vhost has reached maximum auto-increased value
and changes the HSTS max-age to a high value.
@@ -2708,14 +2744,14 @@ class ApacheConfigurator(common.Configurator):
# No autohsts enabled for any vhost
return
- vhosts = []
+ vhosts: List[obj.VirtualHost] = []
affected_ids = []
# Copy, as we are removing from the dict inside the loop
for id_str, config in list(self._autohsts.items()):
if config["laststep"]+1 >= len(constants.AUTOHSTS_STEPS):
# max value reached, try to make permanent
try:
- vhost = self.find_vhost_by_id(id_str)
+ vhost: obj.VirtualHost = self.find_vhost_by_id(id_str)
except errors.PluginError:
msg = ("VirtualHost with id {} was not found, unable to "
"make HSTS max-age permanent.").format(id_str)
diff --git a/certbot-apache/certbot_apache/_internal/constants.py b/certbot-apache/certbot_apache/_internal/constants.py
index dde57006b..c442d4bff 100644
--- a/certbot-apache/certbot_apache/_internal/constants.py
+++ b/certbot-apache/certbot_apache/_internal/constants.py
@@ -1,4 +1,6 @@
"""Apache plugin constants."""
+from typing import List, Dict
+
import pkg_resources
from certbot.compat import os
@@ -13,7 +15,7 @@ UPDATED_MOD_SSL_CONF_DIGEST = ".updated-options-ssl-apache-conf-digest.txt"
in `certbot.configuration.NamespaceConfig.config_dir`."""
# NEVER REMOVE A SINGLE HASH FROM THIS LIST UNLESS YOU KNOW EXACTLY WHAT YOU ARE DOING!
-ALL_SSL_OPTIONS_HASHES = [
+ALL_SSL_OPTIONS_HASHES: List[str] = [
'2086bca02db48daf93468332543c60ac6acdb6f0b58c7bfdf578a5d47092f82a',
'4844d36c9a0f587172d9fa10f4f1c9518e3bcfa1947379f155e16a70a728c21a',
'5a922826719981c0a234b1fbcd495f3213e49d2519e845ea0748ba513044b65b',
@@ -36,39 +38,39 @@ AUGEAS_LENS_DIR = pkg_resources.resource_filename(
"certbot_apache", os.path.join("_internal", "augeas_lens"))
"""Path to the Augeas lens directory"""
-REWRITE_HTTPS_ARGS = [
+REWRITE_HTTPS_ARGS: List[str] = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,NE,R=permanent]"]
"""Apache version<2.3.9 rewrite rule arguments used for redirections to
https vhost"""
-REWRITE_HTTPS_ARGS_WITH_END = [
+REWRITE_HTTPS_ARGS_WITH_END: List[str] = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,NE,R=permanent]"]
"""Apache version >= 2.3.9 rewrite rule arguments used for redirections to
https vhost"""
-OLD_REWRITE_HTTPS_ARGS = [
+OLD_REWRITE_HTTPS_ARGS: List[List[str]] = [
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,QSA,R=permanent]"],
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,QSA,R=permanent]"]]
-HSTS_ARGS = ["always", "set", "Strict-Transport-Security",
+HSTS_ARGS: List[str] = ["always", "set", "Strict-Transport-Security",
"\"max-age=31536000\""]
"""Apache header arguments for HSTS"""
-UIR_ARGS = ["always", "set", "Content-Security-Policy",
- "upgrade-insecure-requests"]
+UIR_ARGS: List[str] = ["always", "set", "Content-Security-Policy", "upgrade-insecure-requests"]
-HEADER_ARGS = {"Strict-Transport-Security": HSTS_ARGS,
- "Upgrade-Insecure-Requests": UIR_ARGS}
+HEADER_ARGS: Dict[str, List[str]] = {
+ "Strict-Transport-Security": HSTS_ARGS, "Upgrade-Insecure-Requests": UIR_ARGS,
+}
-AUTOHSTS_STEPS = [60, 300, 900, 3600, 21600, 43200, 86400]
+AUTOHSTS_STEPS: List[int] = [60, 300, 900, 3600, 21600, 43200, 86400]
"""AutoHSTS increase steps: 1min, 5min, 15min, 1h, 6h, 12h, 24h"""
-AUTOHSTS_PERMANENT = 31536000
+AUTOHSTS_PERMANENT: int = 31536000
"""Value for the last max-age of HSTS"""
-AUTOHSTS_FREQ = 172800
+AUTOHSTS_FREQ: int = 172800
"""Minimum time since last increase to perform a new one: 48h"""
-MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
-MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
+MANAGED_COMMENT: str = "DO NOT REMOVE - Managed by Certbot"
+MANAGED_COMMENT_ID: str = MANAGED_COMMENT + ", VirtualHost id: {0}"
"""Managed by Certbot comments and the VirtualHost identification template"""
diff --git a/certbot-apache/certbot_apache/_internal/display_ops.py b/certbot-apache/certbot_apache/_internal/display_ops.py
index 07a332c71..b9d7c8245 100644
--- a/certbot-apache/certbot_apache/_internal/display_ops.py
+++ b/certbot-apache/certbot_apache/_internal/display_ops.py
@@ -1,18 +1,23 @@
"""Contains UI methods for Apache operations."""
import logging
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
from certbot import errors
from certbot.compat import os
from certbot.display import util as display_util
+from certbot_apache._internal import obj
logger = logging.getLogger(__name__)
-def select_vhost_multiple(vhosts):
+def select_vhost_multiple(vhosts: Optional[List[obj.VirtualHost]]) -> List[obj.VirtualHost]:
"""Select multiple Vhosts to install the certificate for
:param vhosts: Available Apache VirtualHosts
- :type vhosts: :class:`list` of type `~obj.Vhost`
+ :type vhosts: :class:`list` of type `~obj.VirtualHost`
:returns: List of VirtualHosts
:rtype: :class:`list`of type `~obj.Vhost`
@@ -32,7 +37,7 @@ def select_vhost_multiple(vhosts):
return []
-def _reversemap_vhosts(names, vhosts):
+def _reversemap_vhosts(names: Iterable[str], vhosts: List[obj.VirtualHost]):
"""Helper function for select_vhost_multiple for mapping string
representations back to actual vhost objects"""
return_vhosts = []
@@ -44,9 +49,10 @@ def _reversemap_vhosts(names, vhosts):
return return_vhosts
-def select_vhost(domain, vhosts):
+def select_vhost(domain: str, vhosts: List[obj.VirtualHost]) -> Optional[obj.VirtualHost]:
"""Select an appropriate Apache Vhost.
+ :param domain: Domain to select
:param vhosts: Available Apache VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
@@ -62,7 +68,7 @@ def select_vhost(domain, vhosts):
return None
-def _vhost_menu(domain, vhosts):
+def _vhost_menu(domain: str, vhosts: List[obj.VirtualHost]) -> Tuple[str, int]:
"""Select an appropriate Apache Vhost.
:param vhosts: Available Apache Virtual Hosts
diff --git a/certbot-apache/certbot_apache/_internal/dualparser.py b/certbot-apache/certbot_apache/_internal/dualparser.py
index 7559e3839..8d99b5fbb 100644
--- a/certbot-apache/certbot_apache/_internal/dualparser.py
+++ b/certbot-apache/certbot_apache/_internal/dualparser.py
@@ -1,7 +1,16 @@
""" Dual ParserNode implementation """
+from typing import Any
+from typing import Callable
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+
from certbot_apache._internal import apacheparser
from certbot_apache._internal import assertions
from certbot_apache._internal import augeasparser
+from certbot_apache._internal import interfaces
class DualNodeBase:
@@ -9,12 +18,12 @@ class DualNodeBase:
base class for dual parser interface classes. This class handles runtime
attribute value assertions."""
- def save(self, msg): # pragma: no cover
+ def save(self, msg: str): # pragma: no cover
""" Call save for both parsers """
self.primary.save(msg)
self.secondary.save(msg)
- def __getattr__(self, aname):
+ def __getattr__(self, aname: str) -> Any:
""" Attribute value assertion """
firstval = getattr(self.primary, aname)
secondval = getattr(self.secondary, aname)
@@ -28,11 +37,13 @@ class DualNodeBase:
assertions.assertEqualSimple(firstval, secondval)
return firstval
- def find_ancestors(self, name):
+ def find_ancestors(self, name: str) -> Sequence[interfaces.ParserNode]:
""" Traverses the ancestor tree and returns ancestors matching name """
return self._find_helper(DualBlockNode, "find_ancestors", name)
- def _find_helper(self, nodeclass, findfunc, search, **kwargs):
+ def _find_helper(
+ self, nodeclass: Callable, findfunc: str, search: str, **kwargs: Any
+ ) -> List[apacheparser.ApacheBlockNode]:
"""A helper for find_* functions. The function specific attributes should
be passed as keyword arguments.
@@ -75,7 +86,7 @@ class DualNodeBase:
class DualCommentNode(DualNodeBase):
""" Dual parser implementation of CommentNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of CommentNode objects as well as creating a DualCommentNode object
using precreated or fetched CommentNode objects if provided as optional
@@ -107,7 +118,7 @@ class DualCommentNode(DualNodeBase):
class DualDirectiveNode(DualNodeBase):
""" Dual parser implementation of DirectiveNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of DirectiveNode objects as well as creating a DualDirectiveNode object
using precreated or fetched DirectiveNode objects if provided as optional
@@ -118,8 +129,6 @@ class DualDirectiveNode(DualNodeBase):
:param DirectiveNode primary: Primary pre-created DirectiveNode, mainly
used when creating new DualParser nodes using add_* methods.
:param DirectiveNode secondary: Secondary pre-created DirectiveNode
-
-
"""
kwargs.setdefault("primary", None)
@@ -132,8 +141,12 @@ class DualDirectiveNode(DualNodeBase):
self.primary = primary
self.secondary = secondary
else:
- self.primary = augeasparser.AugeasDirectiveNode(**kwargs)
- self.secondary = apacheparser.ApacheDirectiveNode(**kwargs)
+ self.primary = augeasparser.AugeasDirectiveNode(
+ **kwargs
+ )
+ self.secondary = apacheparser.ApacheDirectiveNode(
+ **kwargs
+ )
assertions.assertEqual(self.primary, self.secondary)
@@ -149,7 +162,7 @@ class DualDirectiveNode(DualNodeBase):
class DualBlockNode(DualNodeBase):
""" Dual parser implementation of BlockNode interface """
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of BlockNode objects as well as creating a DualBlockNode object
using precreated or fetched BlockNode objects if provided as optional
@@ -164,8 +177,8 @@ class DualBlockNode(DualNodeBase):
kwargs.setdefault("primary", None)
kwargs.setdefault("secondary", None)
- primary = kwargs.pop("primary")
- secondary = kwargs.pop("secondary")
+ primary: Optional[augeasparser.AugeasBlockNode] = kwargs.pop("primary")
+ secondary: Optional[apacheparser.ApacheBlockNode] = kwargs.pop("secondary")
if primary or secondary:
assert primary and secondary
@@ -177,7 +190,9 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(self.primary, self.secondary)
- def add_child_block(self, name, parameters=None, position=None):
+ def add_child_block(
+ self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
+ ) -> "DualBlockNode":
""" Creates a new child BlockNode, asserts that both implementations
did it in a similar way, and returns a newly created DualBlockNode object
encapsulating both of the newly created objects """
@@ -187,7 +202,9 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(primary_new, secondary_new)
return DualBlockNode(primary=primary_new, secondary=secondary_new)
- def add_child_directive(self, name, parameters=None, position=None):
+ def add_child_directive(
+ self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
+ ) -> DualDirectiveNode:
""" Creates a new child DirectiveNode, asserts that both implementations
did it in a similar way, and returns a newly created DualDirectiveNode
object encapsulating both of the newly created objects """
@@ -197,17 +214,23 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(primary_new, secondary_new)
return DualDirectiveNode(primary=primary_new, secondary=secondary_new)
- def add_child_comment(self, comment="", position=None):
+ def add_child_comment(
+ self, comment: str = "", position: Optional[int] = None
+ ) -> DualCommentNode:
""" Creates a new child CommentNode, asserts that both implementations
did it in a similar way, and returns a newly created DualCommentNode
object encapsulating both of the newly created objects """
- primary_new = self.primary.add_child_comment(comment, position)
- secondary_new = self.secondary.add_child_comment(comment, position)
+ primary_new = self.primary.add_child_comment(comment=comment, position=position)
+ secondary_new = self.secondary.add_child_comment(name=comment, position=position)
assertions.assertEqual(primary_new, secondary_new)
return DualCommentNode(primary=primary_new, secondary=secondary_new)
- def _create_matching_list(self, primary_list, secondary_list):
+ def _create_matching_list(
+ self,
+ primary_list: List[interfaces.ParserNode],
+ secondary_list: List[interfaces.ParserNode],
+ ) -> List[Tuple[interfaces.ParserNode, interfaces.ParserNode]]:
""" Matches the list of primary_list to a list of secondary_list and
returns a list of tuples. This is used to create results for find_
methods.
@@ -234,7 +257,7 @@ class DualBlockNode(DualNodeBase):
raise AssertionError("Could not find a matching node.")
return matched
- def find_blocks(self, name, exclude=True):
+ def find_blocks(self, name: str, exclude: bool = True) -> List[apacheparser.ApacheBlockNode]:
"""
Performs a search for BlockNodes using both implementations and does simple
checks for results. This is built upon the assumption that unimplemented
@@ -246,7 +269,8 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualBlockNode, "find_blocks", name,
exclude=exclude)
- def find_directives(self, name, exclude=True):
+ def find_directives(self, name: str, exclude: bool = True
+ ) -> Sequence[apacheparser.ApacheDirectiveNode]:
"""
Performs a search for DirectiveNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented
@@ -258,7 +282,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualDirectiveNode, "find_directives", name,
exclude=exclude)
- def find_comments(self, comment):
+ def find_comments(self, comment: str) -> Sequence[apacheparser.ApacheParserNode]:
"""
Performs a search for CommentNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented
@@ -269,7 +293,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualCommentNode, "find_comments", comment)
- def delete_child(self, child):
+ def delete_child(self, child: "DualBlockNode"):
"""Deletes a child from the ParserNode implementations. The actual
ParserNode implementations are used here directly in order to be able
to match a child to the list of children."""
@@ -277,7 +301,7 @@ class DualBlockNode(DualNodeBase):
self.primary.delete_child(child.primary)
self.secondary.delete_child(child.secondary)
- def unsaved_files(self):
+ def unsaved_files(self) -> Set[str]:
""" Fetches the list of unsaved file paths and asserts that the lists
match """
primary_files = self.primary.unsaved_files()
@@ -286,7 +310,7 @@ class DualBlockNode(DualNodeBase):
return primary_files
- def parsed_paths(self):
+ def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for
diff --git a/certbot-apache/certbot_apache/_internal/entrypoint.py b/certbot-apache/certbot_apache/_internal/entrypoint.py
index 7a2034004..40b3b412e 100644
--- a/certbot-apache/certbot_apache/_internal/entrypoint.py
+++ b/certbot-apache/certbot_apache/_internal/entrypoint.py
@@ -1,4 +1,7 @@
""" Entry point for Apache Plugin """
+from typing import Callable
+from typing import Dict
+
from certbot import util
from certbot_apache._internal import configurator
from certbot_apache._internal import override_arch
@@ -10,7 +13,7 @@ from certbot_apache._internal import override_gentoo
from certbot_apache._internal import override_suse
from certbot_apache._internal import override_void
-OVERRIDE_CLASSES = {
+OVERRIDE_CLASSES: Dict[str, Callable] = {
"arch": override_arch.ArchConfigurator,
"cloudlinux": override_centos.CentOSConfigurator,
"darwin": override_darwin.DarwinConfigurator,
diff --git a/certbot-apache/certbot_apache/_internal/http_01.py b/certbot-apache/certbot_apache/_internal/http_01.py
index 117eec2a5..2fe981720 100644
--- a/certbot-apache/certbot_apache/_internal/http_01.py
+++ b/certbot-apache/certbot_apache/_internal/http_01.py
@@ -1,11 +1,14 @@
"""A class that performs HTTP-01 challenges for Apache"""
import errno
import logging
+from typing import Any
from typing import List
from typing import Set
from typing import TYPE_CHECKING
+from acme.challenges import HTTP01Response
from certbot import errors
+from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
from certbot.compat import filesystem
from certbot.compat import os
from certbot.plugins import common
@@ -64,7 +67,7 @@ class ApacheHttp01(common.ChallengePerformer):
"http_challenges")
self.moded_vhosts: Set[VirtualHost] = set()
- def perform(self):
+ def perform(self) -> List[KeyAuthorizationAnnotatedChallenge]:
"""Perform all HTTP-01 challenges."""
if not self.achalls:
return []
@@ -72,8 +75,7 @@ class ApacheHttp01(common.ChallengePerformer):
# About to make temporary changes to the config
self.configurator.save("Changes before challenge setup", True)
- self.configurator.ensure_listen(str(
- self.configurator.config.http01_port))
+ self.configurator.ensure_listen(str(self.configurator.config.http01_port))
self.prepare_http01_modules()
responses = self._set_up_challenges()
@@ -84,7 +86,7 @@ class ApacheHttp01(common.ChallengePerformer):
return responses
- def prepare_http01_modules(self):
+ def prepare_http01_modules(self) -> None:
"""Make sure that we have the needed modules available for http01"""
if self.configurator.conf("handle-modules"):
@@ -97,7 +99,7 @@ class ApacheHttp01(common.ChallengePerformer):
if mod + "_module" not in self.configurator.parser.modules:
self.configurator.enable_mod(mod, temp=True)
- def _mod_config(self):
+ def _mod_config(self) -> None:
selected_vhosts: List[VirtualHost] = []
http_port = str(self.configurator.config.http01_port)
@@ -146,7 +148,7 @@ class ApacheHttp01(common.ChallengePerformer):
with open(self.challenge_conf_post, "w") as new_conf:
new_conf.write(config_text_post)
- def _matching_vhosts(self, domain):
+ def _matching_vhosts(self, domain: str) -> List[VirtualHost]:
"""Return all VirtualHost objects that have the requested domain name or
a wildcard name that would match the domain in ServerName or ServerAlias
directive.
@@ -160,9 +162,9 @@ class ApacheHttp01(common.ChallengePerformer):
return matching_vhosts
- def _relevant_vhosts(self):
+ def _relevant_vhosts(self) -> List[VirtualHost]:
http01_port = str(self.configurator.config.http01_port)
- relevant_vhosts = []
+ relevant_vhosts: List[VirtualHost] = []
for vhost in self.configurator.vhosts:
if any(a.is_wildcard() or a.get_port() == http01_port for a in vhost.addrs):
if not vhost.ssl:
@@ -180,7 +182,7 @@ class ApacheHttp01(common.ChallengePerformer):
"""Return all VirtualHost objects with no ServerName"""
return [vh for vh in self.configurator.vhosts if vh.name is None]
- def _set_up_challenges(self):
+ def _set_up_challenges(self) -> List[HTTP01Response]:
if not os.path.isdir(self.challenge_dir):
old_umask = filesystem.umask(0o022)
try:
@@ -198,10 +200,12 @@ class ApacheHttp01(common.ChallengePerformer):
return responses
- def _set_up_challenge(self, achall):
+ def _set_up_challenge(self, achall: KeyAuthorizationAnnotatedChallenge) -> HTTP01Response:
+ response: HTTP01Response
+ validation: Any
response, validation = achall.response_and_validation()
- name = os.path.join(self.challenge_dir, achall.chall.encode("token"))
+ name: str = os.path.join(self.challenge_dir, achall.chall.encode("token"))
self.configurator.reverter.register_file_creation(True, name)
with open(name, 'wb') as f:
@@ -210,7 +214,7 @@ class ApacheHttp01(common.ChallengePerformer):
return response
- def _set_up_include_directives(self, vhost):
+ def _set_up_include_directives(self, vhost: VirtualHost) -> None:
"""Includes override configuration to the beginning and to the end of
VirtualHost. Note that this include isn't added to Augeas search tree"""
diff --git a/certbot-apache/certbot_apache/_internal/interfaces.py b/certbot-apache/certbot_apache/_internal/interfaces.py
index 20b068ee7..ad1086632 100644
--- a/certbot-apache/certbot_apache/_internal/interfaces.py
+++ b/certbot-apache/certbot_apache/_internal/interfaces.py
@@ -100,6 +100,9 @@ For this reason the internal representation of data should not ignore the case.
"""
import abc
+from typing import Any
+from typing import List
+from typing import Optional
class ParserNode(metaclass=abc.ABCMeta):
@@ -146,7 +149,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
"""
Initializes the ParserNode instance, and sets the ParserNode specific
instance variables. This is not meant to be used directly, but through
@@ -170,7 +173,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def save(self, msg):
+ def save(self, msg: str) -> None:
"""
Save traverses the children, and attempts to write the AST to disk for
all the objects that are marked dirty. The actual operation of course
@@ -189,7 +192,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def find_ancestors(self, name):
+ def find_ancestors(self, name: str):
"""
Traverses the ancestor tree up, searching for BlockNodes with a specific
name.
@@ -220,7 +223,7 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any):
"""
Initializes the CommentNode instance and sets its instance variables.
@@ -238,10 +241,12 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
created or changed after the last save. Default: False.
:type dirty: bool
"""
- super().__init__(ancestor=kwargs['ancestor'],
- dirty=kwargs.get('dirty', False),
- filepath=kwargs['filepath'],
- metadata=kwargs.get('metadata', {})) # pragma: no cover
+ super().__init__( # pragma: no cover
+ ancestor=kwargs['ancestor'],
+ dirty=kwargs.get('dirty', False),
+ filepath=kwargs['filepath'],
+ metadata=kwargs.get('metadata', {}),
+ )
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
@@ -272,7 +277,7 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs: Any) -> None:
"""
Initializes the DirectiveNode instance and sets its instance variables.
@@ -302,13 +307,15 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
:type enabled: bool
"""
- super().__init__(ancestor=kwargs['ancestor'],
- dirty=kwargs.get('dirty', False),
- filepath=kwargs['filepath'],
- metadata=kwargs.get('metadata', {})) # pragma: no cover
+ super().__init__( # pragma: no cover
+ ancestor=kwargs['ancestor'],
+ dirty=kwargs.get('dirty', False),
+ filepath=kwargs['filepath'],
+ metadata=kwargs.get('metadata', {}),
+ )
@abc.abstractmethod
- def set_parameters(self, parameters):
+ def set_parameters(self, parameters: List[str]) -> None:
"""
Sets the sequence of parameters for this ParserNode object without
whitespaces. While the whitespaces for parameters are discarded when using
@@ -361,7 +368,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def add_child_block(self, name, parameters=None, position=None):
+ def add_child_block(
+ self, name: str, parameters: List[str] = None, position: int = None
+ ) -> "BlockNode":
"""
Adds a new BlockNode child node with provided values and marks the callee
BlockNode dirty. This is used to add new children to the AST. The preceding
@@ -381,7 +390,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def add_child_directive(self, name, parameters=None, position=None):
+ def add_child_directive(
+ self, name: str, parameters: Optional[List[str]] = None, position: Optional[int] = None
+ ) -> "DirectiveNode":
"""
Adds a new DirectiveNode child node with provided values and marks the
callee BlockNode dirty. This is used to add new children to the AST. The
@@ -402,7 +413,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def add_child_comment(self, comment="", position=None):
+ def add_child_comment(self, comment: str = "", position: Optional[int] = None) -> "CommentNode":
"""
Adds a new CommentNode child node with provided value and marks the
callee BlockNode dirty. This is used to add new children to the AST. The
@@ -422,7 +433,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def find_blocks(self, name, exclude=True):
+ def find_blocks(self, name: str, exclude: bool = True) -> List["BlockNode"]:
"""
Find a configuration block by name. This method walks the child tree of
ParserNodes under the instance it was called from. This way it is possible
@@ -439,7 +450,23 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def find_directives(self, name, exclude=True):
+ def find_comments(self, comment: str) -> List["CommentNode"]:
+ """
+ Find comments with value containing the search term.
+
+ This method walks the child tree of ParserNodes under the instance it was
+ called from. This way it is possible to search for the whole configuration
+ tree, when starting from root node, or to do a partial search when starting
+ from a specified branch. The lookup should be case sensitive.
+
+ :param str comment: The content of comment to search for
+
+ :returns: A list of found CommentNode objects.
+
+ """
+
+ @abc.abstractmethod
+ def find_directives(self, name: str, exclude: bool = True):
"""
Find a directive by name. This method walks the child tree of ParserNodes
under the instance it was called from. This way it is possible to search
@@ -457,23 +484,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def find_comments(self, comment):
- """
- Find comments with value containing the search term.
-
- This method walks the child tree of ParserNodes under the instance it was
- called from. This way it is possible to search for the whole configuration
- tree, when starting from root node, or to do a partial search when starting
- from a specified branch. The lookup should be case sensitive.
-
- :param str comment: The content of comment to search for
-
- :returns: A list of found CommentNode objects.
-
- """
-
- @abc.abstractmethod
- def delete_child(self, child):
+ def delete_child(self, child: "ParserNode") -> None:
"""
Remove a specified child node from the list of children of the called
BlockNode object.
@@ -483,7 +494,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def unsaved_files(self):
+ def unsaved_files(self) -> List[str]:
"""
Returns a list of file paths that have been changed since the last save
(or the initial configuration parse). The intended use for this method
@@ -496,7 +507,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
- def parsed_paths(self):
+ def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for
diff --git a/certbot-apache/certbot_apache/_internal/obj.py b/certbot-apache/certbot_apache/_internal/obj.py
index 1d224a8ab..c206a42cb 100644
--- a/certbot-apache/certbot_apache/_internal/obj.py
+++ b/certbot-apache/certbot_apache/_internal/obj.py
@@ -1,14 +1,19 @@
"""Module contains classes used by the Apache Configurator."""
import re
+from typing import Any
+from typing import Iterable
+from typing import Optional
+from typing import Pattern
from typing import Set
from certbot.plugins import common
+from certbot_apache._internal import interfaces
class Addr(common.Addr):
"""Represents an Apache address."""
- def __eq__(self, other):
+ def __eq__(self, other: Any):
"""This is defined as equivalent within Apache.
ip_addr:* == ip_addr
@@ -28,12 +33,12 @@ class Addr(common.Addr):
# __cmp__ is overridden. See https://bugs.python.org/issue2235
return super().__hash__()
- def _addr_less_specific(self, addr):
+ def _addr_less_specific(self, addr: "Addr") -> bool:
"""Returns if addr.get_addr() is more specific than self.get_addr()."""
# pylint: disable=protected-access
return addr._rank_specific_addr() > self._rank_specific_addr()
- def _rank_specific_addr(self):
+ def _rank_specific_addr(self) -> int:
"""Returns numerical rank for get_addr()
:returns: 2 - FQ, 1 - wildcard, 0 - _default_
@@ -46,7 +51,7 @@ class Addr(common.Addr):
return 1
return 2
- def conflicts(self, addr):
+ def conflicts(self, addr: "Addr") -> bool:
r"""Returns if address could conflict with correct function of self.
Could addr take away service provided by self within Apache?
@@ -74,11 +79,11 @@ class Addr(common.Addr):
return True
return False
- def is_wildcard(self):
+ def is_wildcard(self) -> bool:
"""Returns if address has a wildcard port."""
return self.tup[1] == "*" or not self.tup[1]
- def get_sni_addr(self, port):
+ def get_sni_addr(self, port: str) -> common.Addr:
"""Returns the least specific address that resolves on the port.
Examples:
@@ -118,13 +123,16 @@ class VirtualHost:
"""
# ?: is used for not returning enclosed characters
- strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
+ strip_name: Pattern = re.compile(r"^(?:.+://)?([^ :$]*)")
- def __init__(self, filep, path, addrs, ssl, enabled, name=None,
- aliases=None, modmacro=False, ancestor=None, node=None):
+ def __init__(
+ self, filepath: str, path: str, addrs: Set["Addr"],
+ ssl: bool, enabled: bool, name: Optional[str] = None,
+ aliases: Optional[Set[str]] = None, modmacro: bool = False,
+ ancestor: Optional["VirtualHost"] = None, node = None):
"""Initialize a VH."""
- self.filep = filep
+ self.filep = filepath
self.path = path
self.addrs = addrs
self.name = name
@@ -133,9 +141,9 @@ class VirtualHost:
self.enabled = enabled
self.modmacro = modmacro
self.ancestor = ancestor
- self.node = node
+ self.node: interfaces.BlockNode = node
- def get_names(self):
+ def get_names(self) -> Set[str]:
"""Return a set of all names."""
all_names: Set[str] = set()
all_names.update(self.aliases)
@@ -157,7 +165,7 @@ class VirtualHost:
f"mod_macro Vhost: {'Yes' if self.modmacro else 'No'}"
)
- def display_repr(self):
+ def display_repr(self) -> str:
"""Return a representation of VHost to be used in dialog"""
return (
f"File: {self.filep}\n"
@@ -166,7 +174,7 @@ class VirtualHost:
f"HTTPS: {'Yes' if self.ssl else 'No'}\n"
)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
self.addrs == other.addrs and
@@ -182,7 +190,7 @@ class VirtualHost:
tuple(self.addrs), tuple(self.get_names()),
self.ssl, self.enabled, self.modmacro))
- def conflicts(self, addrs):
+ def conflicts(self, addrs: Iterable[Addr]) -> bool:
"""See if vhost conflicts with any of the addrs.
This determines whether or not these addresses would/could overwrite
@@ -201,7 +209,7 @@ class VirtualHost:
return True
return False
- def same_server(self, vhost, generic=False):
+ def same_server(self, vhost: "VirtualHost", generic: bool = False) -> bool:
"""Determines if the vhost is the same 'server'.
Used in redirection - indicates whether or not the two virtual hosts
diff --git a/certbot-apache/certbot_apache/_internal/override_centos.py b/certbot-apache/certbot_apache/_internal/override_centos.py
index c898615bb..406c42e35 100644
--- a/certbot-apache/certbot_apache/_internal/override_centos.py
+++ b/certbot-apache/certbot_apache/_internal/override_centos.py
@@ -1,5 +1,6 @@
""" Distribution specific override class for CentOS family (RHEL, Fedora) """
import logging
+from typing import Any
from typing import cast
from typing import List
@@ -30,7 +31,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/httpd/conf.d",
)
- def config_test(self):
+ def config_test(self) -> None:
"""
Override config_test to mitigate configtest error in vanilla installation
of mod_ssl in Fedora. The error is caused by non-existent self-signed
@@ -49,7 +50,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
else:
raise
- def _try_restart_fedora(self):
+ def _try_restart_fedora(self) -> None:
"""
Tries to restart httpd using systemctl to generate the self signed key pair.
"""
@@ -62,7 +63,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
# Finish with actual config check to see if systemctl restart helped
super().config_test()
- def _prepare_options(self):
+ def _prepare_options(self) -> None:
"""
Override the options dictionary initialization in order to support
alternative restart cmd used in CentOS.
@@ -72,13 +73,12 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
raise ValueError("OS option restart_cmd_alt must be set for CentOS.")
self.options.restart_cmd_alt[0] = self.options.ctl
- def get_parser(self):
+ def get_parser(self) -> "CentOSParser":
"""Initializes the ApacheParser"""
return CentOSParser(
- self.options.server_root, self.options.vhost_root,
- self.version, configurator=self)
+ self.options.server_root, self, self.options.vhost_root, self.version)
- def _deploy_cert(self, *args, **kwargs): # pylint: disable=arguments-differ
+ def _deploy_cert(self, *args: Any, **kwargs: Any): # pylint: disable=arguments-differ
"""
Override _deploy_cert in order to ensure that the Apache configuration
has "LoadModule ssl_module..." before parsing the VirtualHost configuration
@@ -88,7 +88,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
if self.version < (2, 4, 0):
self._deploy_loadmodule_ssl_if_needed()
- def _deploy_loadmodule_ssl_if_needed(self):
+ def _deploy_loadmodule_ssl_if_needed(self) -> None:
"""
Add "LoadModule ssl_module <pre-existing path>" to main httpd.conf if
it doesn't exist there already.
@@ -110,14 +110,13 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
"use, and run Certbot again.")
raise MisconfigurationError(msg)
else:
- loadmod_args = path_args
+ loadmod_args = [arg for arg in path_args if arg]
centos_parser: CentOSParser = cast(CentOSParser, self.parser)
if centos_parser.not_modssl_ifmodule(noarg_path):
if centos_parser.loc["default"] in noarg_path:
# LoadModule already in the main configuration file
- if ("ifmodule/" in noarg_path.lower() or
- "ifmodule[1]" in noarg_path.lower()):
+ if "ifmodule/" in noarg_path.lower() or "ifmodule[1]" in noarg_path.lower():
# It's the first or only IfModule in the file
return
# Populate the list of known !mod_ssl.c IfModules
@@ -148,8 +147,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
self.parser.aug.remove(loadmod_path)
# Create a new IfModule !mod_ssl.c if not already found on path
- ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c",
- beginning=True)[:-1]
+ ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c", beginning=True)[:-1]
if ssl_ifmod not in correct_ifmods:
self.parser.add_dir(ssl_ifmod, "LoadModule", loadmod_args)
correct_ifmods.append(ssl_ifmod)
@@ -159,24 +157,24 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
class CentOSParser(parser.ApacheParser):
"""CentOS specific ApacheParser override class"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
# CentOS specific configuration file for Apache
- self.sysconfig_filep = "/etc/sysconfig/httpd"
+ self.sysconfig_filep: str = "/etc/sysconfig/httpd"
super().__init__(*args, **kwargs)
- def update_runtime_variables(self):
+ def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
# Opportunistic, works if SELinux not enforced
super().update_runtime_variables()
self.parse_sysconfig_var()
- def parse_sysconfig_var(self):
+ def parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from CentOS configuration file """
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
for k, v in defines.items():
self.variables[k] = v
- def not_modssl_ifmodule(self, path):
+ def not_modssl_ifmodule(self, path: str) -> bool:
"""Checks if the provided Augeas path has argument !mod_ssl"""
if "ifmodule" not in path.lower():
diff --git a/certbot-apache/certbot_apache/_internal/override_debian.py b/certbot-apache/certbot_apache/_internal/override_debian.py
index 78041ceca..0d138ee9d 100644
--- a/certbot-apache/certbot_apache/_internal/override_debian.py
+++ b/certbot-apache/certbot_apache/_internal/override_debian.py
@@ -8,6 +8,7 @@ from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal.configurator import OsOptions
+from certbot_apache._internal.obj import VirtualHost
logger = logging.getLogger(__name__)
@@ -22,7 +23,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
handle_sites=True,
)
- def enable_site(self, vhost):
+ def enable_site(self, vhost: VirtualHost) -> None:
"""Enables an available site, Apache reload required.
.. note:: Does not make sure that the site correctly works or that all
@@ -67,7 +68,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
self.save_notes += "Enabled site %s\n" % vhost.filep
return None
- def enable_mod(self, mod_name, temp=False):
+ def enable_mod(self, mod_name: str, temp: bool = False) -> None:
"""Enables module in Apache.
Both enables and reloads Apache so module is active.
@@ -113,7 +114,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
# Reload is not necessary as DUMP_RUN_CFG uses latest config.
self.parser.update_runtime_variables()
- def _enable_mod_debian(self, mod_name, temp):
+ def _enable_mod_debian(self, mod_name: str, temp: bool) -> None:
"""Assumes mods-available, mods-enabled layout."""
# Generate reversal command.
# Try to be safe here... check that we can probably reverse before
@@ -124,6 +125,5 @@ class DebianConfigurator(configurator.ApacheConfigurator):
"Unable to find a2dismod, please make sure a2enmod and "
"a2dismod are configured correctly for certbot.")
- self.reverter.register_undo_command(
- temp, [self.options.dismod, "-f", mod_name])
+ self.reverter.register_undo_command(temp, [self.options.dismod, "-f", mod_name])
util.run_script([self.options.enmod, mod_name])
diff --git a/certbot-apache/certbot_apache/_internal/override_fedora.py b/certbot-apache/certbot_apache/_internal/override_fedora.py
index 9f2fed9b6..56aca775e 100644
--- a/certbot-apache/certbot_apache/_internal/override_fedora.py
+++ b/certbot-apache/certbot_apache/_internal/override_fedora.py
@@ -23,7 +23,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/httpd/conf.d",
)
- def config_test(self):
+ def config_test(self) -> None:
"""
Override config_test to mitigate configtest error in vanilla installation
of mod_ssl in Fedora. The error is caused by non-existent self-signed
@@ -35,13 +35,12 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
except errors.MisconfigurationError:
self._try_restart_fedora()
- def get_parser(self):
+ def get_parser(self) -> "FedoraParser":
"""Initializes the ApacheParser"""
return FedoraParser(
- self.options.server_root, self.options.vhost_root,
- self.version, configurator=self)
+ self.options.server_root, self, self.options.vhost_root, self.version)
- def _try_restart_fedora(self):
+ def _try_restart_fedora(self) -> None:
"""
Tries to restart httpd using systemctl to generate the self signed key pair.
"""
@@ -53,7 +52,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
# Finish with actual config check to see if systemctl restart helped
super().config_test()
- def _prepare_options(self):
+ def _prepare_options(self) -> None:
"""
Override the options dictionary initialization to keep using apachectl
instead of httpd and so take advantages of this new bash script in newer versions
@@ -69,18 +68,18 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
class FedoraParser(parser.ApacheParser):
"""Fedora 29+ specific ApacheParser override class"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, **kwargs) -> None:
# Fedora 29+ specific configuration file for Apache
self.sysconfig_filep = "/etc/sysconfig/httpd"
super().__init__(*args, **kwargs)
- def update_runtime_variables(self):
+ def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
# Opportunistic, works if SELinux not enforced
super().update_runtime_variables()
self._parse_sysconfig_var()
- def _parse_sysconfig_var(self):
+ def _parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from Fedora configuration file """
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
for k, v in defines.items():
diff --git a/certbot-apache/certbot_apache/_internal/override_gentoo.py b/certbot-apache/certbot_apache/_internal/override_gentoo.py
index 6e02c05e7..15368bf21 100644
--- a/certbot-apache/certbot_apache/_internal/override_gentoo.py
+++ b/certbot-apache/certbot_apache/_internal/override_gentoo.py
@@ -1,4 +1,6 @@
""" Distribution specific override class for Gentoo Linux """
+from typing import Any
+
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal import parser
@@ -16,7 +18,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/apache2/vhosts.d",
)
- def _prepare_options(self):
+ def _prepare_options(self) -> None:
"""
Override the options dictionary initialization in order to support
alternative restart cmd used in Gentoo.
@@ -26,33 +28,32 @@ class GentooConfigurator(configurator.ApacheConfigurator):
raise ValueError("OS option restart_cmd_alt must be set for Gentoo.")
self.options.restart_cmd_alt[0] = self.options.ctl
- def get_parser(self):
+ def get_parser(self) -> "GentooParser":
"""Initializes the ApacheParser"""
return GentooParser(
- self.options.server_root, self.options.vhost_root,
- self.version, configurator=self)
+ self.options.server_root, self, self.options.vhost_root, self.version)
class GentooParser(parser.ApacheParser):
"""Gentoo specific ApacheParser override class"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
# Gentoo specific configuration file for Apache2
self.apacheconfig_filep = "/etc/conf.d/apache2"
super().__init__(*args, **kwargs)
- def update_runtime_variables(self):
+ def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
self.parse_sysconfig_var()
self.update_modules()
- def parse_sysconfig_var(self):
+ def parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from Gentoo configuration file """
defines = apache_util.parse_define_file(self.apacheconfig_filep,
"APACHE2_OPTS")
for k, v in defines.items():
self.variables[k] = v
- def update_modules(self):
+ def update_modules(self) -> None:
"""Get loaded modules from httpd process, and add them to DOM"""
mod_cmd = [self.configurator.options.ctl, "modules"]
matches = apache_util.parse_from_subprocess(mod_cmd, r"(.*)_module")
diff --git a/certbot-apache/certbot_apache/_internal/parser.py b/certbot-apache/certbot_apache/_internal/parser.py
index 773f5f1e4..f52cb2416 100644
--- a/certbot-apache/certbot_apache/_internal/parser.py
+++ b/certbot-apache/certbot_apache/_internal/parser.py
@@ -3,15 +3,26 @@ import copy
import fnmatch
import logging
import re
+from typing import Collection
from typing import Dict
+from typing import Iterable
from typing import List
+from typing import Mapping
from typing import Optional
+from typing import Pattern
+from typing import Set
+from typing import TYPE_CHECKING
+from typing import Tuple
+from typing import Union
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import constants
+if TYPE_CHECKING:
+ from certbot_apache._internal.configurator import ApacheConfigurator # pragma: no cover
+
try:
from augeas import Augeas
except ImportError: # pragma: no cover
@@ -32,11 +43,11 @@ class ApacheParser:
default - user config file, name - NameVirtualHost,
"""
- arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
- fnmatch_chars = {"*", "?", "\\", "[", "]"}
+ arg_var_interpreter: Pattern = re.compile(r"\$\{[^ \}]*}")
+ fnmatch_chars: Set[str] = {"*", "?", "\\", "[", "]"}
- def __init__(self, root, vhostroot=None, version=(2, 4),
- configurator=None):
+ def __init__(self, root: str, configurator: "ApacheConfigurator",
+ vhostroot: str, version: Tuple[int, ...] = (2, 4)) -> None:
# Note: Order is important here.
# Needed for calling save() with reverter functionality that resides in
@@ -45,7 +56,7 @@ class ApacheParser:
self.configurator = configurator
# Initialize augeas
- self.aug = init_augeas()
+ self.aug: Augeas = init_augeas()
if not self.check_aug_version():
raise errors.NotSupportedError(
@@ -58,8 +69,8 @@ class ApacheParser:
self.variables: Dict[str, str] = {}
# Find configuration root and make sure augeas can parse it.
- self.root = os.path.abspath(root)
- self.loc = {"root": self._find_config_root()}
+ self.root: str = os.path.abspath(root)
+ self.loc: Dict[str, str] = {"root": self._find_config_root()}
self.parse_file(self.loc["root"])
if version >= (2, 4):
@@ -88,7 +99,7 @@ class ApacheParser:
if self.find_dir("Define", exclude=False):
raise errors.PluginError("Error parsing runtime variables")
- def check_parsing_errors(self, lens):
+ def check_parsing_errors(self, lens: str) -> None:
"""Verify Augeas can parse all of the lens files.
:param str lens: lens to check for errors
@@ -114,7 +125,7 @@ class ApacheParser:
self.aug.get(path + "/message")))
raise errors.PluginError(msg)
- def check_aug_version(self):
+ def check_aug_version(self) -> bool:
""" Checks that we have recent enough version of libaugeas.
If augeas version is recent enough, it will support case insensitive
regexp matching"""
@@ -129,7 +140,7 @@ class ApacheParser:
self.aug.remove("/test/path")
return matches
- def unsaved_files(self):
+ def unsaved_files(self) -> Set[str]:
"""Lists files that have modified Augeas DOM but the changes have not
been written to the filesystem yet, used by `self.save()` and
ApacheConfigurator to check the file state.
@@ -168,7 +179,7 @@ class ApacheParser:
save_files.add(self.aug.get(path)[6:])
return save_files
- def ensure_augeas_state(self):
+ def ensure_augeas_state(self) -> None:
"""Makes sure that all Augeas dom changes are written to files to avoid
loss of configuration directives when doing additional augeas parsing,
causing a possible augeas.load() resulting dom reset
@@ -178,7 +189,7 @@ class ApacheParser:
self.configurator.save_notes += "(autosave)"
self.configurator.save()
- def save(self, save_files):
+ def save(self, save_files: Iterable[str]) -> None:
"""Saves all changes to the configuration files.
save() is called from ApacheConfigurator to handle the parser specific
@@ -197,7 +208,7 @@ class ApacheParser:
self.aug.remove("/files/"+sf)
self.aug.load()
- def _log_save_errors(self, ex_errs):
+ def _log_save_errors(self, ex_errs: List[str]) -> None:
"""Log errors due to bad Augeas save.
:param list ex_errs: Existing errors before save
@@ -211,7 +222,7 @@ class ApacheParser:
# Only new errors caused by recent save
if err not in ex_errs), self.configurator.save_notes)
- def add_include(self, main_config, inc_path):
+ def add_include(self, main_config: str, inc_path: str) -> None:
"""Add Include for a new configuration file if one does not exist
:param str main_config: file path to main Apache config file
@@ -230,21 +241,21 @@ class ApacheParser:
new_file = os.path.basename(inc_path)
self.existing_paths.setdefault(new_dir, []).append(new_file)
- def add_mod(self, mod_name):
+ def add_mod(self, mod_name: str) -> None:
"""Shortcut for updating parser modules."""
if mod_name + "_module" not in self.modules:
self.modules[mod_name + "_module"] = None
if "mod_" + mod_name + ".c" not in self.modules:
self.modules["mod_" + mod_name + ".c"] = None
- def reset_modules(self):
+ def reset_modules(self) -> None:
"""Reset the loaded modules list. This is called from cleanup to clear
temporarily loaded modules."""
self.modules = {}
self.update_modules()
self.parse_modules()
- def parse_modules(self):
+ def parse_modules(self) -> None:
"""Iterates on the configuration until no new modules are loaded.
..todo:: This should be attempted to be done with a binary to avoid
@@ -272,19 +283,18 @@ class ApacheParser:
match_name[6:])
self.modules.update(mods)
- def update_runtime_variables(self):
+ def update_runtime_variables(self) -> None:
"""Update Includes, Defines and Includes from httpd config dump data"""
self.update_defines()
self.update_includes()
self.update_modules()
- def update_defines(self):
+ def update_defines(self) -> None:
"""Updates the dictionary of known variables in the configuration"""
-
self.variables = apache_util.parse_defines(self.configurator.options.ctl)
- def update_includes(self):
+ def update_includes(self) -> None:
"""Get includes from httpd process, and add them to DOM if needed"""
# Find_dir iterates over configuration for Include and IncludeOptional
@@ -298,28 +308,28 @@ class ApacheParser:
if not self.parsed_in_current(i):
self.parse_file(i)
- def update_modules(self):
+ def update_modules(self) -> None:
"""Get loaded modules from httpd process, and add them to DOM"""
matches = apache_util.parse_modules(self.configurator.options.ctl)
for mod in matches:
self.add_mod(mod.strip())
- def filter_args_num(self, matches, args):
+ def filter_args_num(self, matches: str, args: int) -> List[str]:
"""Filter out directives with specific number of arguments.
This function makes the assumption that all related arguments are given
in order. Thus /files/apache/directive[5]/arg[2] must come immediately
after /files/apache/directive[5]/arg[1]. Runs in 1 linear pass.
- :param string matches: Matches of all directives with arg nodes
+ :param str matches: Matches of all directives with arg nodes
:param int args: Number of args you would like to filter
:returns: List of directives that contain # of arguments.
(arg is stripped off)
"""
- filtered = []
+ filtered: List[str] = []
if args == 1:
for i, match in enumerate(matches):
if match.endswith("/arg"):
@@ -336,7 +346,7 @@ class ApacheParser:
return filtered
- def add_dir_to_ifmodssl(self, aug_conf_path, directive, args):
+ def add_dir_to_ifmodssl(self, aug_conf_path: str, directive: str, args: List[str]) -> None:
"""Adds directive and value to IfMod ssl block.
Adds given directive and value along configuration path within
@@ -362,7 +372,7 @@ class ApacheParser:
for i, arg in enumerate(args):
self.aug.set("%s/arg[%d]" % (nvh_path, i + 1), arg)
- def get_ifmod(self, aug_conf_path, mod, beginning=False):
+ def get_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
:param str aug_conf_path: Augeas configuration path
@@ -384,7 +394,7 @@ class ApacheParser:
# Strip off "arg" at end of first ifmod path
return if_mods[0].rpartition("arg")[0]
- def create_ifmod(self, aug_conf_path, mod, beginning=False):
+ def create_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
"""Creates a new <IfMod mod> and returns its path.
:param str aug_conf_path: Augeas configuration path
@@ -411,7 +421,9 @@ class ApacheParser:
self.aug.set(c_path_arg, mod)
return retpath
- def add_dir(self, aug_conf_path, directive, args):
+ def add_dir(
+ self, aug_conf_path: str, directive: Optional[str], args: Union[List[str], str]
+ ) -> None:
"""Appends directive to the end fo the file given by aug_conf_path.
.. note:: Not added to AugeasConfigurator because it may depend
@@ -431,7 +443,8 @@ class ApacheParser:
else:
self.aug.set(aug_conf_path + "/directive[last()]/arg", args)
- def add_dir_beginning(self, aug_conf_path, dirname, args):
+ def add_dir_beginning(self, aug_conf_path: str, dirname: str,
+ args: Union[List[str], str]) -> None:
"""Adds the directive to the beginning of defined aug_conf_path.
:param str aug_conf_path: Augeas configuration path to add directive
@@ -452,7 +465,7 @@ class ApacheParser:
else:
self.aug.set(first_dir + "/arg", args)
- def add_comment(self, aug_conf_path, comment):
+ def add_comment(self, aug_conf_path: str, comment: str) -> None:
"""Adds the comment to the augeas path
:param str aug_conf_path: Augeas configuration path to add directive
@@ -461,7 +474,7 @@ class ApacheParser:
"""
self.aug.set(aug_conf_path + "/#comment[last() + 1]", comment)
- def find_comments(self, arg, start=None):
+ def find_comments(self, arg: str, start: Optional[str] = None) -> List[str]:
"""Finds a comment with specified content from the provided DOM path
:param str arg: Comment content to search
@@ -483,7 +496,8 @@ class ApacheParser:
results.append(comment)
return results
- def find_dir(self, directive, arg=None, start=None, exclude=True):
+ def find_dir(self, directive: str, arg: Optional[str] = None,
+ start: Optional[str] = None, exclude: bool = True) -> List[str]:
"""Finds directive in the configuration.
Recursively searches through config files to find directives
@@ -511,6 +525,8 @@ class ApacheParser:
:param bool exclude: Whether or not to exclude directives based on
variables and enabled modules
+ :rtype list
+
"""
# Cannot place member variable in the definition of the function so...
if not start:
@@ -559,7 +575,7 @@ class ApacheParser:
return ordered_matches
- def get_all_args(self, match):
+ def get_all_args(self, match: str) -> List[Optional[str]]:
"""
Tries to fetch all arguments for a directive. See get_arg.
@@ -569,11 +585,11 @@ class ApacheParser:
"""
if match[-1] != "/":
- match = match+"/"
+ match = match + "/"
allargs = self.aug.match(match + '*')
return [self.get_arg(arg) for arg in allargs]
- def get_arg(self, match):
+ def get_arg(self, match: Optional[str]) -> Optional[str]:
"""Uses augeas.get to get argument value and interprets result.
This also converts all variables and parameters appropriately.
@@ -588,6 +604,7 @@ class ApacheParser:
# e.g. strip now, not later
if not value:
return None
+
value = value.strip("'\"")
variables = ApacheParser.arg_var_interpreter.findall(value)
@@ -601,13 +618,13 @@ class ApacheParser:
return value
- def get_root_augpath(self):
+ def get_root_augpath(self) -> str:
"""
Returns the Augeas path of root configuration.
"""
return get_aug_path(self.loc["root"])
- def exclude_dirs(self, matches):
+ def exclude_dirs(self, matches: Iterable[str]) -> List[str]:
"""Exclude directives that are not loaded into the configuration."""
filters = [("ifmodule", self.modules.keys()), ("ifdefine", self.variables)]
@@ -621,7 +638,7 @@ class ApacheParser:
valid_matches.append(match)
return valid_matches
- def _pass_filter(self, match, filter_):
+ def _pass_filter(self, match: str, filter_: Tuple[str, Collection[str]]) -> bool:
"""Determine if directive passes a filter.
:param str match: Augeas path
@@ -650,7 +667,7 @@ class ApacheParser:
return True
- def standard_path_from_server_root(self, arg):
+ def standard_path_from_server_root(self, arg: str) -> str:
"""Ensure paths are consistent and absolute
:param str arg: Argument of directive
@@ -669,7 +686,7 @@ class ApacheParser:
arg = os.path.normpath(arg)
return arg
- def _get_include_path(self, arg):
+ def _get_include_path(self, arg: Optional[str]) -> Optional[str]:
"""Converts an Apache Include directive into Augeas path.
Converts an Apache Include directive argument into an Augeas
@@ -689,6 +706,8 @@ class ApacheParser:
# if matchObj.group() != arg:
# logger.error("Error: Invalid regexp characters in %s", arg)
# return []
+ if arg is None:
+ return None # pragma: no cover
arg = self.standard_path_from_server_root(arg)
# Attempts to add a transform to the file if one does not already exist
@@ -713,7 +732,7 @@ class ApacheParser:
return get_aug_path(arg)
- def fnmatch_to_re(self, clean_fn_match):
+ def fnmatch_to_re(self, clean_fn_match: str) -> str:
"""Method converts Apache's basic fnmatch to regular expression.
Assumption - Configs are assumed to be well-formed and only writable by
@@ -730,7 +749,7 @@ class ApacheParser:
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
return fnmatch.translate(clean_fn_match)[4:-3] # pragma: no cover
- def parse_file(self, filepath):
+ def parse_file(self, filepath: str) -> None:
"""Parse file with Augeas
Checks to see if file_path is parsed by Augeas
@@ -757,7 +776,7 @@ class ApacheParser:
self._add_httpd_transform(filepath)
self.aug.load()
- def parsed_in_current(self, filep):
+ def parsed_in_current(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by current Augeas parser config
ie. returns True if the file is found on a path that's found in live
Augeas configuration.
@@ -767,9 +786,11 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
+ if not filep:
+ return False # pragma: no cover
return self._parsed_by_parser_paths(filep, self.parser_paths)
- def parsed_in_original(self, filep):
+ def parsed_in_original(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by existing Apache config.
ie. returns True if the file is found on a path that matches Include or
IncludeOptional statement in the Apache configuration.
@@ -779,9 +800,11 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
+ if not filep:
+ return False # pragma: no cover
return self._parsed_by_parser_paths(filep, self.existing_paths)
- def _parsed_by_parser_paths(self, filep, paths):
+ def _parsed_by_parser_paths(self, filep: str, paths: Mapping[str, List[str]]) -> bool:
"""Helper function that searches through provided paths and returns
True if file path is found in the set"""
for directory in paths:
@@ -790,7 +813,7 @@ class ApacheParser:
return True
return False
- def _check_path_actions(self, filepath):
+ def _check_path_actions(self, filepath: str) -> Tuple[bool, bool]:
"""Determine actions to take with a new augeas path
This helper function will return a tuple that defines
@@ -815,7 +838,7 @@ class ApacheParser:
remove_old = False
return use_new, remove_old
- def _remove_httpd_transform(self, filepath):
+ def _remove_httpd_transform(self, filepath: str) -> None:
"""Remove path from Augeas transform
:param str filepath: filepath to remove
@@ -830,7 +853,7 @@ class ApacheParser:
self.aug.remove(remove_inc[0])
self.parser_paths.pop(remove_dirname)
- def _add_httpd_transform(self, incl):
+ def _add_httpd_transform(self, incl: str) -> None:
"""Add a transform to Augeas.
This function will correctly add a transform to augeas
@@ -840,7 +863,7 @@ class ApacheParser:
:param str incl: filepath to include for transform
"""
- last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
+ last_include: str = self.aug.match("/augeas/load/Httpd/incl [last()]")
if last_include:
# Insert a new node immediately after the last incl
self.aug.insert(last_include[0], "incl", False)
@@ -858,7 +881,7 @@ class ApacheParser:
self.parser_paths[os.path.dirname(incl)] = [
os.path.basename(incl)]
- def standardize_excl(self):
+ def standardize_excl(self) -> None:
"""Standardize the excl arguments for the Httpd lens in Augeas.
Note: Hack!
@@ -890,16 +913,16 @@ class ApacheParser:
self.aug.load()
- def _set_locations(self):
+ def _set_locations(self) -> Dict[str, str]:
"""Set default location for directives.
Locations are given as file_paths
.. todo:: Make sure that files are included
"""
- default = self.loc["root"]
+ default: str = self.loc["root"]
- temp = os.path.join(self.root, "ports.conf")
+ temp: str = os.path.join(self.root, "ports.conf")
if os.path.isfile(temp):
listen = temp
name = temp
@@ -909,7 +932,7 @@ class ApacheParser:
return {"default": default, "listen": listen, "name": name}
- def _find_config_root(self):
+ def _find_config_root(self) -> str:
"""Find the Apache Configuration Root file."""
location = ["apache2.conf", "httpd.conf", "conf/httpd.conf"]
for name in location:
@@ -918,7 +941,7 @@ class ApacheParser:
raise errors.NoInstallationError("Could not find configuration root")
-def case_i(string):
+def case_i(string: str) -> str:
"""Returns case insensitive regex.
Returns a sloppy, but necessary version of a case insensitive regex.
@@ -934,7 +957,7 @@ def case_i(string):
if c.isalpha() else c for c in re.escape(string))
-def get_aug_path(file_path):
+def get_aug_path(file_path: str) -> str:
"""Return augeas path for full filepath.
:param str file_path: Full filepath
diff --git a/certbot-apache/certbot_apache/_internal/parsernode_util.py b/certbot-apache/certbot_apache/_internal/parsernode_util.py
index 0e39ec365..544d2c96e 100644
--- a/certbot-apache/certbot_apache/_internal/parsernode_util.py
+++ b/certbot-apache/certbot_apache/_internal/parsernode_util.py
@@ -1,7 +1,12 @@
"""ParserNode utils"""
+from typing import Dict
+from typing import Any
+from typing import List
+from typing import Optional
+from typing import Tuple
-def validate_kwargs(kwargs, required_names):
+def validate_kwargs(kwargs: Dict[str, Any], required_names: List[str]) -> Dict[str, Any]:
"""
Ensures that the kwargs dict has all the expected values. This function modifies
the kwargs dictionary, and hence the returned dictionary should be used instead
@@ -11,7 +16,7 @@ def validate_kwargs(kwargs, required_names):
:param list required_names: List of required parameter names.
"""
- validated_kwargs = {}
+ validated_kwargs: Dict[str, Any] = {}
for name in required_names:
try:
validated_kwargs[name] = kwargs.pop(name)
@@ -25,7 +30,7 @@ def validate_kwargs(kwargs, required_names):
return validated_kwargs
-def parsernode_kwargs(kwargs):
+def parsernode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, ...]:
"""
Validates keyword arguments for ParserNode. This function modifies the kwargs
dictionary, and hence the returned dictionary should be used instead in the
@@ -55,7 +60,7 @@ def parsernode_kwargs(kwargs):
return kwargs["ancestor"], kwargs["dirty"], kwargs["filepath"], kwargs["metadata"]
-def commentnode_kwargs(kwargs):
+def commentnode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, str]]:
"""
Validates keyword arguments for CommentNode and sets the default values for
optional kwargs. This function modifies the kwargs dictionary, and hence the
@@ -90,7 +95,7 @@ def commentnode_kwargs(kwargs):
return comment, kwargs
-def directivenode_kwargs(kwargs):
+def directivenode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, Any, Any, Dict]:
"""
Validates keyword arguments for DirectiveNode and BlockNode and sets the
default values for optional kwargs. This function modifies the kwargs
diff --git a/certbot-apache/tests/parser_test.py b/certbot-apache/tests/parser_test.py
index 5166d9e0e..8607eef11 100644
--- a/certbot-apache/tests/parser_test.py
+++ b/certbot-apache/tests/parser_test.py
@@ -345,8 +345,8 @@ class ParserInitTest(util.ApacheTest):
self.config.config_test = mock.Mock()
self.assertRaises(
errors.NoInstallationError, ApacheParser,
- os.path.relpath(self.config_path), "/dummy/vhostpath",
- version=(2, 4, 22), configurator=self.config)
+ os.path.relpath(self.config_path), self.config,
+ "/dummy/vhostpath", version=(2, 4, 22))
def test_init_old_aug(self):
from certbot_apache._internal.parser import ApacheParser
@@ -354,8 +354,8 @@ class ParserInitTest(util.ApacheTest):
mock_c.return_value = False
self.assertRaises(
errors.NotSupportedError,
- ApacheParser, os.path.relpath(self.config_path),
- "/dummy/vhostpath", version=(2, 4, 22), configurator=self.config)
+ ApacheParser, os.path.relpath(self.config_path), self.config,
+ "/dummy/vhostpath", version=(2, 4, 22))
@mock.patch("certbot_apache._internal.apache_util._get_runtime_cfg")
def test_unparseable(self, mock_cfg):
@@ -363,8 +363,8 @@ class ParserInitTest(util.ApacheTest):
mock_cfg.return_value = ('Define: TEST')
self.assertRaises(
errors.PluginError,
- ApacheParser, os.path.relpath(self.config_path),
- "/dummy/vhostpath", version=(2, 2, 22), configurator=self.config)
+ ApacheParser, os.path.relpath(self.config_path), self.config,
+ "/dummy/vhostpath", version=(2, 2, 22))
def test_root_normalized(self):
from certbot_apache._internal.parser import ApacheParser
@@ -375,7 +375,7 @@ class ParserInitTest(util.ApacheTest):
self.temp_dir,
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
- parser = ApacheParser(path, "/dummy/vhostpath", configurator=self.config)
+ parser = ApacheParser(path, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@@ -384,8 +384,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
- os.path.relpath(self.config_path),
- "/dummy/vhostpath", configurator=self.config)
+ os.path.relpath(self.config_path), self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@@ -394,8 +393,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
- self.config_path + os.path.sep,
- "/dummy/vhostpath", configurator=self.config)
+ self.config_path + os.path.sep, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
diff --git a/certbot-apache/tests/util.py b/certbot-apache/tests/util.py
index f0c76e693..a4191b3fe 100644
--- a/certbot-apache/tests/util.py
+++ b/certbot-apache/tests/util.py
@@ -73,7 +73,7 @@ class ParserTest(ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
self.parser = ApacheParser(
- self.config_path, self.vhost_path, configurator=self.config)
+ self.config_path, self.config, self.vhost_path)
def get_apache_configurator(
diff --git a/certbot-compatibility-test/certbot_compatibility_test/configurators/common.py b/certbot-compatibility-test/certbot_compatibility_test/configurators/common.py
index 432f48f32..b25d2d04b 100644
--- a/certbot-compatibility-test/certbot_compatibility_test/configurators/common.py
+++ b/certbot-compatibility-test/certbot_compatibility_test/configurators/common.py
@@ -8,11 +8,11 @@ import tempfile
from typing import Iterable
from typing import List
from typing import Optional
+from typing import Union
from typing import overload
from typing import Set
from typing import Tuple
from typing import Type
-from typing import Union
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces
diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py
index fad48b3d3..fec016ec7 100644
--- a/certbot/certbot/_internal/client.py
+++ b/certbot/certbot/_internal/client.py
@@ -10,7 +10,6 @@ from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
-from typing import Union
import warnings
from cryptography.hazmat.backends import default_backend
@@ -573,6 +572,7 @@ class Client:
:param list domains: list of domains to install the certificate
:param str privkey_path: path to certificate private key
:param str cert_path: certificate file path (optional)
+ :param str fullchain_path: path to the full chain of the certificate
:param str chain_path: chain file path
"""
@@ -643,13 +643,13 @@ class Client:
"Option %s is not supported by the selected installer. "
"Skipping enhancement.", config_name)
- msg = ("We were unable to restart web server")
+ msg = "We were unable to restart web server"
if enhanced:
with error_handler.ErrorHandler(self._rollback_and_restart, msg):
self.installer.restart()
def apply_enhancement(self, domains: List[str], enhancement: str,
- options: Optional[Union[List[str], str]] = None) -> None:
+ options: Optional[str] = None) -> None:
"""Applies an enhancement on all domains.
:param list domains: list of ssl_vhosts (as strings)
diff --git a/certbot/certbot/interfaces.py b/certbot/certbot/interfaces.py
index 16d3124e1..0d12ffd2e 100644
--- a/certbot/certbot/interfaces.py
+++ b/certbot/certbot/interfaces.py
@@ -5,13 +5,13 @@ from argparse import ArgumentParser
import sys
from types import ModuleType
from typing import Any
+from typing import Union
from typing import cast
from typing import Iterable
from typing import List
from typing import Optional
from typing import Type
from typing import TYPE_CHECKING
-from typing import Union
import warnings
import zope.interface
diff --git a/certbot/certbot/plugins/common.py b/certbot/certbot/plugins/common.py
index cc4ace4fa..3af0ee6ec 100644
--- a/certbot/certbot/plugins/common.py
+++ b/certbot/certbot/plugins/common.py
@@ -12,6 +12,8 @@ from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
+from typing import Type
+from typing import TypeVar
from typing import Tuple
import pkg_resources
@@ -244,6 +246,9 @@ class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
"""
+GenericAddr = TypeVar("GenericAddr", bound="Addr")
+
+
class Addr:
r"""Represents an virtual host address.
@@ -256,7 +261,7 @@ class Addr:
self.ipv6 = ipv6
@classmethod
- def fromstring(cls, str_addr: str) -> Optional['Addr']:
+ def fromstring(cls: Type[GenericAddr], str_addr: str) -> Optional[GenericAddr]:
"""Initialize Addr from string."""
if str_addr.startswith('['):
# ipv6 addresses starts with [
@@ -301,7 +306,7 @@ class Addr:
"""Return port."""
return self.tup[1]
- def get_addr_obj(self, port: str) -> 'Addr':
+ def get_addr_obj(self: GenericAddr, port: str) -> GenericAddr:
"""Return new address object with same addr and new port."""
return self.__class__((self.tup[0], port), self.ipv6)
diff --git a/certbot/certbot/tests/util.py b/certbot/certbot/tests/util.py
index b09a2d2a1..7889ded98 100644
--- a/certbot/certbot/tests/util.py
+++ b/certbot/certbot/tests/util.py
@@ -9,12 +9,12 @@ import sys
import tempfile
from typing import Any
from typing import Callable
+from typing import Union
from typing import cast
from typing import IO
from typing import Iterable
from typing import List
from typing import Optional
-from typing import Union
import unittest
import warnings