Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/certbot/certbot.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdrien Ferrand <adferrand@users.noreply.github.com>2020-03-12 23:53:19 +0300
committerGitHub <noreply@github.com>2020-03-12 23:53:19 +0300
commit07abe7a8d68961042ee301039dd4da87306cb1a0 (patch)
tree4270fd493857fd7ae53eda4e32e217a7cad28a08
parent2fd85a4f36c37cd7dfa96f129338c2b6d95dd0d8 (diff)
Reimplement tls-alpn-01 in acme (#6886)
This PR is the first part of work described in #6724. It reintroduces the tls-alpn-01 challenge in `acme` module, that was introduced by #5894 and reverted by #6100. The reason it was removed in the past is because some tests showed that with `1.0.2` branch of OpenSSL, the self-signed certificate containing the authorization key is sent to the requester even if the ALPN protocol `acme-tls/1` was not declared as supported by the requester during the TLS handshake. However recent discussions lead to the conclusion that this behavior was not a security issue, because first it is coherent with the behavior with servers that do not support ALPN at all, and second it cannot make a tls-alpn-01 challenge be validated in this kind of corner case. On top of the original modifications given by #5894, I merged the code to be up-to-date with our `master`, and fixed tests to match recent evolution about not displaying the `keyAuthorization` in the deserialized JSON form of an ACME challenge. I also move the logic to verify if ALPN is available on the current system, and so that the tls-alpn-01 challenge can be used, to a dedicated static function `is_available` in `acme.challenge.TLSALPN01`. This function is used in the related tests to skip them, and will be used in the future from Certbot plugins to trigger or not the logic related to tls-alpn-01, depending on the OpenSSL version available to Python. * Reimplement TLS-ALPN-01 challenge and standalone TLS-ALPN server from #5894. * Setup a class method to check if tls-alpn-01 is supported. * Add potential missing parameter in validation for tls-alpn * Improve comments * Make a class private * Handle old versions of openssl that do not terminate the handshake when they should do. * Add changelog * Explicitly close the TLS connection by the book. * Remove unused exception * Fix lint
-rw-r--r--acme/acme/challenges.py168
-rw-r--r--acme/acme/crypto_util.py63
-rw-r--r--acme/acme/standalone.py56
-rw-r--r--acme/tests/challenges_test.py87
-rw-r--r--acme/tests/crypto_util_test.py16
-rw-r--r--acme/tests/standalone_test.py57
-rw-r--r--acme/tests/testdata/README6
-rw-r--r--acme/tests/testdata/rsa1024_cert.pem13
-rw-r--r--certbot/CHANGELOG.md2
9 files changed, 423 insertions, 45 deletions
diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py
index 39c8d6269..0b112be00 100644
--- a/acme/acme/challenges.py
+++ b/acme/acme/challenges.py
@@ -1,14 +1,20 @@
"""ACME Identifier Validation Challenges."""
import abc
+import codecs
import functools
import hashlib
import logging
+import socket
from cryptography.hazmat.primitives import hashes # type: ignore
import josepy as jose
import requests
import six
+from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052
+from OpenSSL import crypto
+from acme import crypto_util
+from acme import errors
from acme import fields
logger = logging.getLogger(__name__)
@@ -362,29 +368,163 @@ class HTTP01(KeyAuthorizationChallenge):
@ChallengeResponse.register
class TLSALPN01Response(KeyAuthorizationChallengeResponse):
- """ACME TLS-ALPN-01 challenge response.
+ """ACME tls-alpn-01 challenge response."""
+ typ = "tls-alpn-01"
+
+ PORT = 443
+ """Verification port as defined by the protocol.
+
+ You can override it (e.g. for testing) by passing ``port`` to
+ `simple_verify`.
- This class only allows initiating a TLS-ALPN-01 challenge returned from the
- CA. Full support for responding to TLS-ALPN-01 challenges by generating and
- serving the expected response certificate is not currently provided.
"""
- typ = "tls-alpn-01"
+ ID_PE_ACME_IDENTIFIER_V1 = b"1.3.6.1.5.5.7.1.30.1"
+ ACME_TLS_1_PROTOCOL = "acme-tls/1"
-@Challenge.register
-class TLSALPN01(KeyAuthorizationChallenge):
- """ACME tls-alpn-01 challenge.
+ @property
+ def h(self):
+ """Hash value stored in challenge certificate"""
+ return hashlib.sha256(self.key_authorization.encode('utf-8')).digest()
- This class simply allows parsing the TLS-ALPN-01 challenge returned from
- the CA. Full TLS-ALPN-01 support is not currently provided.
+ def gen_cert(self, domain, key=None, bits=2048):
+ """Generate tls-alpn-01 certificate.
- """
- typ = "tls-alpn-01"
+ :param unicode domain: Domain verified by the challenge.
+ :param OpenSSL.crypto.PKey key: Optional private key used in
+ certificate generation. If not provided (``None``), then
+ fresh key will be generated.
+ :param int bits: Number of bits for newly generated key.
+
+ :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
+
+ """
+ if key is None:
+ key = crypto.PKey()
+ key.generate_key(crypto.TYPE_RSA, bits)
+
+
+ der_value = b"DER:" + codecs.encode(self.h, 'hex')
+ acme_extension = crypto.X509Extension(self.ID_PE_ACME_IDENTIFIER_V1,
+ critical=True, value=der_value)
+
+ return crypto_util.gen_ss_cert(key, [domain], force_san=True,
+ extensions=[acme_extension]), key
+
+ def probe_cert(self, domain, host=None, port=None):
+ """Probe tls-alpn-01 challenge certificate.
+
+ :param unicode domain: domain being validated, required.
+ :param string host: IP address used to probe the certificate.
+ :param int port: Port used to probe the certificate.
+
+ """
+ if host is None:
+ host = socket.gethostbyname(domain)
+ logger.debug('%s resolved to %s', domain, host)
+ if port is None:
+ port = self.PORT
+
+ return crypto_util.probe_sni(host=host, port=port, name=domain,
+ alpn_protocols=[self.ACME_TLS_1_PROTOCOL])
+
+ def verify_cert(self, domain, cert):
+ """Verify tls-alpn-01 challenge certificate.
+
+ :param unicode domain: Domain name being validated.
+ :param OpensSSL.crypto.X509 cert: Challenge certificate.
+
+ :returns: Whether the certificate was successfully verified.
+ :rtype: bool
+
+ """
+ # pylint: disable=protected-access
+ names = crypto_util._pyopenssl_cert_or_req_all_names(cert)
+ logger.debug('Certificate %s. SANs: %s', cert.digest('sha256'), names)
+ if len(names) != 1 or names[0].lower() != domain.lower():
+ return False
+
+ for i in range(cert.get_extension_count()):
+ ext = cert.get_extension(i)
+ # FIXME: assume this is the ACME extension. Currently there is no
+ # way to get full OID of an unknown extension from pyopenssl.
+ if ext.get_short_name() == b'UNDEF':
+ data = ext.get_data()
+ return data == self.h
+
+ return False
+
+ # pylint: disable=too-many-arguments
+ def simple_verify(self, chall, domain, account_public_key,
+ cert=None, host=None, port=None):
+ """Simple verify.
+
+ Verify ``validation`` using ``account_public_key``, optionally
+ probe tls-alpn-01 certificate and check using `verify_cert`.
+
+ :param .challenges.TLSALPN01 chall: Corresponding challenge.
+ :param str domain: Domain name being validated.
+ :param JWK account_public_key:
+ :param OpenSSL.crypto.X509 cert: Optional certificate. If not
+ provided (``None``) certificate will be retrieved using
+ `probe_cert`.
+ :param string host: IP address used to probe the certificate.
+ :param int port: Port used to probe the certificate.
+
+
+ :returns: ``True`` if and only if client's control of the domain has been verified.
+ :rtype: bool
+
+ """
+ if not self.verify(chall, account_public_key):
+ logger.debug("Verification of key authorization in response failed")
+ return False
+
+ if cert is None:
+ try:
+ cert = self.probe_cert(domain=domain, host=host, port=port)
+ except errors.Error as error:
+ logger.debug(str(error), exc_info=True)
+ return False
+
+ return self.verify_cert(domain, cert)
+
+
+@Challenge.register # pylint: disable=too-many-ancestors
+class TLSALPN01(KeyAuthorizationChallenge):
+ """ACME tls-alpn-01 challenge."""
response_cls = TLSALPN01Response
+ typ = response_cls.typ
def validation(self, account_key, **kwargs):
- """Generate validation for the challenge."""
- raise NotImplementedError()
+ """Generate validation.
+
+ :param JWK account_key:
+ :param unicode domain: Domain verified by the challenge.
+ :param OpenSSL.crypto.PKey cert_key: Optional private key used
+ in certificate generation. If not provided (``None``), then
+ fresh key will be generated.
+
+ :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
+
+ """
+ return self.response(account_key).gen_cert(
+ key=kwargs.get('cert_key'),
+ domain=kwargs.get('domain'))
+
+ @staticmethod
+ def is_supported():
+ """
+ Check if TLS-ALPN-01 challenge is supported on this machine.
+ This implies that a recent version of OpenSSL is installed (>= 1.0.2),
+ or a recent cryptography version shipped with the OpenSSL library is installed.
+
+ :returns: ``True`` if TLS-ALPN-01 is supported on this machine, ``False`` otherwise.
+ :rtype: bool
+
+ """
+ return (hasattr(SSL.Connection, "set_alpn_protos")
+ and hasattr(SSL.Context, "set_alpn_select_callback"))
@Challenge.register
diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py
index dc8fedad0..f8b7e2b30 100644
--- a/acme/acme/crypto_util.py
+++ b/acme/acme/crypto_util.py
@@ -27,19 +27,41 @@ logger = logging.getLogger(__name__)
_DEFAULT_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore
-class SSLSocket(object):
+class _DefaultCertSelection(object):
+ def __init__(self, certs):
+ self.certs = certs
+
+ def __call__(self, connection):
+ server_name = connection.get_servername()
+ return self.certs.get(server_name, None)
+
+
+class SSLSocket(object): # pylint: disable=too-few-public-methods
"""SSL wrapper for sockets.
:ivar socket sock: Original wrapped socket.
:ivar dict certs: Mapping from domain names (`bytes`) to
`OpenSSL.crypto.X509`.
:ivar method: See `OpenSSL.SSL.Context` for allowed values.
+ :ivar alpn_selection: Hook to select negotiated ALPN protocol for
+ connection.
+ :ivar cert_selection: Hook to select certificate for connection. If given,
+ `certs` parameter would be ignored, and therefore must be empty.
"""
- def __init__(self, sock, certs, method=_DEFAULT_SSL_METHOD):
+ def __init__(self, sock, certs=None,
+ method=_DEFAULT_SSL_METHOD, alpn_selection=None,
+ cert_selection=None):
self.sock = sock
- self.certs = certs
+ self.alpn_selection = alpn_selection
self.method = method
+ if not cert_selection and not certs:
+ raise ValueError("Neither cert_selection or certs specified.")
+ if cert_selection and certs:
+ raise ValueError("Both cert_selection and certs specified.")
+ if cert_selection is None:
+ cert_selection = _DefaultCertSelection(certs)
+ self.cert_selection = cert_selection
def __getattr__(self, name):
return getattr(self.sock, name)
@@ -56,18 +78,19 @@ class SSLSocket(object):
:type connection: :class:`OpenSSL.Connection`
"""
- server_name = connection.get_servername()
- try:
- key, cert = self.certs[server_name]
- except KeyError:
- logger.debug("Server name (%s) not recognized, dropping SSL",
- server_name)
+ pair = self.cert_selection(connection)
+ if pair is None:
+ logger.debug("Certificate selection for server name %s failed, dropping SSL",
+ connection.get_servername())
return
+ key, cert = pair
new_context = SSL.Context(self.method)
new_context.set_options(SSL.OP_NO_SSLv2)
new_context.set_options(SSL.OP_NO_SSLv3)
new_context.use_privatekey(key)
new_context.use_certificate(cert)
+ if self.alpn_selection is not None:
+ new_context.set_alpn_select_callback(self.alpn_selection)
connection.set_context(new_context)
class FakeConnection(object):
@@ -92,6 +115,8 @@ class SSLSocket(object):
context.set_options(SSL.OP_NO_SSLv2)
context.set_options(SSL.OP_NO_SSLv3)
context.set_tlsext_servername_callback(self._pick_certificate_cb)
+ if self.alpn_selection is not None:
+ context.set_alpn_select_callback(self.alpn_selection)
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
ssl_sock.set_accept_state()
@@ -107,8 +132,9 @@ class SSLSocket(object):
return ssl_sock, addr
-def probe_sni(name, host, port=443, timeout=300,
- method=_DEFAULT_SSL_METHOD, source_address=('', 0)):
+def probe_sni(name, host, port=443, timeout=300, # pylint: disable=too-many-arguments
+ method=_DEFAULT_SSL_METHOD, source_address=('', 0),
+ alpn_protocols=None):
"""Probe SNI server for SSL certificate.
:param bytes name: Byte string to send as the server name in the
@@ -120,6 +146,8 @@ def probe_sni(name, host, port=443, timeout=300,
:param tuple source_address: Enables multi-path probing (selection
of source interface). See `socket.creation_connection` for more
info. Available only in Python 2.7+.
+ :param alpn_protocols: Protocols to request using ALPN.
+ :type alpn_protocols: `list` of `bytes`
:raises acme.errors.Error: In case of any problems.
@@ -149,6 +177,8 @@ def probe_sni(name, host, port=443, timeout=300,
client_ssl = SSL.Connection(context, client)
client_ssl.set_connect_state()
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
+ if alpn_protocols is not None:
+ client_ssl.set_alpn_protos(alpn_protocols)
try:
client_ssl.do_handshake()
client_ssl.shutdown()
@@ -239,12 +269,14 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
def gen_ss_cert(key, domains, not_before=None,
- validity=(7 * 24 * 60 * 60), force_san=True):
+ validity=(7 * 24 * 60 * 60), force_san=True, extensions=None):
"""Generate new self-signed certificate.
:type domains: `list` of `unicode`
:param OpenSSL.crypto.PKey key:
:param bool force_san:
+ :param extensions: List of additional extensions to include in the cert.
+ :type extensions: `list` of `OpenSSL.crypto.X509Extension`
If more than one domain is provided, all of the domains are put into
``subjectAltName`` X.509 extension and first domain is set as the
@@ -257,10 +289,13 @@ def gen_ss_cert(key, domains, not_before=None,
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
cert.set_version(2)
- extensions = [
+ if extensions is None:
+ extensions = []
+
+ extensions.append(
crypto.X509Extension(
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
- ]
+ )
cert.get_subject().CN = domains[0]
# TODO: what to put into cert.get_subject()?
diff --git a/acme/acme/standalone.py b/acme/acme/standalone.py
index 236f2c234..52ac07915 100644
--- a/acme/acme/standalone.py
+++ b/acme/acme/standalone.py
@@ -33,7 +33,14 @@ class TLSServer(socketserver.TCPServer):
def _wrap_sock(self):
self.socket = crypto_util.SSLSocket(
- self.socket, certs=self.certs, method=self.method)
+ self.socket, cert_selection=self._cert_selection,
+ alpn_selection=getattr(self, '_alpn_selection', None),
+ method=self.method)
+
+ def _cert_selection(self, connection): # pragma: no cover
+ """Callback selecting certificate for connection."""
+ server_name = connection.get_servername()
+ return self.certs.get(server_name, None)
def server_bind(self):
self._wrap_sock()
@@ -120,6 +127,40 @@ class BaseDualNetworkedServers(object):
self.threads = []
+class TLSALPN01Server(TLSServer, ACMEServerMixin):
+ """TLSALPN01 Server."""
+
+ ACME_TLS_1_PROTOCOL = b"acme-tls/1"
+
+ def __init__(self, server_address, certs, challenge_certs, ipv6=False):
+ TLSServer.__init__(
+ self, server_address, _BaseRequestHandlerWithLogging, certs=certs,
+ ipv6=ipv6)
+ self.challenge_certs = challenge_certs
+
+ def _cert_selection(self, connection):
+ # TODO: We would like to serve challenge cert only if asked for it via
+ # ALPN. To do this, we need to retrieve the list of protos from client
+ # hello, but this is currently impossible with openssl [0], and ALPN
+ # negotiation is done after cert selection.
+ # Therefore, currently we always return challenge cert, and terminate
+ # handshake in alpn_selection() if ALPN protos are not what we expect.
+ # [0] https://github.com/openssl/openssl/issues/4952
+ server_name = connection.get_servername()
+ logger.debug("Serving challenge cert for server name %s", server_name)
+ return self.challenge_certs.get(server_name, None)
+
+ def _alpn_selection(self, _connection, alpn_protos):
+ """Callback to select alpn protocol."""
+ if len(alpn_protos) == 1 and alpn_protos[0] == self.ACME_TLS_1_PROTOCOL:
+ logger.debug("Agreed on %s ALPN", self.ACME_TLS_1_PROTOCOL)
+ return self.ACME_TLS_1_PROTOCOL
+ logger.debug("Cannot agree on ALPN proto. Got: %s", str(alpn_protos))
+ # Explicitly close the connection now, by returning an empty string.
+ # See https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context.set_alpn_select_callback # pylint: disable=line-too-long
+ return b""
+
+
class HTTPServer(BaseHTTPServer.HTTPServer):
"""Generic HTTP Server."""
@@ -222,3 +263,16 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""
return functools.partial(
cls, simple_http_resources=simple_http_resources)
+
+
+class _BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler):
+ """BaseRequestHandler with logging."""
+
+ def log_message(self, format, *args): # pylint: disable=redefined-builtin
+ """Log arbitrary message."""
+ logger.debug("%s - - %s", self.client_address[0], format % args)
+
+ def handle(self):
+ """Handle request."""
+ self.log_message("Incoming request")
+ socketserver.BaseRequestHandler.handle(self)
diff --git a/acme/tests/challenges_test.py b/acme/tests/challenges_test.py
index adebaffc5..2b44d677d 100644
--- a/acme/tests/challenges_test.py
+++ b/acme/tests/challenges_test.py
@@ -2,10 +2,13 @@
import unittest
import josepy as jose
+import OpenSSL
import mock
import requests
from six.moves.urllib import parse as urllib_parse
+from acme import errors
+
import test_util
CERT = test_util.load_comparable_cert('cert.pem')
@@ -256,30 +259,87 @@ class HTTP01Test(unittest.TestCase):
class TLSALPN01ResponseTest(unittest.TestCase):
def setUp(self):
- from acme.challenges import TLSALPN01Response
- self.msg = TLSALPN01Response(key_authorization=u'foo')
+ from acme.challenges import TLSALPN01
+ self.chall = TLSALPN01(
+ token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
+ self.domain = u'example.com'
+ self.domain2 = u'example2.com'
+
+ self.response = self.chall.response(KEY)
self.jmsg = {
'resource': 'challenge',
'type': 'tls-alpn-01',
- 'keyAuthorization': u'foo',
+ 'keyAuthorization': self.response.key_authorization,
}
- from acme.challenges import TLSALPN01
- self.chall = TLSALPN01(token=(b'x' * 16))
- self.response = self.chall.response(KEY)
-
def test_to_partial_json(self):
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
- self.msg.to_partial_json())
+ self.response.to_partial_json())
def test_from_json(self):
from acme.challenges import TLSALPN01Response
- self.assertEqual(self.msg, TLSALPN01Response.from_json(self.jmsg))
+ self.assertEqual(self.response, TLSALPN01Response.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import TLSALPN01Response
hash(TLSALPN01Response.from_json(self.jmsg))
+ def test_gen_verify_cert(self):
+ key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
+ cert, key2 = self.response.gen_cert(self.domain, key1)
+ self.assertEqual(key1, key2)
+ self.assertTrue(self.response.verify_cert(self.domain, cert))
+
+ def test_gen_verify_cert_gen_key(self):
+ cert, key = self.response.gen_cert(self.domain)
+ self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
+ self.assertTrue(self.response.verify_cert(self.domain, cert))
+
+ def test_verify_bad_cert(self):
+ self.assertFalse(self.response.verify_cert(self.domain,
+ test_util.load_cert('cert.pem')))
+
+ def test_verify_bad_domain(self):
+ key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
+ cert, key2 = self.response.gen_cert(self.domain, key1)
+ self.assertEqual(key1, key2)
+ self.assertFalse(self.response.verify_cert(self.domain2, cert))
+
+ def test_simple_verify_bad_key_authorization(self):
+ key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
+ self.response.simple_verify(self.chall, "local", key2.public_key())
+
+ @mock.patch('acme.challenges.TLSALPN01Response.verify_cert', autospec=True)
+ def test_simple_verify(self, mock_verify_cert):
+ mock_verify_cert.return_value = mock.sentinel.verification
+ self.assertEqual(
+ mock.sentinel.verification, self.response.simple_verify(
+ self.chall, self.domain, KEY.public_key(),
+ cert=mock.sentinel.cert))
+ mock_verify_cert.assert_called_once_with(
+ self.response, self.domain, mock.sentinel.cert)
+
+ @mock.patch('acme.challenges.socket.gethostbyname')
+ @mock.patch('acme.challenges.crypto_util.probe_sni')
+ def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
+ mock_gethostbyname.return_value = '127.0.0.1'
+ self.response.probe_cert('foo.com')
+ mock_gethostbyname.assert_called_once_with('foo.com')
+ mock_probe_sni.assert_called_once_with(
+ host='127.0.0.1', port=self.response.PORT, name='foo.com',
+ alpn_protocols=['acme-tls/1'])
+
+ self.response.probe_cert('foo.com', host='8.8.8.8')
+ mock_probe_sni.assert_called_with(
+ host='8.8.8.8', port=mock.ANY, name='foo.com',
+ alpn_protocols=['acme-tls/1'])
+
+ @mock.patch('acme.challenges.TLSALPN01Response.probe_cert')
+ def test_simple_verify_false_on_probe_error(self, mock_probe_cert):
+ mock_probe_cert.side_effect = errors.Error
+ self.assertFalse(self.response.simple_verify(
+ self.chall, self.domain, KEY.public_key()))
+
class TLSALPN01Test(unittest.TestCase):
@@ -309,8 +369,13 @@ class TLSALPN01Test(unittest.TestCase):
self.assertRaises(
jose.DeserializationError, TLSALPN01.from_json, self.jmsg)
- def test_validation(self):
- self.assertRaises(NotImplementedError, self.msg.validation, KEY)
+ @mock.patch('acme.challenges.TLSALPN01Response.gen_cert')
+ def test_validation(self, mock_gen_cert):
+ mock_gen_cert.return_value = ('cert', 'key')
+ self.assertEqual(('cert', 'key'), self.msg.validation(
+ KEY, cert_key=mock.sentinel.cert_key, domain=mock.sentinel.domain))
+ mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key,
+ domain=mock.sentinel.domain)
class DNSTest(unittest.TestCase):
diff --git a/acme/tests/crypto_util_test.py b/acme/tests/crypto_util_test.py
index 41640ed60..ff08a5405 100644
--- a/acme/tests/crypto_util_test.py
+++ b/acme/tests/crypto_util_test.py
@@ -18,7 +18,6 @@ import test_util
class SSLSocketAndProbeSNITest(unittest.TestCase):
"""Tests for acme.crypto_util.SSLSocket/probe_sni."""
-
def setUp(self):
self.cert = test_util.load_comparable_cert('rsa2048_cert.pem')
key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
@@ -32,7 +31,8 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
# six.moves.* | pylint: disable=attribute-defined-outside-init,no-init
def server_bind(self): # pylint: disable=missing-docstring
- self.socket = SSLSocket(socket.socket(), certs=certs)
+ self.socket = SSLSocket(socket.socket(),
+ certs)
socketserver.TCPServer.server_bind(self)
self.server = _TestServer(('', 0), socketserver.BaseRequestHandler)
@@ -73,6 +73,18 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
socket.setdefaulttimeout(original_timeout)
+class SSLSocketTest(unittest.TestCase):
+ """Tests for acme.crypto_util.SSLSocket."""
+
+ def test_ssl_socket_invalid_arguments(self):
+ from acme.crypto_util import SSLSocket
+ with self.assertRaises(ValueError):
+ _ = SSLSocket(None, {'sni': ('key', 'cert')},
+ cert_selection=lambda _: None)
+ with self.assertRaises(ValueError):
+ _ = SSLSocket(None)
+
+
class PyOpenSSLCertOrReqAllNamesTest(unittest.TestCase):
"""Test for acme.crypto_util._pyopenssl_cert_or_req_all_names."""
diff --git a/acme/tests/standalone_test.py b/acme/tests/standalone_test.py
index 83ced12b0..e2817b29c 100644
--- a/acme/tests/standalone_test.py
+++ b/acme/tests/standalone_test.py
@@ -10,7 +10,10 @@ from six.moves import http_client # pylint: disable=import-error
from six.moves import socketserver # type: ignore # pylint: disable=import-error
from acme import challenges
+from acme import crypto_util
+from acme import errors
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
+
import test_util
@@ -84,6 +87,59 @@ class HTTP01ServerTest(unittest.TestCase):
self.assertFalse(self._test_http01(add=False))
+@unittest.skipIf(not challenges.TLSALPN01.is_supported(), "pyOpenSSL too old")
+class TLSALPN01ServerTest(unittest.TestCase):
+ """Test for acme.standalone.TLSALPN01Server."""
+
+ def setUp(self):
+ self.certs = {b'localhost': (
+ test_util.load_pyopenssl_private_key('rsa2048_key.pem'),
+ test_util.load_cert('rsa2048_cert.pem'),
+ )}
+ # Use different certificate for challenge.
+ self.challenge_certs = {b'localhost': (
+ test_util.load_pyopenssl_private_key('rsa1024_key.pem'),
+ test_util.load_cert('rsa1024_cert.pem'),
+ )}
+ from acme.standalone import TLSALPN01Server
+ self.server = TLSALPN01Server(("localhost", 0), certs=self.certs,
+ challenge_certs=self.challenge_certs)
+ # pylint: disable=no-member
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.start()
+
+ def tearDown(self):
+ self.server.shutdown() # pylint: disable=no-member
+ self.thread.join()
+
+ # TODO: This is not implemented yet, see comments in standalone.py
+ # def test_certs(self):
+ # host, port = self.server.socket.getsockname()[:2]
+ # cert = crypto_util.probe_sni(
+ # b'localhost', host=host, port=port, timeout=1)
+ # # Expect normal cert when connecting without ALPN.
+ # self.assertEqual(jose.ComparableX509(cert),
+ # jose.ComparableX509(self.certs[b'localhost'][1]))
+
+ def test_challenge_certs(self):
+ host, port = self.server.socket.getsockname()[:2]
+ cert = crypto_util.probe_sni(
+ b'localhost', host=host, port=port, timeout=1,
+ alpn_protocols=[b"acme-tls/1"])
+ # Expect challenge cert when connecting with ALPN.
+ self.assertEqual(
+ jose.ComparableX509(cert),
+ jose.ComparableX509(self.challenge_certs[b'localhost'][1])
+ )
+
+ def test_bad_alpn(self):
+ host, port = self.server.socket.getsockname()[:2]
+ with self.assertRaises(errors.Error):
+ crypto_util.probe_sni(
+ b'localhost', host=host, port=port, timeout=1,
+ alpn_protocols=[b"bad-alpn"])
+
+
class BaseDualNetworkedServersTest(unittest.TestCase):
"""Test for acme.standalone.BaseDualNetworkedServers."""
@@ -138,7 +194,6 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
class HTTP01DualNetworkedServersTest(unittest.TestCase):
"""Tests for acme.standalone.HTTP01DualNetworkedServers."""
-
def setUp(self):
self.account_key = jose.JWK.load(
test_util.load_vector('rsa1024_key.pem'))
diff --git a/acme/tests/testdata/README b/acme/tests/testdata/README
index dfe3f5405..d65cc3018 100644
--- a/acme/tests/testdata/README
+++ b/acme/tests/testdata/README
@@ -10,6 +10,8 @@ and for the CSR:
openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -outform DER > csr.der
-and for the certificate:
+and for the certificates:
- openssl req -key rsa2047_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der
+ openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der
+ openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 > rsa2048_cert.pem
+ openssl req -key rsa1024_key.pem -new -subj '/CN=example.com' -x509 > rsa1024_cert.pem
diff --git a/acme/tests/testdata/rsa1024_cert.pem b/acme/tests/testdata/rsa1024_cert.pem
new file mode 100644
index 000000000..1b7912181
--- /dev/null
+++ b/acme/tests/testdata/rsa1024_cert.pem
@@ -0,0 +1,13 @@
+-----BEGIN CERTIFICATE-----
+MIIB/TCCAWagAwIBAgIJAOyRIBs3QT8QMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV
+BAMMC2V4YW1wbGUuY29tMB4XDTE4MDQyMzEwMzE0NFoXDTE4MDUyMzEwMzE0NFow
+FjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ
+AoGBAJqJ87R8aVwByONxgQA9hwgvQd/QqI1r1UInXhEF2VnEtZGtUWLi100IpIqr
+Mq4qusDwNZ3g8cUPtSkvJGs89djoajMDIJP7lQUEKUYnYrI0q755Tr/DgLWSk7iW
+l5ezym0VzWUD0/xXUz8yRbNMTjTac80rS5SZk2ja2wWkYlRJAgMBAAGjUzBRMB0G
+A1UdDgQWBBSsaX0IVZ4XXwdeffVAbG7gnxSYjTAfBgNVHSMEGDAWgBSsaX0IVZ4X
+XwdeffVAbG7gnxSYjTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GB
+ADe7SVmvGH2nkwVfONk8TauRUDkePN1CJZKFb2zW1uO9ANJ2v5Arm/OQp0BG/xnI
+Djw/aLTNVESF89oe15dkrUErtcaF413MC1Ld5lTCaJLHLGqDKY69e02YwRuxW7jY
+qarpt7k7aR5FbcfO5r4V/FK/Gvp4Dmoky8uap7SJIW6x
+-----END CERTIFICATE-----
diff --git a/certbot/CHANGELOG.md b/certbot/CHANGELOG.md
index 2f22b5204..36547fdd1 100644
--- a/certbot/CHANGELOG.md
+++ b/certbot/CHANGELOG.md
@@ -11,6 +11,8 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
the `manual` plugin: `CERTBOT_REMAINING_CHALLENGES` is equal to the number of challenges
remaining after the current challenge, `CERTBOT_ALL_DOMAINS` is a comma-separated list
of all domains challenged for the current certificate.
+* Added TLS-ALPN-01 challenge support in the `acme` library. Support of this
+ challenge in the Certbot client is planned to be added in a future release.
### Changed