diff options
Diffstat (limited to 'acme')
-rw-r--r-- | acme/acme/challenges.py | 168 | ||||
-rw-r--r-- | acme/acme/crypto_util.py | 63 | ||||
-rw-r--r-- | acme/acme/standalone.py | 56 | ||||
-rw-r--r-- | acme/tests/challenges_test.py | 87 | ||||
-rw-r--r-- | acme/tests/crypto_util_test.py | 16 | ||||
-rw-r--r-- | acme/tests/standalone_test.py | 57 | ||||
-rw-r--r-- | acme/tests/testdata/README | 6 | ||||
-rw-r--r-- | acme/tests/testdata/rsa1024_cert.pem | 13 |
8 files changed, 421 insertions, 45 deletions
diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 39c8d6269..0b112be00 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -1,14 +1,20 @@ """ACME Identifier Validation Challenges.""" import abc +import codecs import functools import hashlib import logging +import socket from cryptography.hazmat.primitives import hashes # type: ignore import josepy as jose import requests import six +from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052 +from OpenSSL import crypto +from acme import crypto_util +from acme import errors from acme import fields logger = logging.getLogger(__name__) @@ -362,29 +368,163 @@ class HTTP01(KeyAuthorizationChallenge): @ChallengeResponse.register class TLSALPN01Response(KeyAuthorizationChallengeResponse): - """ACME TLS-ALPN-01 challenge response. + """ACME tls-alpn-01 challenge response.""" + typ = "tls-alpn-01" + + PORT = 443 + """Verification port as defined by the protocol. + + You can override it (e.g. for testing) by passing ``port`` to + `simple_verify`. - This class only allows initiating a TLS-ALPN-01 challenge returned from the - CA. Full support for responding to TLS-ALPN-01 challenges by generating and - serving the expected response certificate is not currently provided. """ - typ = "tls-alpn-01" + ID_PE_ACME_IDENTIFIER_V1 = b"1.3.6.1.5.5.7.1.30.1" + ACME_TLS_1_PROTOCOL = "acme-tls/1" -@Challenge.register -class TLSALPN01(KeyAuthorizationChallenge): - """ACME tls-alpn-01 challenge. + @property + def h(self): + """Hash value stored in challenge certificate""" + return hashlib.sha256(self.key_authorization.encode('utf-8')).digest() - This class simply allows parsing the TLS-ALPN-01 challenge returned from - the CA. Full TLS-ALPN-01 support is not currently provided. + def gen_cert(self, domain, key=None, bits=2048): + """Generate tls-alpn-01 certificate. - """ - typ = "tls-alpn-01" + :param unicode domain: Domain verified by the challenge. + :param OpenSSL.crypto.PKey key: Optional private key used in + certificate generation. If not provided (``None``), then + fresh key will be generated. + :param int bits: Number of bits for newly generated key. + + :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` + + """ + if key is None: + key = crypto.PKey() + key.generate_key(crypto.TYPE_RSA, bits) + + + der_value = b"DER:" + codecs.encode(self.h, 'hex') + acme_extension = crypto.X509Extension(self.ID_PE_ACME_IDENTIFIER_V1, + critical=True, value=der_value) + + return crypto_util.gen_ss_cert(key, [domain], force_san=True, + extensions=[acme_extension]), key + + def probe_cert(self, domain, host=None, port=None): + """Probe tls-alpn-01 challenge certificate. + + :param unicode domain: domain being validated, required. + :param string host: IP address used to probe the certificate. + :param int port: Port used to probe the certificate. + + """ + if host is None: + host = socket.gethostbyname(domain) + logger.debug('%s resolved to %s', domain, host) + if port is None: + port = self.PORT + + return crypto_util.probe_sni(host=host, port=port, name=domain, + alpn_protocols=[self.ACME_TLS_1_PROTOCOL]) + + def verify_cert(self, domain, cert): + """Verify tls-alpn-01 challenge certificate. + + :param unicode domain: Domain name being validated. + :param OpensSSL.crypto.X509 cert: Challenge certificate. + + :returns: Whether the certificate was successfully verified. + :rtype: bool + + """ + # pylint: disable=protected-access + names = crypto_util._pyopenssl_cert_or_req_all_names(cert) + logger.debug('Certificate %s. SANs: %s', cert.digest('sha256'), names) + if len(names) != 1 or names[0].lower() != domain.lower(): + return False + + for i in range(cert.get_extension_count()): + ext = cert.get_extension(i) + # FIXME: assume this is the ACME extension. Currently there is no + # way to get full OID of an unknown extension from pyopenssl. + if ext.get_short_name() == b'UNDEF': + data = ext.get_data() + return data == self.h + + return False + + # pylint: disable=too-many-arguments + def simple_verify(self, chall, domain, account_public_key, + cert=None, host=None, port=None): + """Simple verify. + + Verify ``validation`` using ``account_public_key``, optionally + probe tls-alpn-01 certificate and check using `verify_cert`. + + :param .challenges.TLSALPN01 chall: Corresponding challenge. + :param str domain: Domain name being validated. + :param JWK account_public_key: + :param OpenSSL.crypto.X509 cert: Optional certificate. If not + provided (``None``) certificate will be retrieved using + `probe_cert`. + :param string host: IP address used to probe the certificate. + :param int port: Port used to probe the certificate. + + + :returns: ``True`` if and only if client's control of the domain has been verified. + :rtype: bool + + """ + if not self.verify(chall, account_public_key): + logger.debug("Verification of key authorization in response failed") + return False + + if cert is None: + try: + cert = self.probe_cert(domain=domain, host=host, port=port) + except errors.Error as error: + logger.debug(str(error), exc_info=True) + return False + + return self.verify_cert(domain, cert) + + +@Challenge.register # pylint: disable=too-many-ancestors +class TLSALPN01(KeyAuthorizationChallenge): + """ACME tls-alpn-01 challenge.""" response_cls = TLSALPN01Response + typ = response_cls.typ def validation(self, account_key, **kwargs): - """Generate validation for the challenge.""" - raise NotImplementedError() + """Generate validation. + + :param JWK account_key: + :param unicode domain: Domain verified by the challenge. + :param OpenSSL.crypto.PKey cert_key: Optional private key used + in certificate generation. If not provided (``None``), then + fresh key will be generated. + + :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` + + """ + return self.response(account_key).gen_cert( + key=kwargs.get('cert_key'), + domain=kwargs.get('domain')) + + @staticmethod + def is_supported(): + """ + Check if TLS-ALPN-01 challenge is supported on this machine. + This implies that a recent version of OpenSSL is installed (>= 1.0.2), + or a recent cryptography version shipped with the OpenSSL library is installed. + + :returns: ``True`` if TLS-ALPN-01 is supported on this machine, ``False`` otherwise. + :rtype: bool + + """ + return (hasattr(SSL.Connection, "set_alpn_protos") + and hasattr(SSL.Context, "set_alpn_select_callback")) @Challenge.register diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index dc8fedad0..f8b7e2b30 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -27,19 +27,41 @@ logger = logging.getLogger(__name__) _DEFAULT_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore -class SSLSocket(object): +class _DefaultCertSelection(object): + def __init__(self, certs): + self.certs = certs + + def __call__(self, connection): + server_name = connection.get_servername() + return self.certs.get(server_name, None) + + +class SSLSocket(object): # pylint: disable=too-few-public-methods """SSL wrapper for sockets. :ivar socket sock: Original wrapped socket. :ivar dict certs: Mapping from domain names (`bytes`) to `OpenSSL.crypto.X509`. :ivar method: See `OpenSSL.SSL.Context` for allowed values. + :ivar alpn_selection: Hook to select negotiated ALPN protocol for + connection. + :ivar cert_selection: Hook to select certificate for connection. If given, + `certs` parameter would be ignored, and therefore must be empty. """ - def __init__(self, sock, certs, method=_DEFAULT_SSL_METHOD): + def __init__(self, sock, certs=None, + method=_DEFAULT_SSL_METHOD, alpn_selection=None, + cert_selection=None): self.sock = sock - self.certs = certs + self.alpn_selection = alpn_selection self.method = method + if not cert_selection and not certs: + raise ValueError("Neither cert_selection or certs specified.") + if cert_selection and certs: + raise ValueError("Both cert_selection and certs specified.") + if cert_selection is None: + cert_selection = _DefaultCertSelection(certs) + self.cert_selection = cert_selection def __getattr__(self, name): return getattr(self.sock, name) @@ -56,18 +78,19 @@ class SSLSocket(object): :type connection: :class:`OpenSSL.Connection` """ - server_name = connection.get_servername() - try: - key, cert = self.certs[server_name] - except KeyError: - logger.debug("Server name (%s) not recognized, dropping SSL", - server_name) + pair = self.cert_selection(connection) + if pair is None: + logger.debug("Certificate selection for server name %s failed, dropping SSL", + connection.get_servername()) return + key, cert = pair new_context = SSL.Context(self.method) new_context.set_options(SSL.OP_NO_SSLv2) new_context.set_options(SSL.OP_NO_SSLv3) new_context.use_privatekey(key) new_context.use_certificate(cert) + if self.alpn_selection is not None: + new_context.set_alpn_select_callback(self.alpn_selection) connection.set_context(new_context) class FakeConnection(object): @@ -92,6 +115,8 @@ class SSLSocket(object): context.set_options(SSL.OP_NO_SSLv2) context.set_options(SSL.OP_NO_SSLv3) context.set_tlsext_servername_callback(self._pick_certificate_cb) + if self.alpn_selection is not None: + context.set_alpn_select_callback(self.alpn_selection) ssl_sock = self.FakeConnection(SSL.Connection(context, sock)) ssl_sock.set_accept_state() @@ -107,8 +132,9 @@ class SSLSocket(object): return ssl_sock, addr -def probe_sni(name, host, port=443, timeout=300, - method=_DEFAULT_SSL_METHOD, source_address=('', 0)): +def probe_sni(name, host, port=443, timeout=300, # pylint: disable=too-many-arguments + method=_DEFAULT_SSL_METHOD, source_address=('', 0), + alpn_protocols=None): """Probe SNI server for SSL certificate. :param bytes name: Byte string to send as the server name in the @@ -120,6 +146,8 @@ def probe_sni(name, host, port=443, timeout=300, :param tuple source_address: Enables multi-path probing (selection of source interface). See `socket.creation_connection` for more info. Available only in Python 2.7+. + :param alpn_protocols: Protocols to request using ALPN. + :type alpn_protocols: `list` of `bytes` :raises acme.errors.Error: In case of any problems. @@ -149,6 +177,8 @@ def probe_sni(name, host, port=443, timeout=300, client_ssl = SSL.Connection(context, client) client_ssl.set_connect_state() client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13 + if alpn_protocols is not None: + client_ssl.set_alpn_protos(alpn_protocols) try: client_ssl.do_handshake() client_ssl.shutdown() @@ -239,12 +269,14 @@ def _pyopenssl_cert_or_req_san(cert_or_req): def gen_ss_cert(key, domains, not_before=None, - validity=(7 * 24 * 60 * 60), force_san=True): + validity=(7 * 24 * 60 * 60), force_san=True, extensions=None): """Generate new self-signed certificate. :type domains: `list` of `unicode` :param OpenSSL.crypto.PKey key: :param bool force_san: + :param extensions: List of additional extensions to include in the cert. + :type extensions: `list` of `OpenSSL.crypto.X509Extension` If more than one domain is provided, all of the domains are put into ``subjectAltName`` X.509 extension and first domain is set as the @@ -257,10 +289,13 @@ def gen_ss_cert(key, domains, not_before=None, cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16)) cert.set_version(2) - extensions = [ + if extensions is None: + extensions = [] + + extensions.append( crypto.X509Extension( b"basicConstraints", True, b"CA:TRUE, pathlen:0"), - ] + ) cert.get_subject().CN = domains[0] # TODO: what to put into cert.get_subject()? diff --git a/acme/acme/standalone.py b/acme/acme/standalone.py index 236f2c234..52ac07915 100644 --- a/acme/acme/standalone.py +++ b/acme/acme/standalone.py @@ -33,7 +33,14 @@ class TLSServer(socketserver.TCPServer): def _wrap_sock(self): self.socket = crypto_util.SSLSocket( - self.socket, certs=self.certs, method=self.method) + self.socket, cert_selection=self._cert_selection, + alpn_selection=getattr(self, '_alpn_selection', None), + method=self.method) + + def _cert_selection(self, connection): # pragma: no cover + """Callback selecting certificate for connection.""" + server_name = connection.get_servername() + return self.certs.get(server_name, None) def server_bind(self): self._wrap_sock() @@ -120,6 +127,40 @@ class BaseDualNetworkedServers(object): self.threads = [] +class TLSALPN01Server(TLSServer, ACMEServerMixin): + """TLSALPN01 Server.""" + + ACME_TLS_1_PROTOCOL = b"acme-tls/1" + + def __init__(self, server_address, certs, challenge_certs, ipv6=False): + TLSServer.__init__( + self, server_address, _BaseRequestHandlerWithLogging, certs=certs, + ipv6=ipv6) + self.challenge_certs = challenge_certs + + def _cert_selection(self, connection): + # TODO: We would like to serve challenge cert only if asked for it via + # ALPN. To do this, we need to retrieve the list of protos from client + # hello, but this is currently impossible with openssl [0], and ALPN + # negotiation is done after cert selection. + # Therefore, currently we always return challenge cert, and terminate + # handshake in alpn_selection() if ALPN protos are not what we expect. + # [0] https://github.com/openssl/openssl/issues/4952 + server_name = connection.get_servername() + logger.debug("Serving challenge cert for server name %s", server_name) + return self.challenge_certs.get(server_name, None) + + def _alpn_selection(self, _connection, alpn_protos): + """Callback to select alpn protocol.""" + if len(alpn_protos) == 1 and alpn_protos[0] == self.ACME_TLS_1_PROTOCOL: + logger.debug("Agreed on %s ALPN", self.ACME_TLS_1_PROTOCOL) + return self.ACME_TLS_1_PROTOCOL + logger.debug("Cannot agree on ALPN proto. Got: %s", str(alpn_protos)) + # Explicitly close the connection now, by returning an empty string. + # See https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context.set_alpn_select_callback # pylint: disable=line-too-long + return b"" + + class HTTPServer(BaseHTTPServer.HTTPServer): """Generic HTTP Server.""" @@ -222,3 +263,16 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): """ return functools.partial( cls, simple_http_resources=simple_http_resources) + + +class _BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler): + """BaseRequestHandler with logging.""" + + def log_message(self, format, *args): # pylint: disable=redefined-builtin + """Log arbitrary message.""" + logger.debug("%s - - %s", self.client_address[0], format % args) + + def handle(self): + """Handle request.""" + self.log_message("Incoming request") + socketserver.BaseRequestHandler.handle(self) diff --git a/acme/tests/challenges_test.py b/acme/tests/challenges_test.py index adebaffc5..2b44d677d 100644 --- a/acme/tests/challenges_test.py +++ b/acme/tests/challenges_test.py @@ -2,10 +2,13 @@ import unittest import josepy as jose +import OpenSSL import mock import requests from six.moves.urllib import parse as urllib_parse +from acme import errors + import test_util CERT = test_util.load_comparable_cert('cert.pem') @@ -256,30 +259,87 @@ class HTTP01Test(unittest.TestCase): class TLSALPN01ResponseTest(unittest.TestCase): def setUp(self): - from acme.challenges import TLSALPN01Response - self.msg = TLSALPN01Response(key_authorization=u'foo') + from acme.challenges import TLSALPN01 + self.chall = TLSALPN01( + token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e')) + self.domain = u'example.com' + self.domain2 = u'example2.com' + + self.response = self.chall.response(KEY) self.jmsg = { 'resource': 'challenge', 'type': 'tls-alpn-01', - 'keyAuthorization': u'foo', + 'keyAuthorization': self.response.key_authorization, } - from acme.challenges import TLSALPN01 - self.chall = TLSALPN01(token=(b'x' * 16)) - self.response = self.chall.response(KEY) - def test_to_partial_json(self): self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'}, - self.msg.to_partial_json()) + self.response.to_partial_json()) def test_from_json(self): from acme.challenges import TLSALPN01Response - self.assertEqual(self.msg, TLSALPN01Response.from_json(self.jmsg)) + self.assertEqual(self.response, TLSALPN01Response.from_json(self.jmsg)) def test_from_json_hashable(self): from acme.challenges import TLSALPN01Response hash(TLSALPN01Response.from_json(self.jmsg)) + def test_gen_verify_cert(self): + key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem') + cert, key2 = self.response.gen_cert(self.domain, key1) + self.assertEqual(key1, key2) + self.assertTrue(self.response.verify_cert(self.domain, cert)) + + def test_gen_verify_cert_gen_key(self): + cert, key = self.response.gen_cert(self.domain) + self.assertTrue(isinstance(key, OpenSSL.crypto.PKey)) + self.assertTrue(self.response.verify_cert(self.domain, cert)) + + def test_verify_bad_cert(self): + self.assertFalse(self.response.verify_cert(self.domain, + test_util.load_cert('cert.pem'))) + + def test_verify_bad_domain(self): + key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem') + cert, key2 = self.response.gen_cert(self.domain, key1) + self.assertEqual(key1, key2) + self.assertFalse(self.response.verify_cert(self.domain2, cert)) + + def test_simple_verify_bad_key_authorization(self): + key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem')) + self.response.simple_verify(self.chall, "local", key2.public_key()) + + @mock.patch('acme.challenges.TLSALPN01Response.verify_cert', autospec=True) + def test_simple_verify(self, mock_verify_cert): + mock_verify_cert.return_value = mock.sentinel.verification + self.assertEqual( + mock.sentinel.verification, self.response.simple_verify( + self.chall, self.domain, KEY.public_key(), + cert=mock.sentinel.cert)) + mock_verify_cert.assert_called_once_with( + self.response, self.domain, mock.sentinel.cert) + + @mock.patch('acme.challenges.socket.gethostbyname') + @mock.patch('acme.challenges.crypto_util.probe_sni') + def test_probe_cert(self, mock_probe_sni, mock_gethostbyname): + mock_gethostbyname.return_value = '127.0.0.1' + self.response.probe_cert('foo.com') + mock_gethostbyname.assert_called_once_with('foo.com') + mock_probe_sni.assert_called_once_with( + host='127.0.0.1', port=self.response.PORT, name='foo.com', + alpn_protocols=['acme-tls/1']) + + self.response.probe_cert('foo.com', host='8.8.8.8') + mock_probe_sni.assert_called_with( + host='8.8.8.8', port=mock.ANY, name='foo.com', + alpn_protocols=['acme-tls/1']) + + @mock.patch('acme.challenges.TLSALPN01Response.probe_cert') + def test_simple_verify_false_on_probe_error(self, mock_probe_cert): + mock_probe_cert.side_effect = errors.Error + self.assertFalse(self.response.simple_verify( + self.chall, self.domain, KEY.public_key())) + class TLSALPN01Test(unittest.TestCase): @@ -309,8 +369,13 @@ class TLSALPN01Test(unittest.TestCase): self.assertRaises( jose.DeserializationError, TLSALPN01.from_json, self.jmsg) - def test_validation(self): - self.assertRaises(NotImplementedError, self.msg.validation, KEY) + @mock.patch('acme.challenges.TLSALPN01Response.gen_cert') + def test_validation(self, mock_gen_cert): + mock_gen_cert.return_value = ('cert', 'key') + self.assertEqual(('cert', 'key'), self.msg.validation( + KEY, cert_key=mock.sentinel.cert_key, domain=mock.sentinel.domain)) + mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key, + domain=mock.sentinel.domain) class DNSTest(unittest.TestCase): diff --git a/acme/tests/crypto_util_test.py b/acme/tests/crypto_util_test.py index 41640ed60..ff08a5405 100644 --- a/acme/tests/crypto_util_test.py +++ b/acme/tests/crypto_util_test.py @@ -18,7 +18,6 @@ import test_util class SSLSocketAndProbeSNITest(unittest.TestCase): """Tests for acme.crypto_util.SSLSocket/probe_sni.""" - def setUp(self): self.cert = test_util.load_comparable_cert('rsa2048_cert.pem') key = test_util.load_pyopenssl_private_key('rsa2048_key.pem') @@ -32,7 +31,8 @@ class SSLSocketAndProbeSNITest(unittest.TestCase): # six.moves.* | pylint: disable=attribute-defined-outside-init,no-init def server_bind(self): # pylint: disable=missing-docstring - self.socket = SSLSocket(socket.socket(), certs=certs) + self.socket = SSLSocket(socket.socket(), + certs) socketserver.TCPServer.server_bind(self) self.server = _TestServer(('', 0), socketserver.BaseRequestHandler) @@ -73,6 +73,18 @@ class SSLSocketAndProbeSNITest(unittest.TestCase): socket.setdefaulttimeout(original_timeout) +class SSLSocketTest(unittest.TestCase): + """Tests for acme.crypto_util.SSLSocket.""" + + def test_ssl_socket_invalid_arguments(self): + from acme.crypto_util import SSLSocket + with self.assertRaises(ValueError): + _ = SSLSocket(None, {'sni': ('key', 'cert')}, + cert_selection=lambda _: None) + with self.assertRaises(ValueError): + _ = SSLSocket(None) + + class PyOpenSSLCertOrReqAllNamesTest(unittest.TestCase): """Test for acme.crypto_util._pyopenssl_cert_or_req_all_names.""" diff --git a/acme/tests/standalone_test.py b/acme/tests/standalone_test.py index 83ced12b0..e2817b29c 100644 --- a/acme/tests/standalone_test.py +++ b/acme/tests/standalone_test.py @@ -10,7 +10,10 @@ from six.moves import http_client # pylint: disable=import-error from six.moves import socketserver # type: ignore # pylint: disable=import-error from acme import challenges +from acme import crypto_util +from acme import errors from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module + import test_util @@ -84,6 +87,59 @@ class HTTP01ServerTest(unittest.TestCase): self.assertFalse(self._test_http01(add=False)) +@unittest.skipIf(not challenges.TLSALPN01.is_supported(), "pyOpenSSL too old") +class TLSALPN01ServerTest(unittest.TestCase): + """Test for acme.standalone.TLSALPN01Server.""" + + def setUp(self): + self.certs = {b'localhost': ( + test_util.load_pyopenssl_private_key('rsa2048_key.pem'), + test_util.load_cert('rsa2048_cert.pem'), + )} + # Use different certificate for challenge. + self.challenge_certs = {b'localhost': ( + test_util.load_pyopenssl_private_key('rsa1024_key.pem'), + test_util.load_cert('rsa1024_cert.pem'), + )} + from acme.standalone import TLSALPN01Server + self.server = TLSALPN01Server(("localhost", 0), certs=self.certs, + challenge_certs=self.challenge_certs) + # pylint: disable=no-member + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + def tearDown(self): + self.server.shutdown() # pylint: disable=no-member + self.thread.join() + + # TODO: This is not implemented yet, see comments in standalone.py + # def test_certs(self): + # host, port = self.server.socket.getsockname()[:2] + # cert = crypto_util.probe_sni( + # b'localhost', host=host, port=port, timeout=1) + # # Expect normal cert when connecting without ALPN. + # self.assertEqual(jose.ComparableX509(cert), + # jose.ComparableX509(self.certs[b'localhost'][1])) + + def test_challenge_certs(self): + host, port = self.server.socket.getsockname()[:2] + cert = crypto_util.probe_sni( + b'localhost', host=host, port=port, timeout=1, + alpn_protocols=[b"acme-tls/1"]) + # Expect challenge cert when connecting with ALPN. + self.assertEqual( + jose.ComparableX509(cert), + jose.ComparableX509(self.challenge_certs[b'localhost'][1]) + ) + + def test_bad_alpn(self): + host, port = self.server.socket.getsockname()[:2] + with self.assertRaises(errors.Error): + crypto_util.probe_sni( + b'localhost', host=host, port=port, timeout=1, + alpn_protocols=[b"bad-alpn"]) + + class BaseDualNetworkedServersTest(unittest.TestCase): """Test for acme.standalone.BaseDualNetworkedServers.""" @@ -138,7 +194,6 @@ class BaseDualNetworkedServersTest(unittest.TestCase): class HTTP01DualNetworkedServersTest(unittest.TestCase): """Tests for acme.standalone.HTTP01DualNetworkedServers.""" - def setUp(self): self.account_key = jose.JWK.load( test_util.load_vector('rsa1024_key.pem')) diff --git a/acme/tests/testdata/README b/acme/tests/testdata/README index dfe3f5405..d65cc3018 100644 --- a/acme/tests/testdata/README +++ b/acme/tests/testdata/README @@ -10,6 +10,8 @@ and for the CSR: openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -outform DER > csr.der -and for the certificate: +and for the certificates: - openssl req -key rsa2047_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der + openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der + openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 > rsa2048_cert.pem + openssl req -key rsa1024_key.pem -new -subj '/CN=example.com' -x509 > rsa1024_cert.pem diff --git a/acme/tests/testdata/rsa1024_cert.pem b/acme/tests/testdata/rsa1024_cert.pem new file mode 100644 index 000000000..1b7912181 --- /dev/null +++ b/acme/tests/testdata/rsa1024_cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIIB/TCCAWagAwIBAgIJAOyRIBs3QT8QMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV +BAMMC2V4YW1wbGUuY29tMB4XDTE4MDQyMzEwMzE0NFoXDTE4MDUyMzEwMzE0NFow +FjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ +AoGBAJqJ87R8aVwByONxgQA9hwgvQd/QqI1r1UInXhEF2VnEtZGtUWLi100IpIqr +Mq4qusDwNZ3g8cUPtSkvJGs89djoajMDIJP7lQUEKUYnYrI0q755Tr/DgLWSk7iW +l5ezym0VzWUD0/xXUz8yRbNMTjTac80rS5SZk2ja2wWkYlRJAgMBAAGjUzBRMB0G +A1UdDgQWBBSsaX0IVZ4XXwdeffVAbG7gnxSYjTAfBgNVHSMEGDAWgBSsaX0IVZ4X +XwdeffVAbG7gnxSYjTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GB +ADe7SVmvGH2nkwVfONk8TauRUDkePN1CJZKFb2zW1uO9ANJ2v5Arm/OQp0BG/xnI +Djw/aLTNVESF89oe15dkrUErtcaF413MC1Ld5lTCaJLHLGqDKY69e02YwRuxW7jY +qarpt7k7aR5FbcfO5r4V/FK/Gvp4Dmoky8uap7SJIW6x +-----END CERTIFICATE----- |