diff options
Diffstat (limited to 'certbot-nginx/certbot_nginx/configurator.py')
-rw-r--r-- | certbot-nginx/certbot_nginx/configurator.py | 1162 |
1 files changed, 0 insertions, 1162 deletions
diff --git a/certbot-nginx/certbot_nginx/configurator.py b/certbot-nginx/certbot_nginx/configurator.py deleted file mode 100644 index dd0bf9e8b..000000000 --- a/certbot-nginx/certbot_nginx/configurator.py +++ /dev/null @@ -1,1162 +0,0 @@ -"""Nginx Configuration""" -import logging -import os -import re -import socket -import subprocess -import tempfile -import time - -import OpenSSL -import zope.interface - -from acme import challenges -from acme import crypto_util as acme_crypto_util - -from certbot import constants as core_constants -from certbot import crypto_util -from certbot import errors -from certbot import interfaces -from certbot import util - -from certbot.plugins import common - -from certbot_nginx import constants -from certbot_nginx import display_ops -from certbot_nginx import nginxparser -from certbot_nginx import parser -from certbot_nginx import tls_sni_01 -from certbot_nginx import http_01 -from certbot_nginx import obj # pylint: disable=unused-import -from acme.magic_typing import List, Dict, Set # pylint: disable=unused-import, no-name-in-module - - -NAME_RANK = 0 -START_WILDCARD_RANK = 1 -END_WILDCARD_RANK = 2 -REGEX_RANK = 3 -NO_SSL_MODIFIER = 4 - - -logger = logging.getLogger(__name__) - - -@zope.interface.implementer(interfaces.IAuthenticator, interfaces.IInstaller) -@zope.interface.provider(interfaces.IPluginFactory) -class NginxConfigurator(common.Installer): - # pylint: disable=too-many-instance-attributes,too-many-public-methods - """Nginx configurator. - - .. todo:: Add proper support for comments in the config. Currently, - config files modified by the configurator will lose all their comments. - - :ivar config: Configuration. - :type config: :class:`~certbot.interfaces.IConfig` - - :ivar parser: Handles low level parsing - :type parser: :class:`~certbot_nginx.parser` - - :ivar str save_notes: Human-readable config change notes - - :ivar reverter: saves and reverts checkpoints - :type reverter: :class:`certbot.reverter.Reverter` - - :ivar tup version: version of Nginx - - """ - - description = "Nginx Web Server plugin" - - DEFAULT_LISTEN_PORT = '80' - - # SSL directives that Certbot can add when installing a new certificate. - SSL_DIRECTIVES = ['ssl_certificate', 'ssl_certificate_key', 'ssl_dhparam'] - - @classmethod - def add_parser_arguments(cls, add): - default_server_root = _determine_default_server_root() - add("server-root", default=constants.CLI_DEFAULTS["server_root"], - help="Nginx server root directory. (default: %s)" % default_server_root) - add("ctl", default=constants.CLI_DEFAULTS["ctl"], help="Path to the " - "'nginx' binary, used for 'configtest' and retrieving nginx " - "version number.") - - @property - def nginx_conf(self): - """Nginx config file path.""" - return os.path.join(self.conf("server_root"), "nginx.conf") - - def __init__(self, *args, **kwargs): - """Initialize an Nginx Configurator. - - :param tup version: version of Nginx as a tuple (1, 4, 7) - (used mostly for unittesting) - - """ - version = kwargs.pop("version", None) - super(NginxConfigurator, self).__init__(*args, **kwargs) - - # Verify that all directories and files exist with proper permissions - self._verify_setup() - - # Files to save - self.save_notes = "" - - # For creating new vhosts if no names match - self.new_vhost = None - - # List of vhosts configured per wildcard domain on this run. - # used by deploy_cert() and enhance() - self._wildcard_vhosts = {} # type: Dict[str, List[obj.VirtualHost]] - self._wildcard_redirect_vhosts = {} # type: Dict[str, List[obj.VirtualHost]] - - # Add number of outstanding challenges - self._chall_out = 0 - - # These will be set in the prepare function - self.parser = None - self.version = version - self._enhance_func = {"redirect": self._enable_redirect, - "ensure-http-header": self._set_http_header, - "staple-ocsp": self._enable_ocsp_stapling} - - self.reverter.recovery_routine() - - @property - def mod_ssl_conf(self): - """Full absolute path to SSL configuration file.""" - return os.path.join(self.config.config_dir, constants.MOD_SSL_CONF_DEST) - - @property - def updated_mod_ssl_conf_digest(self): - """Full absolute path to digest of updated SSL configuration file.""" - return os.path.join(self.config.config_dir, constants.UPDATED_MOD_SSL_CONF_DIGEST) - - # This is called in determine_authenticator and determine_installer - def prepare(self): - """Prepare the authenticator/installer. - - :raises .errors.NoInstallationError: If Nginx ctl cannot be found - :raises .errors.MisconfigurationError: If Nginx is misconfigured - """ - # Verify Nginx is installed - if not util.exe_exists(self.conf('ctl')): - raise errors.NoInstallationError( - "Could not find a usable 'nginx' binary. Ensure nginx exists, " - "the binary is executable, and your PATH is set correctly.") - - # Make sure configuration is valid - self.config_test() - - - self.parser = parser.NginxParser(self.conf('server-root')) - - install_ssl_options_conf(self.mod_ssl_conf, self.updated_mod_ssl_conf_digest) - - self.install_ssl_dhparams() - - # Set Version - if self.version is None: - self.version = self.get_version() - - # Prevent two Nginx plugins from modifying a config at once - try: - util.lock_dir_until_exit(self.conf('server-root')) - except (OSError, errors.LockError): - logger.debug('Encountered error:', exc_info=True) - raise errors.PluginError( - 'Unable to lock %s', self.conf('server-root')) - - - # Entry point in main.py for installing cert - def deploy_cert(self, domain, cert_path, key_path, - chain_path=None, fullchain_path=None): - # pylint: disable=unused-argument - """Deploys certificate to specified virtual host. - - .. note:: Aborts if the vhost is missing ssl_certificate or - ssl_certificate_key. - - .. note:: This doesn't save the config files! - - :raises errors.PluginError: When unable to deploy certificate due to - a lack of directives or configuration - - """ - if not fullchain_path: - raise errors.PluginError( - "The nginx plugin currently requires --fullchain-path to " - "install a cert.") - - vhosts = self.choose_vhosts(domain, create_if_no_match=True) - for vhost in vhosts: - self._deploy_cert(vhost, cert_path, key_path, chain_path, fullchain_path) - - def _deploy_cert(self, vhost, cert_path, key_path, chain_path, fullchain_path): - # pylint: disable=unused-argument - """ - Helper function for deploy_cert() that handles the actual deployment - this exists because we might want to do multiple deployments per - domain originally passed for deploy_cert(). This is especially true - with wildcard certificates - """ - cert_directives = [['\n ', 'ssl_certificate', ' ', fullchain_path], - ['\n ', 'ssl_certificate_key', ' ', key_path]] - - self.parser.update_or_add_server_directives(vhost, - cert_directives) - logger.info("Deploying Certificate to VirtualHost %s", vhost.filep) - - self.save_notes += ("Changed vhost at %s with addresses of %s\n" % - (vhost.filep, - ", ".join(str(addr) for addr in vhost.addrs))) - self.save_notes += "\tssl_certificate %s\n" % fullchain_path - self.save_notes += "\tssl_certificate_key %s\n" % key_path - - def _choose_vhosts_wildcard(self, domain, prefer_ssl, no_ssl_filter_port=None): - """Prompts user to choose vhosts to install a wildcard certificate for""" - if prefer_ssl: - vhosts_cache = self._wildcard_vhosts - preference_test = lambda x: x.ssl - else: - vhosts_cache = self._wildcard_redirect_vhosts - preference_test = lambda x: not x.ssl - - # Caching! - if domain in vhosts_cache: - # Vhosts for a wildcard domain were already selected - return vhosts_cache[domain] - - # Get all vhosts whether or not they are covered by the wildcard domain - vhosts = self.parser.get_vhosts() - - # Go through the vhosts, making sure that we cover all the names - # present, but preferring the SSL or non-SSL vhosts - filtered_vhosts = {} - for vhost in vhosts: - # Ensure we're listening non-sslishly on no_ssl_filter_port - if no_ssl_filter_port is not None: - if not self._vhost_listening_on_port_no_ssl(vhost, no_ssl_filter_port): - continue - for name in vhost.names: - if preference_test(vhost): - # Prefer either SSL or non-SSL vhosts - filtered_vhosts[name] = vhost - elif name not in filtered_vhosts: - # Add if not in list previously - filtered_vhosts[name] = vhost - - # Only unique VHost objects - dialog_input = set([vhost for vhost in filtered_vhosts.values()]) - - # Ask the user which of names to enable, expect list of names back - return_vhosts = display_ops.select_vhost_multiple(list(dialog_input)) - - for vhost in return_vhosts: - if domain not in vhosts_cache: - vhosts_cache[domain] = [] - vhosts_cache[domain].append(vhost) - - return return_vhosts - - ####################### - # Vhost parsing methods - ####################### - def _choose_vhost_single(self, target_name): - matches = self._get_ranked_matches(target_name) - vhosts = [x for x in [self._select_best_name_match(matches)] if x is not None] - return vhosts - - def choose_vhosts(self, target_name, create_if_no_match=False): - """Chooses a virtual host based on the given domain name. - - .. note:: This makes the vhost SSL-enabled if it isn't already. Follows - Nginx's server block selection rules preferring blocks that are - already SSL. - - .. todo:: This should maybe return list if no obvious answer - is presented. - - .. todo:: The special name "$hostname" corresponds to the machine's - hostname. Currently we just ignore this. - - :param str target_name: domain name - :param bool create_if_no_match: If we should create a new vhost from default - when there is no match found. If we can't choose a default, raise a - MisconfigurationError. - - :returns: ssl vhosts associated with name - :rtype: list of :class:`~certbot_nginx.obj.VirtualHost` - - """ - if util.is_wildcard_domain(target_name): - # Ask user which VHosts to support. - vhosts = self._choose_vhosts_wildcard(target_name, prefer_ssl=True) - else: - vhosts = self._choose_vhost_single(target_name) - if not vhosts: - if create_if_no_match: - # result will not be [None] because it errors on failure - vhosts = [self._vhost_from_duplicated_default(target_name, True, - str(self.config.tls_sni_01_port))] - else: - # No matches. Raise a misconfiguration error. - raise errors.MisconfigurationError( - ("Cannot find a VirtualHost matching domain %s. " - "In order for Certbot to correctly perform the challenge " - "please add a corresponding server_name directive to your " - "nginx configuration for every domain on your certificate: " - "https://nginx.org/en/docs/http/server_names.html") % (target_name)) - # Note: if we are enhancing with ocsp, vhost should already be ssl. - for vhost in vhosts: - if not vhost.ssl: - self._make_server_ssl(vhost) - - return vhosts - - def ipv6_info(self, port): - """Returns tuple of booleans (ipv6_active, ipv6only_present) - ipv6_active is true if any server block listens ipv6 address in any port - - ipv6only_present is true if ipv6only=on option exists in any server - block ipv6 listen directive for the specified port. - - :param str port: Port to check ipv6only=on directive for - - :returns: Tuple containing information if IPv6 is enabled in the global - configuration, and existence of ipv6only directive for specified port - :rtype: tuple of type (bool, bool) - """ - # port should be a string, but it's easy to mess up, so let's - # make sure it is one - port = str(port) - vhosts = self.parser.get_vhosts() - ipv6_active = False - ipv6only_present = False - for vh in vhosts: - for addr in vh.addrs: - if addr.ipv6: - ipv6_active = True - if addr.ipv6only and addr.get_port() == port: - ipv6only_present = True - return (ipv6_active, ipv6only_present) - - def _vhost_from_duplicated_default(self, domain, allow_port_mismatch, port): - """if allow_port_mismatch is False, only server blocks with matching ports will be - used as a default server block template. - """ - if self.new_vhost is None: - default_vhost = self._get_default_vhost(domain, allow_port_mismatch, port) - self.new_vhost = self.parser.duplicate_vhost(default_vhost, - remove_singleton_listen_params=True) - self.new_vhost.names = set() - - self._add_server_name_to_vhost(self.new_vhost, domain) - return self.new_vhost - - def _add_server_name_to_vhost(self, vhost, domain): - vhost.names.add(domain) - name_block = [['\n ', 'server_name']] - for name in vhost.names: - name_block[0].append(' ') - name_block[0].append(name) - self.parser.update_or_add_server_directives(vhost, name_block) - - def _get_default_vhost(self, domain, allow_port_mismatch, port): - """Helper method for _vhost_from_duplicated_default; see argument documentation there""" - vhost_list = self.parser.get_vhosts() - # if one has default_server set, return that one - all_default_vhosts = [] - port_matching_vhosts = [] - for vhost in vhost_list: - for addr in vhost.addrs: - if addr.default: - all_default_vhosts.append(vhost) - if self._port_matches(port, addr.get_port()): - port_matching_vhosts.append(vhost) - break - - if len(port_matching_vhosts) == 1: - return port_matching_vhosts[0] - elif len(all_default_vhosts) == 1 and allow_port_mismatch: - return all_default_vhosts[0] - - # TODO: present a list of vhosts for user to choose from - - raise errors.MisconfigurationError("Could not automatically find a matching server" - " block for %s. Set the `server_name` directive to use the Nginx installer." % domain) - - def _get_ranked_matches(self, target_name): - """Returns a ranked list of vhosts that match target_name. - The ranking gives preference to SSL vhosts. - - :param str target_name: The name to match - :returns: list of dicts containing the vhost, the matching name, and - the numerical rank - :rtype: list - - """ - vhost_list = self.parser.get_vhosts() - return self._rank_matches_by_name_and_ssl(vhost_list, target_name) - - def _select_best_name_match(self, matches): - """Returns the best name match of a ranked list of vhosts. - - :param list matches: list of dicts containing the vhost, the matching name, - and the numerical rank - :returns: the most matching vhost - :rtype: :class:`~certbot_nginx.obj.VirtualHost` - - """ - if not matches: - return None - elif matches[0]['rank'] in [START_WILDCARD_RANK, END_WILDCARD_RANK, - START_WILDCARD_RANK + NO_SSL_MODIFIER, END_WILDCARD_RANK + NO_SSL_MODIFIER]: - # Wildcard match - need to find the longest one - rank = matches[0]['rank'] - wildcards = [x for x in matches if x['rank'] == rank] - return max(wildcards, key=lambda x: len(x['name']))['vhost'] - else: - # Exact or regex match - return matches[0]['vhost'] - - def _rank_matches_by_name(self, vhost_list, target_name): - """Returns a ranked list of vhosts from vhost_list that match target_name. - This method should always be followed by a call to _select_best_name_match. - - :param list vhost_list: list of vhosts to filter and rank - :param str target_name: The name to match - :returns: list of dicts containing the vhost, the matching name, and - the numerical rank - :rtype: list - - """ - # Nginx chooses a matching server name for a request with precedence: - # 1. exact name match - # 2. longest wildcard name starting with * - # 3. longest wildcard name ending with * - # 4. first matching regex in order of appearance in the file - matches = [] - for vhost in vhost_list: - name_type, name = parser.get_best_match(target_name, vhost.names) - if name_type == 'exact': - matches.append({'vhost': vhost, - 'name': name, - 'rank': NAME_RANK}) - elif name_type == 'wildcard_start': - matches.append({'vhost': vhost, - 'name': name, - 'rank': START_WILDCARD_RANK}) - elif name_type == 'wildcard_end': - matches.append({'vhost': vhost, - 'name': name, - 'rank': END_WILDCARD_RANK}) - elif name_type == 'regex': - matches.append({'vhost': vhost, - 'name': name, - 'rank': REGEX_RANK}) - return sorted(matches, key=lambda x: x['rank']) - - def _rank_matches_by_name_and_ssl(self, vhost_list, target_name): - """Returns a ranked list of vhosts from vhost_list that match target_name. - The ranking gives preference to SSLishness before name match level. - - :param list vhost_list: list of vhosts to filter and rank - :param str target_name: The name to match - :returns: list of dicts containing the vhost, the matching name, and - the numerical rank - :rtype: list - - """ - matches = self._rank_matches_by_name(vhost_list, target_name) - for match in matches: - if not match['vhost'].ssl: - match['rank'] += NO_SSL_MODIFIER - return sorted(matches, key=lambda x: x['rank']) - - def choose_redirect_vhosts(self, target_name, port, create_if_no_match=False): - """Chooses a single virtual host for redirect enhancement. - - Chooses the vhost most closely matching target_name that is - listening to port without using ssl. - - .. todo:: This should maybe return list if no obvious answer - is presented. - - .. todo:: The special name "$hostname" corresponds to the machine's - hostname. Currently we just ignore this. - - :param str target_name: domain name - :param str port: port number - :param bool create_if_no_match: If we should create a new vhost from default - when there is no match found. If we can't choose a default, raise a - MisconfigurationError. - - :returns: vhosts associated with name - :rtype: list of :class:`~certbot_nginx.obj.VirtualHost` - - """ - if util.is_wildcard_domain(target_name): - # Ask user which VHosts to enhance. - vhosts = self._choose_vhosts_wildcard(target_name, prefer_ssl=False, - no_ssl_filter_port=port) - else: - matches = self._get_redirect_ranked_matches(target_name, port) - vhosts = [x for x in [self._select_best_name_match(matches)]if x is not None] - if not vhosts and create_if_no_match: - vhosts = [self._vhost_from_duplicated_default(target_name, False, port)] - return vhosts - - def _port_matches(self, test_port, matching_port): - # test_port is a number, matching is a number or "" or None - if matching_port == "" or matching_port is None: - # if no port is specified, Nginx defaults to listening on port 80. - return test_port == self.DEFAULT_LISTEN_PORT - else: - return test_port == matching_port - - def _vhost_listening_on_port_no_ssl(self, vhost, port): - found_matching_port = False - if len(vhost.addrs) == 0: - # if there are no listen directives at all, Nginx defaults to - # listening on port 80. - found_matching_port = (port == self.DEFAULT_LISTEN_PORT) - else: - for addr in vhost.addrs: - if self._port_matches(port, addr.get_port()) and addr.ssl == False: - found_matching_port = True - - if found_matching_port: - # make sure we don't have an 'ssl on' directive - return not self.parser.has_ssl_on_directive(vhost) - else: - return False - - def _get_redirect_ranked_matches(self, target_name, port): - """Gets a ranked list of plaintextish port-listening vhosts matching target_name - - Filter all hosts for those listening on port without using ssl. - Rank by how well these match target_name. - - :param str target_name: The name to match - :param str port: port number as a string - :returns: list of dicts containing the vhost, the matching name, and - the numerical rank - :rtype: list - - """ - all_vhosts = self.parser.get_vhosts() - - def _vhost_matches(vhost, port): - return self._vhost_listening_on_port_no_ssl(vhost, port) - - matching_vhosts = [vhost for vhost in all_vhosts if _vhost_matches(vhost, port)] - - return self._rank_matches_by_name(matching_vhosts, target_name) - - def get_all_names(self): - """Returns all names found in the Nginx Configuration. - - :returns: All ServerNames, ServerAliases, and reverse DNS entries for - virtual host addresses - :rtype: set - - """ - all_names = set() # type: Set[str] - - for vhost in self.parser.get_vhosts(): - all_names.update(vhost.names) - - for addr in vhost.addrs: - host = addr.get_addr() - if common.hostname_regex.match(host): - # If it's a hostname, add it to the names. - all_names.add(host) - elif not common.private_ips_regex.match(host): - # If it isn't a private IP, do a reverse DNS lookup - try: - if addr.ipv6: - host = addr.get_ipv6_exploded() - socket.inet_pton(socket.AF_INET6, host) - else: - socket.inet_pton(socket.AF_INET, host) - all_names.add(socket.gethostbyaddr(host)[0]) - except (socket.error, socket.herror, socket.timeout): - continue - - return util.get_filtered_names(all_names) - - def _get_snakeoil_paths(self): - """Generate invalid certs that let us create ssl directives for Nginx""" - # TODO: generate only once - tmp_dir = os.path.join(self.config.work_dir, "snakeoil") - le_key = crypto_util.init_save_key( - key_size=1024, key_dir=tmp_dir, keyname="key.pem") - key = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, le_key.pem) - cert = acme_crypto_util.gen_ss_cert(key, domains=[socket.gethostname()]) - cert_pem = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, cert) - cert_file, cert_path = util.unique_file( - os.path.join(tmp_dir, "cert.pem"), mode="wb") - with cert_file: - cert_file.write(cert_pem) - return cert_path, le_key.file - - def _make_server_ssl(self, vhost): - """Make a server SSL. - - Make a server SSL by adding new listen and SSL directives. - - :param vhost: The vhost to add SSL to. - :type vhost: :class:`~certbot_nginx.obj.VirtualHost` - - """ - ipv6info = self.ipv6_info(self.config.tls_sni_01_port) - ipv6_block = [''] - ipv4_block = [''] - - # If the vhost was implicitly listening on the default Nginx port, - # have it continue to do so. - if len(vhost.addrs) == 0: - listen_block = [['\n ', 'listen', ' ', self.DEFAULT_LISTEN_PORT]] - self.parser.add_server_directives(vhost, listen_block) - - if vhost.ipv6_enabled(): - ipv6_block = ['\n ', - 'listen', - ' ', - '[::]:{0}'.format(self.config.tls_sni_01_port), - ' ', - 'ssl'] - if not ipv6info[1]: - # ipv6only=on is absent in global config - ipv6_block.append(' ') - ipv6_block.append('ipv6only=on') - - if vhost.ipv4_enabled(): - ipv4_block = ['\n ', - 'listen', - ' ', - '{0}'.format(self.config.tls_sni_01_port), - ' ', - 'ssl'] - - snakeoil_cert, snakeoil_key = self._get_snakeoil_paths() - - ssl_block = ([ - ipv6_block, - ipv4_block, - ['\n ', 'ssl_certificate', ' ', snakeoil_cert], - ['\n ', 'ssl_certificate_key', ' ', snakeoil_key], - ['\n ', 'include', ' ', self.mod_ssl_conf], - ['\n ', 'ssl_dhparam', ' ', self.ssl_dhparams], - ]) - - self.parser.add_server_directives( - vhost, ssl_block) - - ################################## - # enhancement methods (IInstaller) - ################################## - def supported_enhancements(self): # pylint: disable=no-self-use - """Returns currently supported enhancements.""" - return ['redirect', 'ensure-http-header', 'staple-ocsp'] - - def enhance(self, domain, enhancement, options=None): - """Enhance configuration. - - :param str domain: domain to enhance - :param str enhancement: enhancement type defined in - :const:`~certbot.constants.ENHANCEMENTS` - :param options: options for the enhancement - See :const:`~certbot.constants.ENHANCEMENTS` - documentation for appropriate parameter. - - """ - try: - return self._enhance_func[enhancement](domain, options) - except (KeyError, ValueError): - raise errors.PluginError( - "Unsupported enhancement: {0}".format(enhancement)) - except errors.PluginError: - logger.warning("Failed %s for %s", enhancement, domain) - raise - - def _has_certbot_redirect(self, vhost, domain): - test_redirect_block = _test_block_from_block(_redirect_block_for_domain(domain)) - return vhost.contains_list(test_redirect_block) - - def _set_http_header(self, domain, header_substring): - """Enables header identified by header_substring on domain. - - If the vhost is listening plaintextishly, separates out the relevant - directives into a new server block, and only add header directive to - HTTPS block. - - :param str domain: the domain to enable header for. - :param str header_substring: String to uniquely identify a header. - e.g. Strict-Transport-Security, Upgrade-Insecure-Requests - :returns: Success - :raises .errors.PluginError: If no viable HTTPS host can be created or - set with header header_substring. - """ - vhosts = self.choose_vhosts(domain) - if not vhosts: - raise errors.PluginError( - "Unable to find corresponding HTTPS host for enhancement.") - for vhost in vhosts: - if vhost.has_header(header_substring): - raise errors.PluginEnhancementAlreadyPresent( - "Existing %s header" % (header_substring)) - - # if there is no separate SSL block, break the block into two and - # choose the SSL block. - if vhost.ssl and any([not addr.ssl for addr in vhost.addrs]): - _, vhost = self._split_block(vhost) - - header_directives = [ - ['\n ', 'add_header', ' ', header_substring, ' '] + - constants.HEADER_ARGS[header_substring], - ['\n']] - self.parser.add_server_directives(vhost, header_directives) - - def _add_redirect_block(self, vhost, domain): - """Add redirect directive to vhost - """ - redirect_block = _redirect_block_for_domain(domain) - - self.parser.add_server_directives( - vhost, redirect_block, insert_at_top=True) - - def _split_block(self, vhost, only_directives=None): - """Splits this "virtual host" (i.e. this nginx server block) into - separate HTTP and HTTPS blocks. - - :param vhost: The server block to break up into two. - :param list only_directives: If this exists, only duplicate these directives - when splitting the block. - :type vhost: :class:`~certbot_nginx.obj.VirtualHost` - :returns: tuple (http_vhost, https_vhost) - :rtype: tuple of type :class:`~certbot_nginx.obj.VirtualHost` - """ - http_vhost = self.parser.duplicate_vhost(vhost, only_directives=only_directives) - - def _ssl_match_func(directive): - return 'ssl' in directive - - def _ssl_config_match_func(directive): - return self.mod_ssl_conf in directive - - def _no_ssl_match_func(directive): - return 'ssl' not in directive - - # remove all ssl addresses and related directives from the new block - for directive in self.SSL_DIRECTIVES: - self.parser.remove_server_directives(http_vhost, directive) - self.parser.remove_server_directives(http_vhost, 'listen', match_func=_ssl_match_func) - self.parser.remove_server_directives(http_vhost, 'include', - match_func=_ssl_config_match_func) - - # remove all non-ssl addresses from the existing block - self.parser.remove_server_directives(vhost, 'listen', match_func=_no_ssl_match_func) - return http_vhost, vhost - - def _enable_redirect(self, domain, unused_options): - """Redirect all equivalent HTTP traffic to ssl_vhost. - - If the vhost is listening plaintextishly, separate out the - relevant directives into a new server block and add a rewrite directive. - - .. note:: This function saves the configuration - - :param str domain: domain to enable redirect for - :param unused_options: Not currently used - :type unused_options: Not Available - """ - - port = self.DEFAULT_LISTEN_PORT - # If there are blocks listening plaintextishly on self.DEFAULT_LISTEN_PORT, - # choose the most name-matching one. - - vhosts = self.choose_redirect_vhosts(domain, port) - - if not vhosts: - logger.info("No matching insecure server blocks listening on port %s found.", - self.DEFAULT_LISTEN_PORT) - return - - for vhost in vhosts: - self._enable_redirect_single(domain, vhost) - - def _enable_redirect_single(self, domain, vhost): - """Redirect all equivalent HTTP traffic to ssl_vhost. - - If the vhost is listening plaintextishly, separate out the - relevant directives into a new server block and add a rewrite directive. - - .. note:: This function saves the configuration - - :param str domain: domain to enable redirect for - :param `~obj.Vhost` vhost: vhost to enable redirect for - """ - - http_vhost = None - if vhost.ssl: - http_vhost, _ = self._split_block(vhost, ['listen', 'server_name']) - - # Add this at the bottom to get the right order of directives - return_404_directive = [['\n ', 'return', ' ', '404']] - self.parser.add_server_directives(http_vhost, return_404_directive) - - vhost = http_vhost - - if self._has_certbot_redirect(vhost, domain): - logger.info("Traffic on port %s already redirecting to ssl in %s", - self.DEFAULT_LISTEN_PORT, vhost.filep) - else: - # Redirect plaintextish host to https - self._add_redirect_block(vhost, domain) - logger.info("Redirecting all traffic on port %s to ssl in %s", - self.DEFAULT_LISTEN_PORT, vhost.filep) - - def _enable_ocsp_stapling(self, domain, chain_path): - """Include OCSP response in TLS handshake - - :param str domain: domain to enable OCSP response for - :param chain_path: chain file path - :type chain_path: `str` or `None` - - """ - vhosts = self.choose_vhosts(domain) - for vhost in vhosts: - self._enable_ocsp_stapling_single(vhost, chain_path) - - def _enable_ocsp_stapling_single(self, vhost, chain_path): - """Include OCSP response in TLS handshake - - :param str vhost: vhost to enable OCSP response for - :param chain_path: chain file path - :type chain_path: `str` or `None` - - """ - if self.version < (1, 3, 7): - raise errors.PluginError("Version 1.3.7 or greater of nginx " - "is needed to enable OCSP stapling") - - if chain_path is None: - raise errors.PluginError( - "--chain-path is required to enable " - "Online Certificate Status Protocol (OCSP) stapling " - "on nginx >= 1.3.7.") - - stapling_directives = [ - ['\n ', 'ssl_trusted_certificate', ' ', chain_path], - ['\n ', 'ssl_stapling', ' ', 'on'], - ['\n ', 'ssl_stapling_verify', ' ', 'on'], ['\n']] - - try: - self.parser.add_server_directives(vhost, - stapling_directives) - except errors.MisconfigurationError as error: - logger.debug(str(error)) - raise errors.PluginError("An error occurred while enabling OCSP " - "stapling for {0}.".format(vhost.names)) - - self.save_notes += ("OCSP Stapling was enabled " - "on SSL Vhost: {0}.\n".format(vhost.filep)) - self.save_notes += "\tssl_trusted_certificate {0}\n".format(chain_path) - self.save_notes += "\tssl_stapling on\n" - self.save_notes += "\tssl_stapling_verify on\n" - - ###################################### - # Nginx server management (IInstaller) - ###################################### - def restart(self): - """Restarts nginx server. - - :raises .errors.MisconfigurationError: If either the reload fails. - - """ - nginx_restart(self.conf('ctl'), self.nginx_conf) - - def config_test(self): # pylint: disable=no-self-use - """Check the configuration of Nginx for errors. - - :raises .errors.MisconfigurationError: If config_test fails - - """ - try: - util.run_script([self.conf('ctl'), "-c", self.nginx_conf, "-t"]) - except errors.SubprocessError as err: - raise errors.MisconfigurationError(str(err)) - - def _verify_setup(self): - """Verify the setup to ensure safe operating environment. - - Make sure that files/directories are setup with appropriate permissions - Aim for defensive coding... make sure all input files - have permissions of root. - - """ - uid = os.geteuid() - util.make_or_verify_dir( - self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid) - util.make_or_verify_dir( - self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid) - util.make_or_verify_dir( - self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid) - - def get_version(self): - """Return version of Nginx Server. - - Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7)) - - :returns: version - :rtype: tuple - - :raises .PluginError: - Unable to find Nginx version or version is unsupported - - """ - try: - proc = subprocess.Popen( - [self.conf('ctl'), "-c", self.nginx_conf, "-V"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) - text = proc.communicate()[1] # nginx prints output to stderr - except (OSError, ValueError) as error: - logger.debug(str(error), exc_info=True) - raise errors.PluginError( - "Unable to run %s -V" % self.conf('ctl')) - - version_regex = re.compile(r"nginx version: ([^/]+)/([0-9\.]*)", re.IGNORECASE) - version_matches = version_regex.findall(text) - - sni_regex = re.compile(r"TLS SNI support enabled", re.IGNORECASE) - sni_matches = sni_regex.findall(text) - - ssl_regex = re.compile(r" --with-http_ssl_module") - ssl_matches = ssl_regex.findall(text) - - if not version_matches: - raise errors.PluginError("Unable to find Nginx version") - if not ssl_matches: - raise errors.PluginError( - "Nginx build is missing SSL module (--with-http_ssl_module).") - if not sni_matches: - raise errors.PluginError("Nginx build doesn't support SNI") - - product_name, product_version = version_matches[0] - if product_name != 'nginx': - logger.warning("NGINX derivative %s is not officially supported by" - " certbot", product_name) - - nginx_version = tuple([int(i) for i in product_version.split(".")]) - - # nginx < 0.8.48 uses machine hostname as default server_name instead of - # the empty string - if nginx_version < (0, 8, 48): - raise errors.NotSupportedError("Nginx version must be 0.8.48+") - - return nginx_version - - def more_info(self): - """Human-readable string to help understand the module""" - return ( - "Configures Nginx to authenticate and install HTTPS.{0}" - "Server root: {root}{0}" - "Version: {version}".format( - os.linesep, root=self.parser.config_root, - version=".".join(str(i) for i in self.version)) - ) - - ################################################### - # Wrapper functions for Reverter class (IInstaller) - ################################################### - def save(self, title=None, temporary=False): - """Saves all changes to the configuration files. - - :param str title: The title of the save. If a title is given, the - configuration will be saved as a new checkpoint and put in a - timestamped directory. - - :param bool temporary: Indicates whether the changes made will - be quickly reversed in the future (ie. challenges) - - :raises .errors.PluginError: If there was an error in - an attempt to save the configuration, or an error creating a - checkpoint - - """ - save_files = set(self.parser.parsed.keys()) - self.add_to_checkpoint(save_files, self.save_notes, temporary) - self.save_notes = "" - - # Change 'ext' to something else to not override existing conf files - self.parser.filedump(ext='') - if title and not temporary: - self.finalize_checkpoint(title) - - def recovery_routine(self): - """Revert all previously modified files. - - Reverts all modified files that have not been saved as a checkpoint - - :raises .errors.PluginError: If unable to recover the configuration - - """ - super(NginxConfigurator, self).recovery_routine() - self.new_vhost = None - self.parser.load() - - def revert_challenge_config(self): - """Used to cleanup challenge configurations. - - :raises .errors.PluginError: If unable to revert the challenge config. - - """ - self.revert_temporary_config() - self.new_vhost = None - self.parser.load() - - def rollback_checkpoints(self, rollback=1): - """Rollback saved checkpoints. - - :param int rollback: Number of checkpoints to revert - - :raises .errors.PluginError: If there is a problem with the input or - the function is unable to correctly revert the configuration - - """ - super(NginxConfigurator, self).rollback_checkpoints(rollback) - self.new_vhost = None - self.parser.load() - - ########################################################################### - # Challenges Section for IAuthenticator - ########################################################################### - def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use - """Return list of challenge preferences.""" - return [challenges.HTTP01, challenges.TLSSNI01] - - # Entry point in main.py for performing challenges - def perform(self, achalls): - """Perform the configuration related challenge. - - This function currently assumes all challenges will be fulfilled. - If this turns out not to be the case in the future. Cleanup and - outstanding challenges will have to be designed better. - - """ - self._chall_out += len(achalls) - responses = [None] * len(achalls) - sni_doer = tls_sni_01.NginxTlsSni01(self) - http_doer = http_01.NginxHttp01(self) - - for i, achall in enumerate(achalls): - # Currently also have chall_doer hold associated index of the - # challenge. This helps to put all of the responses back together - # when they are all complete. - if isinstance(achall.chall, challenges.HTTP01): - http_doer.add_chall(achall, i) - else: # tls-sni-01 - sni_doer.add_chall(achall, i) - - sni_response = sni_doer.perform() - http_response = http_doer.perform() - # Must restart in order to activate the challenges. - # Handled here because we may be able to load up other challenge types - self.restart() - - # Go through all of the challenges and assign them to the proper place - # in the responses return value. All responses must be in the same order - # as the original challenges. - for chall_response, chall_doer in ((sni_response, sni_doer), (http_response, http_doer)): - for i, resp in enumerate(chall_response): - responses[chall_doer.indices[i]] = resp - - return responses - - # called after challenges are performed - def cleanup(self, achalls): - """Revert all challenges.""" - self._chall_out -= len(achalls) - - # If all of the challenges have been finished, clean up everything - if self._chall_out <= 0: - self.revert_challenge_config() - self.restart() - - -def _test_block_from_block(block): - test_block = nginxparser.UnspacedList(block) - parser.comment_directive(test_block, 0) - return test_block[:-1] - - -def _redirect_block_for_domain(domain): - updated_domain = domain - match_symbol = '=' - if util.is_wildcard_domain(domain): - match_symbol = '~' - updated_domain = updated_domain.replace('.', r'\.') - updated_domain = updated_domain.replace('*', '[^.]+') - updated_domain = '^' + updated_domain + '$' - redirect_block = [[ - ['\n ', 'if', ' ', '($host', ' ', match_symbol, ' ', '%s)' % updated_domain, ' '], - [['\n ', 'return', ' ', '301', ' ', 'https://$host$request_uri'], - '\n ']], - ['\n']] - return redirect_block - - -def nginx_restart(nginx_ctl, nginx_conf): - """Restarts the Nginx Server. - - .. todo:: Nginx restart is fatal if the configuration references - non-existent SSL cert/key files. Remove references to /etc/letsencrypt - before restart. - - :param str nginx_ctl: Path to the Nginx binary. - - """ - try: - proc = subprocess.Popen([nginx_ctl, "-c", nginx_conf, "-s", "reload"]) - proc.communicate() - - if proc.returncode != 0: - # Maybe Nginx isn't running - # Write to temporary files instead of piping because of communication issues on Arch - # https://github.com/certbot/certbot/issues/4324 - with tempfile.TemporaryFile() as out: - with tempfile.TemporaryFile() as err: - nginx_proc = subprocess.Popen([nginx_ctl, "-c", nginx_conf], - stdout=out, stderr=err) - nginx_proc.communicate() - if nginx_proc.returncode != 0: - # Enter recovery routine... - raise errors.MisconfigurationError( - "nginx restart failed:\n%s\n%s" % (out.read(), err.read())) - - except (OSError, ValueError): - raise errors.MisconfigurationError("nginx restart failed") - # Nginx can take a moment to recognize a newly added TLS SNI servername, so sleep - # for a second. TODO: Check for expected servername and loop until it - # appears or return an error if looping too long. - time.sleep(1) - - -def install_ssl_options_conf(options_ssl, options_ssl_digest): - """Copy Certbot's SSL options file into the system's config dir if required.""" - return common.install_version_controlled_file(options_ssl, options_ssl_digest, - constants.MOD_SSL_CONF_SRC, constants.ALL_SSL_OPTIONS_HASHES) - -def _determine_default_server_root(): - if os.environ.get("CERTBOT_DOCS") == "1": - default_server_root = "%s or %s" % (constants.LINUX_SERVER_ROOT, - constants.FREEBSD_DARWIN_SERVER_ROOT) - else: - default_server_root = constants.CLI_DEFAULTS["server_root"] - return default_server_root |