Welcome to mirror list, hosted at ThFree Co, Russian Federation.

util.py « certbot_postfix « certbot-postfix - github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f06989903861853cd60952833f549f45ea13e025 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
"""Utility functions for use in the Postfix installer."""
import logging
import re
import subprocess

from certbot import errors
from certbot import util as certbot_util
from certbot.plugins import util as plugins_util
from certbot_postfix import constants

logger = logging.getLogger(__name__)

COMMAND = "postfix"

class PostfixUtilBase(object):
    """A base class for wrapping Postfix command line utilities."""

    def __init__(self, executable, config_dir=None):
        """Sets up the Postfix utility class.

        :param str executable: name or path of the Postfix utility
        :param str config_dir: path to an alternative Postfix config

        :raises .NoInstallationError: when the executable isn't found

        """
        self.executable = executable
        verify_exe_exists(executable)
        self._set_base_command(config_dir)
        self.config_dir = None

    def _set_base_command(self, config_dir):
        self._base_command = [self.executable]
        if config_dir is not None:
            self._base_command.extend(('-c', config_dir,))

    def _call(self, extra_args=None):
        """Runs the Postfix utility and returns the result.

        :param list extra_args: additional arguments for the command

        :returns: data written to stdout and stderr
        :rtype: `tuple` of `str`

        :raises subprocess.CalledProcessError: if the command fails

        """
        args = list(self._base_command)
        if extra_args is not None:
            args.extend(extra_args)
        return check_all_output(args)

    def _get_output(self, extra_args=None):
        """Runs the Postfix utility and returns only stdout output.

        This function relies on self._call for running the utility.

        :param list extra_args: additional arguments for the command

        :returns: data written to stdout
        :rtype: str

        :raises subprocess.CalledProcessError: if the command fails

        """
        return self._call(extra_args)[0]

class PostfixUtil(PostfixUtilBase):
    """Wrapper around Postfix CLI tool.
    """

    def __init__(self, config_dir=None):
        super(PostfixUtil, self).__init__(COMMAND, config_dir)

    def test(self):
        """Make sure the configuration is valid.

        :raises .MisconfigurationError: if the config is invalid
        """
        try:
            self._call(["check"])
        except subprocess.CalledProcessError as e:
            logger.debug("Could not check postfix configuration:\n%s", e)
            raise errors.MisconfigurationError(
                "Postfix failed internal configuration check.")

    def restart(self):
        """Restart or refresh the server content.

        :raises .PluginError: when server cannot be restarted

        """
        logger.info("Reloading Postfix configuration...")
        if self._is_running():
            self._reload()
        else:
            self._start()


    def _is_running(self):
        """Is Postfix currently running?

        Uses the 'postfix status' command to determine if Postfix is
        currently running using the specified configuration files.

        :returns: True if Postfix is running, otherwise, False
        :rtype: bool

        """
        try:
            self._call(["status"])
        except subprocess.CalledProcessError:
            return False
        return True

    def _start(self):
        """Instructions Postfix to start running.

        :raises .PluginError: when Postfix cannot start

        """
        try:
            self._call(["start"])
        except subprocess.CalledProcessError:
            raise errors.PluginError("Postfix failed to start")

    def _reload(self):
        """Instructs Postfix to reload its configuration.

        If Postfix isn't currently running, this method will fail.

        :raises .PluginError: when Postfix cannot reload
        """
        try:
            self._call(["reload"])
        except subprocess.CalledProcessError:
            raise errors.PluginError(
                "Postfix failed to reload its configuration")

def check_all_output(*args, **kwargs):
    """A version of subprocess.check_output that also captures stderr.

    This is the same as :func:`subprocess.check_output` except output
    written to stderr is also captured and returned to the caller. The
    return value is a tuple of two strings (rather than byte strings).
    To accomplish this, the caller cannot set the stdout, stderr, or
    universal_newlines parameters to :class:`subprocess.Popen`.

    Additionally, if the command exits with a nonzero status, output is
    not included in the raised :class:`subprocess.CalledProcessError`
    because Python 2.6 does not support this. Instead, the failure
    including the output is logged.

    :param tuple args: positional arguments for Popen
    :param dict kwargs: keyword arguments for Popen

    :returns: data written to stdout and stderr
    :rtype: `tuple` of `str`

    :raises ValueError: if arguments are invalid
    :raises subprocess.CalledProcessError: if the command fails

    """
    for keyword in ('stdout', 'stderr', 'universal_newlines',):
        if keyword in kwargs:
            raise ValueError(
                keyword + ' argument not allowed, it will be overridden.')

    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.PIPE
    kwargs['universal_newlines'] = True

    process = subprocess.Popen(*args, **kwargs)
    output, err = process.communicate()
    retcode = process.poll()
    if retcode:
        cmd = kwargs.get('args')
        if cmd is None:
            cmd = args[0]
        logger.debug(
            "'%s' exited with %d. stdout output was:\n%s\nstderr output was:\n%s",
            cmd, retcode, output, err)
        raise subprocess.CalledProcessError(retcode, cmd)
    return (output, err)


def verify_exe_exists(exe, message=None):
    """Ensures an executable with the given name is available.

    If an executable isn't found for the given path or name, extra
    directories are added to the user's PATH to help find system
    utilities that may not be available in the default cron PATH.

    :param str exe: executable path or name
    :param str message: Error message to print.

    :raises .NoInstallationError: when the executable isn't found

    """
    if message is None:
        message = "Cannot find executable '{0}'.".format(exe)
    if not (certbot_util.exe_exists(exe) or plugins_util.path_surgery(exe)):
        raise errors.NoInstallationError(message)

def report_master_overrides(name, overrides, acceptable_overrides=None):
    """If the value for a parameter `name` is overridden by other services,
    report a warning to notify the user. If `parameter` is a TLS version parameter
    (i.e., `parameter` contains 'tls_protocols' or 'tls_mandatory_protocols'), then
    `acceptable_overrides` isn't used each value in overrides is inspected for secure TLS
    versions.

    :param str name: The name of the parameter that is being overridden.
    :param list overrides: The values that other services are setting for `name`.
        Each override is a tuple: (service name, value)
    :param tuple acceptable_overrides: Override values that are acceptable. For instance, if
        another service is overriding our parameter with a more secure option, we don't have
        to warn. If this is set to None, errors are raised for *any* overrides of `name`!
    """
    error_string = ""
    for override in overrides:
        service, value = override
        # If this override is acceptable:
        if acceptable_overrides is not None and \
            is_acceptable_value(name, value, acceptable_overrides):
            continue
        error_string += "  {0}: {1}\n".format(service, value)
    if error_string:
        raise errors.PluginError("{0} is overridden with less secure options by the "
             "following services in master.cf:\n".format(name) + error_string)


def is_acceptable_value(parameter, value, acceptable=None):
    """ Returns whether the `value` for this `parameter` is acceptable,
    given a tuple of `acceptable` values. If `parameter` is a TLS version parameter
    (i.e., `parameter` contains 'tls_protocols' or 'tls_mandatory_protocols'), then
    `acceptable` isn't used and `value` is inspected for secure TLS versions.

    :param str parameter:       The name of the parameter being set.
    :param str value:           Proposed new value for parameter.
    :param tuple acceptable:    List of acceptable values for parameter.
    """
    # Check if param value is a comma-separated list of protocols.
    # Otherwise, just check whether the value is in the acceptable list.
    if 'tls_protocols' in parameter or 'tls_mandatory_protocols' in parameter:
        return _has_acceptable_tls_versions(value)
    if acceptable is not None:
        return value in acceptable
    return False


def _has_acceptable_tls_versions(parameter_string):
    """
    Checks to see if the list of TLS protocols is acceptable.
    This requires that TLSv1.2 is supported, and neither SSLv2 nor SSLv3 are supported.

    Should be a string of protocol names delimited by commas, spaces, or colons.

    Postfix's documents suggest listing protocols to exclude, like "!SSLv2, !SSLv3".
    Listing the protocols to include, like "TLSv1, TLSv1.1, TLSv1.2" is okay as well,
    though not recommended

    When these two modes are interspersed, the presence of a single non-negated protocol name
    (i.e. "TLSv1" rather than "!TLSv1") automatically excludes all other unnamed protocols.

    In addition, the presence of both a protocol name inclusion and exclusion isn't explicitly
    documented, so this method should return False if it encounters contradicting statements
    about TLSv1.2, SSLv2, or SSLv3. (for instance, "SSLv3, !SSLv3").
    """
    if not parameter_string:
        return False
    bad_versions = list(constants.TLS_VERSIONS)
    for version in constants.ACCEPTABLE_TLS_VERSIONS:
        del bad_versions[bad_versions.index(version)]
    supported_version_list = re.split("[, :]+", parameter_string)
    # The presence of any non-"!" protocol listing excludes the others by default.
    inclusion_list = False
    for version in supported_version_list:
        if not version:
            continue
        if version in bad_versions: # short-circuit if we recognize any bad version
            return False
        if version[0] != "!":
            inclusion_list = True
    if inclusion_list: # For any inclusion list, we still require TLS 1.2.
        if "TLSv1.2" not in supported_version_list or "!TLSv1.2" in supported_version_list:
            return False
    else:
        for bad_version in bad_versions:
            if "!" + bad_version not in supported_version_list:
                return False
    return True