diff options
Diffstat (limited to 'certbot-nginx/certbot_nginx/_internal/parser.py')
-rw-r--r-- | certbot-nginx/certbot_nginx/_internal/parser.py | 764 |
1 files changed, 764 insertions, 0 deletions
"""NginxParser is a member object of the NginxConfigurator class."""
import copy
import functools
import glob
import logging
import re

import pyparsing
import six

from acme.magic_typing import Dict  # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import List  # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Set  # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Tuple  # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Union  # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.compat import os
from certbot_nginx._internal import nginxparser
from certbot_nginx._internal import obj

logger = logging.getLogger(__name__)


class NginxParser(object):
    """Class handles the fine details of parsing the Nginx Configuration.

    :ivar str root: Normalized absolute path to the server root
        directory. Without trailing slash.
    :ivar str config_root: Path of the root configuration file
        (``nginx.conf``) found under ``root``.
    :ivar dict parsed: Mapping of file paths to parsed trees

    """

    def __init__(self, root):
        """Locate the root configuration file under ``root`` and parse it.

        :param str root: Path to the Nginx server root directory

        :raises .errors.NoInstallationError: If ``nginx.conf`` cannot be
            found under ``root``

        """
        self.parsed = {}  # type: Dict[str, Union[List, nginxparser.UnspacedList]]
        self.root = os.path.abspath(root)
        self.config_root = self._find_config_root()

        # Parse nginx.conf and included files.
        # TODO: Check sites-available/ as well. For now, the configurator does
        #       not enable sites from there.
        self.load()

    def load(self):
        """Loads Nginx files into a parsed tree.

        Any previously parsed trees are discarded first.

        """
        self.parsed = {}
        self._parse_recursively(self.config_root)

    def _parse_recursively(self, filepath):
        """Parses nginx config files recursively by looking at 'include'
        directives inside 'http' and 'server' blocks. Note that this only
        reads Nginx files that potentially declare a virtual host.

        :param str filepath: The path to the files to parse, as a glob

        """
        # pylint: disable=too-many-nested-blocks
        filepath = self.abs_path(filepath)
        trees = self._parse_files(filepath)
        for tree in trees:
            for entry in tree:
                if _is_include_directive(entry):
                    # Parse the top-level included file
                    self._parse_recursively(entry[1])
                elif entry[0] == ['http'] or entry[0] == ['server']:
                    # Look for includes in the top-level 'http'/'server' context
                    for subentry in entry[1]:
                        if _is_include_directive(subentry):
                            self._parse_recursively(subentry[1])
                        elif entry[0] == ['http'] and subentry[0] == ['server']:
                            # Look for includes in a 'server' context within
                            # an 'http' context
                            for server_entry in subentry[1]:
                                if _is_include_directive(server_entry):
                                    self._parse_recursively(server_entry[1])

    def abs_path(self, path):
        """Converts a relative path to an absolute path relative to the root.
        Does nothing for paths that are already absolute.

        The result is always passed through ``os.path.normpath``.

        :param str path: The path
        :returns: The absolute path
        :rtype: str

        """
        if not os.path.isabs(path):
            return os.path.normpath(os.path.join(self.root, path))
        return os.path.normpath(path)

    def _build_addr_to_ssl(self):
        """Builds a map from address to whether it listens on ssl in any server block

        :returns: Mapping of ``addr.normalized_tuple()`` to ``True`` if any
            server block listens on that address with ssl
        :rtype: dict

        """
        servers = self._get_raw_servers()

        addr_to_ssl = {}  # type: Dict[Tuple[str, str], bool]
        for filename in servers:
            for server, _ in servers[filename]:
                # Parse the server block to save addr info
                parsed_server = _parse_server_raw(server)
                for addr in parsed_server['addrs']:
                    addr_tuple = addr.normalized_tuple()
                    if addr_tuple not in addr_to_ssl:
                        addr_to_ssl[addr_tuple] = addr.ssl
                    # Once any block marks the address as ssl, it stays ssl
                    addr_to_ssl[addr_tuple] = addr.ssl or addr_to_ssl[addr_tuple]
        return addr_to_ssl

    def _get_raw_servers(self):
        # pylint: disable=cell-var-from-loop
        # type: () -> Dict
        """Get a map of all unparsed server blocks.

        :returns: Mapping of file path to a list of ``(directives, path)``
            tuples, where ``directives`` is the server block contents with
            included files appended and ``path`` is the list of indices
            leading to the server block inside the file's parsed tree
        :rtype: dict

        """
        servers = {}  # type: Dict[str, Union[List, nginxparser.UnspacedList]]
        for filename in self.parsed:
            tree = self.parsed[filename]
            servers[filename] = []
            srv = servers[filename]  # workaround undefined loop var in lambdas

            # Find all the server blocks
            _do_for_subarray(tree, lambda x: len(x) >= 2 and x[0] == ['server'],
                             lambda x, y: srv.append((x[1], y)))

            # Find 'include' statements in server blocks and append their trees
            for i, (server, path) in enumerate(servers[filename]):
                new_server = self._get_included_directives(server)
                servers[filename][i] = (new_server, path)
        return servers

    def get_vhosts(self):
        # pylint: disable=cell-var-from-loop
        """Gets list of all 'virtual hosts' found in Nginx configuration.
        Technically this is a misnomer because Nginx does not have virtual
        hosts, it has 'server blocks'.

        :returns: List of :class:`~certbot_nginx._internal.obj.VirtualHost`
            objects found in configuration
        :rtype: list

        """
        enabled = True  # We only look at enabled vhosts for now
        servers = self._get_raw_servers()

        vhosts = []
        for filename in servers:
            for server, path in servers[filename]:
                # Parse the server block into a VirtualHost object

                parsed_server = _parse_server_raw(server)
                vhost = obj.VirtualHost(filename,
                                        parsed_server['addrs'],
                                        parsed_server['ssl'],
                                        enabled,
                                        parsed_server['names'],
                                        server,
                                        path)
                vhosts.append(vhost)

        self._update_vhosts_addrs_ssl(vhosts)

        return vhosts

    def _update_vhosts_addrs_ssl(self, vhosts):
        """Update a list of raw parsed vhosts to include global address sslishness

        :param list vhosts: vhosts to update in place

        """
        addr_to_ssl = self._build_addr_to_ssl()
        for vhost in vhosts:
            for addr in vhost.addrs:
                addr.ssl = addr_to_ssl[addr.normalized_tuple()]
                if addr.ssl:
                    vhost.ssl = True

    def _get_included_directives(self, block):
        """Returns array with the "include" directives expanded out by
        concatenating the contents of the included file to the block.

        :param list block:
        :rtype: list

        """
        result = copy.deepcopy(block)  # Copy the list to keep self.parsed idempotent
        for directive in block:
            if _is_include_directive(directive):
                included_files = glob.glob(
                    self.abs_path(directive[1]))
                for incl in included_files:
                    try:
                        result.extend(self.parsed[incl])
                    except KeyError:
                        # The file is not in self.parsed (it was never
                        # loaded, or could not be opened or parsed), so
                        # there is nothing to append for it.
                        pass
        return result

    def _parse_files(self, filepath, override=False):
        """Parse files from a glob

        Files that cannot be opened, decoded or parsed are logged and
        skipped rather than aborting the whole load.

        :param str filepath: Nginx config file path
        :param bool override: Whether to parse a file that has been parsed
        :returns: list of parsed tree structures
        :rtype: list

        """
        files = glob.glob(filepath)  # nginx on unix calls glob(3) for this
                                     # XXX Windows nginx uses FindFirstFile, and
                                     # should have a narrower call here
        trees = []
        for item in files:
            if item in self.parsed and not override:
                continue
            try:
                with open(item) as _file:
                    parsed = nginxparser.load(_file)
                    self.parsed[item] = parsed
                    trees.append(parsed)
            except IOError:
                logger.warning("Could not open file: %s", item)
            except UnicodeDecodeError as err:
                # UnicodeDecodeError is a ValueError, not an IOError, so it
                # must be handled explicitly or one undecodable file would
                # abort parsing of the whole configuration.
                logger.warning("Could not decode file: %s due to %s", item, err)
            except pyparsing.ParseException as err:
                logger.debug("Could not parse file: %s due to %s", item, err)
        return trees

    def _find_config_root(self):
        """Return the Nginx Configuration Root file.

        :returns: Path of ``nginx.conf`` inside ``self.root``
        :rtype: str

        :raises .errors.NoInstallationError: If no candidate file exists

        """
        location = ['nginx.conf']

        for name in location:
            if os.path.isfile(os.path.join(self.root, name)):
                return os.path.join(self.root, name)

        raise errors.NoInstallationError(
            "Could not find Nginx root configuration file (nginx.conf)")

    def filedump(self, ext='tmp', lazy=True):
        """Dumps parsed configurations into files.

        :param str ext: The file extension to use for the dumped files. If
            empty, this overrides the existing conf files.
        :param bool lazy: Only write files that have been modified

        """
        # Best-effort atomicity is enforced above us by reverter.py
        for filename in self.parsed:
            tree = self.parsed[filename]
            if ext:
                filename = filename + os.path.extsep + ext
            try:
                if lazy and not tree.is_dirty():
                    continue
                out = nginxparser.dumps(tree)
                logger.debug('Writing nginx conf tree to %s:\n%s', filename, out)
                with open(filename, 'w') as _file:
                    _file.write(out)

            except IOError:
                logger.error("Could not open file for writing: %s", filename)

    def parse_server(self, server):
        """Parses a list of server directives, accounting for global address sslishness.

        :param list server: list of directives in a server block
        :rtype: dict
        """
        addr_to_ssl = self._build_addr_to_ssl()
        parsed_server = _parse_server_raw(server)
        _apply_global_addr_ssl(addr_to_ssl, parsed_server)
        return parsed_server

    def has_ssl_on_directive(self, vhost):
        """Does vhost have ssl on for all ports?

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost in question

        :returns: True if 'ssl on' directive is included
        :rtype: bool

        """
        server = vhost.raw
        for directive in server:
            if not directive:
                continue
            if _is_ssl_on_directive(directive):
                return True

        return False

    def add_server_directives(self, vhost, directives, insert_at_top=False):
        """Add directives to the server block identified by vhost.

        This method modifies vhost to be fully consistent with the new directives.

        .. note:: It's an error to try and add a nonrepeatable directive that already
            exists in the config block with a conflicting value.

        .. todo:: Doesn't match server blocks whose server_name directives are
            split across multiple conf files.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost
            whose information we use to match on
        :param list directives: The directives to add
        :param bool insert_at_top: True if the directives need to be inserted at the top
            of the server block instead of the bottom

        """
        self._modify_server_directives(vhost,
            functools.partial(_add_directives, directives, insert_at_top))

    def update_or_add_server_directives(self, vhost, directives, insert_at_top=False):
        """Add or replace directives in the server block identified by vhost.

        This method modifies vhost to be fully consistent with the new directives.

        .. note:: When a directive with the same name already exists in the
            config block, the first instance will be replaced. Otherwise, the directive
            will be appended/prepended to the config block as in add_server_directives.

        .. todo:: Doesn't match server blocks whose server_name directives are
            split across multiple conf files.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost
            whose information we use to match on
        :param list directives: The directives to add
        :param bool insert_at_top: True if the directives need to be inserted at the top
            of the server block instead of the bottom

        """
        self._modify_server_directives(vhost,
            functools.partial(_update_or_add_directives, directives, insert_at_top))

    def remove_server_directives(self, vhost, directive_name, match_func=None):
        """Remove all directives of type directive_name.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost
            to remove directives from
        :param string directive_name: The directive type to remove
        :param callable match_func: Function of the directive that returns true for directives
            to be deleted.
        """
        self._modify_server_directives(vhost,
            functools.partial(_remove_directives, directive_name, match_func))

    def _update_vhost_based_on_new_directives(self, vhost, directives_list):
        """Refresh the attributes of vhost from a list of server directives.

        Includes in ``directives_list`` are expanded, the result is parsed
        and ``vhost.addrs``, ``vhost.ssl``, ``vhost.names`` and ``vhost.raw``
        are overwritten.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost
            to update in place
        :param list directives_list: The directives of the server block

        """
        new_server = self._get_included_directives(directives_list)
        parsed_server = self.parse_server(new_server)
        vhost.addrs = parsed_server['addrs']
        vhost.ssl = parsed_server['ssl']
        vhost.names = parsed_server['names']
        vhost.raw = new_server

    def _modify_server_directives(self, vhost, block_func):
        """Apply block_func to the server block of vhost and refresh vhost.

        The server block is located by following ``vhost.path`` through the
        parsed tree of ``vhost.filep``.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost: The vhost
            whose server block is modified
        :param callable block_func: Called with the list of directives of the
            server block, which it modifies in place

        :raises .errors.MisconfigurationError: If the located entry is not a
            two-element list, or if ``block_func`` raises this error. The
            message is prefixed with the file name.

        """
        filename = vhost.filep
        try:
            result = self.parsed[filename]
            for index in vhost.path:
                result = result[index]
            if not isinstance(result, list) or len(result) != 2:
                raise errors.MisconfigurationError("Not a server block.")
            result = result[1]
            block_func(result)

            self._update_vhost_based_on_new_directives(vhost, result)
        except errors.MisconfigurationError as err:
            raise errors.MisconfigurationError("Problem in %s: %s" % (filename, str(err)))

    def duplicate_vhost(self, vhost_template, remove_singleton_listen_params=False,
        only_directives=None):
        """Duplicate the vhost in the configuration files.

        :param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost_template: The vhost
            whose information we copy
        :param bool remove_singleton_listen_params: If we should remove parameters
            from listen directives in the block that can only be used once per address
        :param list only_directives: If it exists, only duplicate the named directives. Only
            looks at first level of depth; does not expand includes.

        :returns: A vhost object for the newly created vhost
        :rtype: :class:`~certbot_nginx._internal.obj.VirtualHost`
        """
        # TODO: https://github.com/certbot/certbot/issues/5185
        # put it in the same file as the template, at the same level
        new_vhost = copy.deepcopy(vhost_template)

        enclosing_block = self.parsed[vhost_template.filep]
        for index in vhost_template.path[:-1]:
            enclosing_block = enclosing_block[index]
        raw_in_parsed = copy.deepcopy(enclosing_block[vhost_template.path[-1]])

        if only_directives is not None:
            new_directives = nginxparser.UnspacedList([])
            for directive in raw_in_parsed[1]:
                if directive and directive[0] in only_directives:
                    new_directives.append(directive)
            raw_in_parsed[1] = new_directives

            self._update_vhost_based_on_new_directives(new_vhost, new_directives)

        enclosing_block.append(raw_in_parsed)
        new_vhost.path[-1] = len(enclosing_block) - 1
        if remove_singleton_listen_params:
            # Exclude one-time use parameters which will cause an error if repeated.
            # https://nginx.org/en/docs/http/ngx_http_core_module.html#listen
            exclude = set(('default_server', 'default', 'setfib', 'fastopen', 'backlog',
                           'rcvbuf', 'sndbuf', 'accept_filter', 'deferred', 'bind',
                           'ipv6only', 'reuseport', 'so_keepalive'))

            for addr in new_vhost.addrs:
                addr.default = False
                addr.ipv6only = False
            for directive in enclosing_block[new_vhost.path[-1]][1]:
                if directive and directive[0] == 'listen':
                    for param in exclude:
                        # See: github.com/certbot/certbot/pull/6223#pullrequestreview-143019225
                        # keys is rebuilt for every param because a deletion
                        # shifts the indices of the remaining entries.
                        keys = [x.split('=')[0] for x in directive]
                        if param in keys:
                            del directive[keys.index(param)]
        return new_vhost


def _parse_ssl_options(ssl_options):
    """Parse the file at ssl_options into a list of directives.

    :param str ssl_options: Path of the file to parse, or None
    :returns: The parsed tree, or an empty list if ``ssl_options`` is None
        or the file could not be opened or parsed
    :rtype: list

    """
    if ssl_options is not None:
        try:
            with open(ssl_options) as _file:
                return nginxparser.load(_file)
        except IOError:
            logger.warning("Missing NGINX TLS options file: %s", ssl_options)
        except pyparsing.ParseBaseException as err:
            logger.debug("Could not parse file: %s due to %s", ssl_options, err)
    return []

def _do_for_subarray(entry, condition, func, path=None):
    """Executes a function for a subarray of a nested array if it matches
    the given condition.

    A matching list is not searched any further; non-matching lists are
    searched recursively.

    :param list entry: The list to iterate over
    :param function condition: Returns true iff func should be executed on item
    :param function func: The function to call for each matching item. It
        receives the item and the list of indices leading to it.
    :param list path: Indices leading from the top-level list to entry

    """
    if path is None:
        path = []
    if isinstance(entry, list):
        if condition(entry):
            func(entry, path)
        else:
            for index, item in enumerate(entry):
                _do_for_subarray(item, condition, func, path + [index])


def get_best_match(target_name, names):
    """Finds the best match for target_name out of names using the Nginx
    name-matching rules (exact > longest wildcard starting with * >
    longest wildcard ending with * > regex).

    :param str target_name: The name to match
    :param set names: The candidate server names
    :returns: Tuple of (type of match, the name that matched)
    :rtype: tuple

    """
    exact = []
    wildcard_start = []
    wildcard_end = []
    regex = []

    for name in names:
        if _exact_match(target_name, name):
            exact.append(name)
        elif _wildcard_match(target_name, name, True):
            wildcard_start.append(name)
        elif _wildcard_match(target_name, name, False):
            wildcard_end.append(name)
        elif _regex_match(target_name, name):
            regex.append(name)

    if exact:
        # There can be more than one exact match; e.g. eff.org, .eff.org
        match = min(exact, key=len)
        return ('exact', match)
    if wildcard_start:
        # Return the longest wildcard
        match = max(wildcard_start, key=len)
        return ('wildcard_start', match)
    if wildcard_end:
        # Return the longest wildcard
        match = max(wildcard_end, key=len)
        return ('wildcard_end', match)
    if regex:
        # Just return the first one for now
        match = regex[0]
        return ('regex', match)

    return (None, None)


def _exact_match(target_name, name):
    """Is name equal to target_name, optionally with a leading dot?

    :param str target_name: The name to match
    :param str name: The candidate server name
    :rtype: bool

    """
    return name in (target_name, '.' + target_name)


def _wildcard_match(target_name, name, start):
    """Does the wildcard server name match target_name?

    :param str target_name: The name to match
    :param str name: The candidate server name
    :param bool start: True to treat the wildcard as the first label of
        name, False to treat it as the last label
    :rtype: bool

    """
    # Degenerate case
    if name == '*':
        return True

    parts = target_name.split('.')
    match_parts = name.split('.')

    # If the domain ends in a wildcard, do the match procedure in reverse
    if not start:
        parts.reverse()
        match_parts.reverse()

    # The first part must be a wildcard or blank, e.g. '.eff.org'
    first = match_parts.pop(0)
    if first not in ('*', ''):
        return False

    target_name = '.'.join(parts)
    name = '.'.join(match_parts)

    # Ex: www.eff.org matches *.eff.org, eff.org does not match *.eff.org
    return target_name.endswith('.' + name)


def _regex_match(target_name, name):
    """Does the regex server name (``~`` followed by a pattern) match target_name?

    The pattern is applied with ``re.match``, i.e. anchored at the start of
    ``target_name``.

    :param str target_name: The name to match
    :param str name: The candidate server name
    :returns: True if name is a regex name that compiles and matches
    :rtype: bool

    """
    # Must start with a tilde
    if len(name) < 2 or name[0] != '~':
        return False

    # After tilde is a perl-compatible regex
    try:
        regex = re.compile(name[1:])
        # Always return a bool rather than a match object or None
        return bool(re.match(regex, target_name))
    except re.error:  # pragma: no cover
        # perl-compatible regexes are sometimes not recognized by python
        return False


def _is_include_directive(entry):
    """Checks if an nginx parsed entry is an 'include' directive.

    :param list entry: the parsed entry
    :returns: Whether it's an 'include' directive
    :rtype: bool

    """
    return (isinstance(entry, list) and
            len(entry) == 2 and entry[0] == 'include' and
            isinstance(entry[1], six.string_types))

def _is_ssl_on_directive(entry):
    """Checks if an nginx parsed entry is an 'ssl on' directive.

    :param list entry: the parsed entry
    :returns: Whether it's an 'ssl on' directive
    :rtype: bool

    """
    return (isinstance(entry, list) and
            len(entry) == 2 and entry[0] == 'ssl' and
            entry[1] == 'on')

def _add_directives(directives, insert_at_top, block):
    """Adds directives to a config block.

    :param list directives: The directives to add
    :param bool insert_at_top: True to insert at the top of block instead
        of the bottom
    :param list block: The config block, modified in place

    """
    for directive in directives:
        _add_directive(block, directive, insert_at_top)
    if block and '\n' not in block[-1]:  # could be "   \n  " or ["\n"] !
        block.append(nginxparser.UnspacedList('\n'))

def _update_or_add_directives(directives, insert_at_top, block):
    """Adds or replaces directives in a config block.

    :param list directives: The directives to add or replace
    :param bool insert_at_top: True to insert new directives at the top of
        block instead of the bottom
    :param list block: The config block, modified in place

    """
    for directive in directives:
        _update_or_add_directive(block, directive, insert_at_top)
    if block and '\n' not in block[-1]:  # could be "   \n  " or ["\n"] !
        block.append(nginxparser.UnspacedList('\n'))


INCLUDE = 'include'
REPEATABLE_DIRECTIVES = set(['server_name', 'listen', INCLUDE, 'rewrite', 'add_header'])
COMMENT = ' managed by Certbot'
COMMENT_BLOCK = [' ', '#', COMMENT]

def comment_directive(block, location):
    """Add a ``#managed by Certbot`` comment to the end of the line at location.

    Nothing is inserted if the entry following location is already such a
    comment.

    :param list block: The block containing the directive to be commented
    :param int location: The location within ``block`` of the directive to be commented
    """
    next_entry = block[location + 1] if location + 1 < len(block) else None
    if isinstance(next_entry, list) and next_entry:
        if len(next_entry) >= 2 and next_entry[-2] == "#" and COMMENT in next_entry[-1]:
            return
        # Reduce next_entry to its first element so that the newline check
        # below looks at the text that directly follows the new comment.
        if isinstance(next_entry, nginxparser.UnspacedList):
            next_entry = next_entry.spaced[0]
        else:
            next_entry = next_entry[0]

    block.insert(location + 1, COMMENT_BLOCK[:])
    if next_entry is not None and "\n" not in next_entry:
        block.insert(location + 2, '\n')

def _comment_out_directive(block, location, include_location):
    """Comment out the line at location, with a note of explanation.

    :param list block: The block containing the directive, modified in place
    :param int location: The location within ``block`` of the directive
    :param str include_location: Named in the explanatory note as the place
        the directive is duplicated in

    """
    comment_message = ' duplicated in {0}'.format(include_location)
    # add the end comment
    # create a dumpable object out of block[location] (so it includes the ;)
    directive = block[location]
    new_dir_block = nginxparser.UnspacedList([])  # just a wrapper
    new_dir_block.append(directive)
    dumped = nginxparser.dumps(new_dir_block)
    commented = dumped + ' #' + comment_message  # add the comment directly to the one-line string
    new_dir = nginxparser.loads(commented)  # reload into UnspacedList

    # add the beginning comment
    insert_location = 0
    if new_dir[0].spaced[0] != new_dir[0][0]:  # if there's whitespace at the beginning
        insert_location = 1
    new_dir[0].spaced.insert(insert_location, "# ")  # comment out the line
    new_dir[0].spaced.append(";")  # directly add in the ;, because now dumping won't work properly
    dumped = nginxparser.dumps(new_dir)
    new_dir = nginxparser.loads(dumped)  # reload into an UnspacedList

    block[location] = new_dir[0]  # set the now-single-line-comment directive back in place

def _find_location(block, directive_name, match_func=None):
    """Finds the index of the first instance of directive_name in block.
    If no line exists, use None.

    :param list block: The block to search
    :param str directive_name: The directive name to look for
    :param callable match_func: If given, a line is only accepted when this
        function returns true for it
    :returns: Index of the first matching line, or None
    """
    return next((index for index, line in enumerate(block)
                 if line and line[0] == directive_name
                 and (match_func is None or match_func(line))), None)

def _is_whitespace_or_comment(directive):
    """Is this directive either a whitespace or comment directive?"""
    return len(directive) == 0 or directive[0] == '#'

def _add_directive(block, directive, insert_at_top):
    """Add a single directive to a config block.

    Whitespace and comment directives are always appended. Other directives
    are added when no directive of that name exists in block yet or when
    the name is in ``REPEATABLE_DIRECTIVES``; an identical existing
    directive is left alone.

    :param list block: The config block, modified in place
    :param list directive: The directive to add
    :param bool insert_at_top: True to insert at the top of block instead
        of the bottom

    :raises .errors.MisconfigurationError: If a nonrepeatable directive of
        the same name exists in block with a different value, either for
        directive itself or for a directive of the file it includes

    """
    if not isinstance(directive, nginxparser.UnspacedList):
        directive = nginxparser.UnspacedList(directive)
    if _is_whitespace_or_comment(directive):
        # whitespace or comment
        block.append(directive)
        return

    location = _find_location(block, directive[0])

    # Append or prepend directive. Fail if the name is not a repeatable directive name,
    # and there is already a copy of that directive with a different value
    # in the config file.

    # handle flat include files

    directive_name = directive[0]
    def can_append(loc, dir_name):
        """ Can we append this directive to the block? """
        return loc is None or (isinstance(dir_name, six.string_types)
                               and dir_name in REPEATABLE_DIRECTIVES)

    err_fmt = 'tried to insert directive "{0}" but found conflicting "{1}".'

    # Give a better error message about the specific directive than Nginx's "fail to restart"
    if directive_name == INCLUDE:
        # in theory, we might want to do this recursively, but in practice, that's really not
        # necessary because we know what file we're talking about (and if we don't recurse, we
        # just give a worse error message)
        included_directives = _parse_ssl_options(directive[1])

        for included_directive in included_directives:
            # Check for whitespace/comments first: such an entry may be
            # empty, in which case indexing it below would raise IndexError.
            if _is_whitespace_or_comment(included_directive):
                continue
            included_dir_name = included_directive[0]
            included_dir_loc = _find_location(block, included_dir_name)
            if not can_append(included_dir_loc, included_dir_name):
                if block[included_dir_loc] != included_directive:
                    raise errors.MisconfigurationError(err_fmt.format(included_directive,
                        block[included_dir_loc]))
                _comment_out_directive(block, included_dir_loc, directive[1])

    if can_append(location, directive_name):
        if insert_at_top:
            # Add a newline so the comment doesn't comment
            # out existing directives
            block.insert(0, nginxparser.UnspacedList('\n'))
            block.insert(0, directive)
            comment_directive(block, 0)
        else:
            block.append(directive)
            comment_directive(block, len(block) - 1)
    elif block[location] != directive:
        raise errors.MisconfigurationError(err_fmt.format(directive, block[location]))

def _update_directive(block, directive, location):
    """Replace the directive at location and mark it as managed by Certbot.

    :param list block: The config block, modified in place
    :param list directive: The replacement directive
    :param int location: The location within ``block`` to replace

    """
    block[location] = directive
    comment_directive(block, location)

def _update_or_add_directive(block, directive, insert_at_top):
    """Replace the first directive of the same name, or add the directive.

    Whitespace and comment directives are always appended.

    :param list block: The config block, modified in place
    :param list directive: The directive to add or to replace with
    :param bool insert_at_top: True to insert a new directive at the top of
        block instead of the bottom

    """
    if not isinstance(directive, nginxparser.UnspacedList):
        directive = nginxparser.UnspacedList(directive)
    if _is_whitespace_or_comment(directive):
        # whitespace or comment
        block.append(directive)
        return

    location = _find_location(block, directive[0])

    # we can update directive
    if location is not None:
        _update_directive(block, directive, location)
        return

    _add_directive(block, directive, insert_at_top)

def _is_certbot_comment(directive):
    """Does directive contain both '#' and the Certbot comment text?"""
    return '#' in directive and COMMENT in directive

def _remove_directives(directive_name, match_func, block):
    """Removes directives of name directive_name from a config block if match_func matches.

    :param str directive_name: The directive name to remove
    :param callable match_func: If not None, only directives for which this
        function returns true are removed
    :param list block: The config block, modified in place
    """
    while True:
        location = _find_location(block, directive_name, match_func=match_func)
        if location is None:
            return
        # if the directive was made by us, remove the comment following
        if location + 1 < len(block) and _is_certbot_comment(block[location + 1]):
            del block[location + 1]
        del block[location]

def _apply_global_addr_ssl(addr_to_ssl, parsed_server):
    """Apply global sslishness information to the parsed server block

    :param dict addr_to_ssl: Mapping of ``addr.normalized_tuple()`` to
        whether that address listens on ssl
    :param dict parsed_server: Parsed server block, modified in place
    """
    for addr in parsed_server['addrs']:
        addr.ssl = addr_to_ssl[addr.normalized_tuple()]
        if addr.ssl:
            parsed_server['ssl'] = True

def _parse_server_raw(server):
    """Parses a list of server directives.

    :param list server: list of directives in a server block
    :returns: dict with the keys ``'addrs'`` (set of addresses from
        ``listen`` directives), ``'ssl'`` (bool) and ``'names'`` (set of
        names from ``server_name`` directives)
    :rtype: dict

    """
    addrs = set()  # type: Set[obj.Addr]
    ssl = False  # type: bool
    names = set()  # type: Set[str]

    apply_ssl_to_all_addrs = False

    for directive in server:
        if not directive:
            continue
        if directive[0] == 'listen':
            addr = obj.Addr.fromstring(" ".join(directive[1:]))
            if addr:
                addrs.add(addr)
                if addr.ssl:
                    ssl = True
        elif directive[0] == 'server_name':
            names.update(x.strip('"\'') for x in directive[1:])
        elif _is_ssl_on_directive(directive):
            ssl = True
            apply_ssl_to_all_addrs = True

    # An 'ssl on' directive applies to every address of the block
    if apply_ssl_to_all_addrs:
        for addr in addrs:
            addr.ssl = True

    return {
        'addrs': addrs,
        'ssl': ssl,
        'names': names
    }