|
|
""" |
|
|
TLS with SNI_-support for Python 2. Follow these instructions if you would |
|
|
like to verify TLS certificates in Python 2. Note, the default libraries do |
|
|
*not* do certificate checking; you need to do additional work to validate |
|
|
certificates yourself. |
|
|
|
|
|
This needs the following packages installed: |
|
|
|
|
|
* `pyOpenSSL`_ (tested with 16.0.0) |
|
|
* `cryptography`_ (minimum 1.3.4, from pyopenssl) |
|
|
* `idna`_ (minimum 2.0, from cryptography) |
|
|
|
|
|
However, pyopenssl depends on cryptography, which depends on idna, so while we |
|
|
use all three directly here we end up having relatively few packages required. |
|
|
|
|
|
You can install them with the following command: |
|
|
|
|
|
.. code-block:: bash |
|
|
|
|
|
$ python -m pip install pyopenssl cryptography idna |
|
|
|
|
|
To activate certificate checking, call |
|
|
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code |
|
|
before you begin making HTTP requests. This can be done in a ``sitecustomize`` |
|
|
module, or at any other time before your application begins using ``urllib3``, |
|
|
like this: |
|
|
|
|
|
.. code-block:: python |
|
|
|
|
|
try: |
|
|
import pip._vendor.urllib3.contrib.pyopenssl as pyopenssl |
|
|
pyopenssl.inject_into_urllib3() |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
Now you can use :mod:`urllib3` as you normally would, and it will support SNI |
|
|
when the required modules are installed. |
|
|
|
|
|
Activating this module also has the positive side effect of disabling SSL/TLS |
|
|
compression in Python 2 (see `CRIME attack`_). |
|
|
|
|
|
.. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication |
|
|
.. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit) |
|
|
.. _pyopenssl: https://www.pyopenssl.org |
|
|
.. _cryptography: https://cryptography.io |
|
|
.. _idna: https://github.com/kjd/idna |
|
|
""" |
|
|
from __future__ import absolute_import |
|
|
|
|
|
import OpenSSL.SSL |
|
|
from cryptography import x509 |
|
|
from cryptography.hazmat.backends.openssl import backend as openssl_backend |
|
|
from cryptography.hazmat.backends.openssl.x509 import _Certificate |
|
|
|
|
|
try: |
|
|
from cryptography.x509 import UnsupportedExtension |
|
|
except ImportError: |
|
|
|
|
|
class UnsupportedExtension(Exception): |
|
|
pass |
|
|
|
|
|
|
|
|
from io import BytesIO |
|
|
from socket import error as SocketError |
|
|
from socket import timeout |
|
|
|
|
|
try: |
|
|
from socket import _fileobject |
|
|
except ImportError: |
|
|
_fileobject = None |
|
|
from ..packages.backports.makefile import backport_makefile |
|
|
|
|
|
import logging |
|
|
import ssl |
|
|
import sys |
|
|
|
|
|
from .. import util |
|
|
from ..packages import six |
|
|
from ..util.ssl_ import PROTOCOL_TLS_CLIENT |
|
|
|
|
|
__all__ = ["inject_into_urllib3", "extract_from_urllib3"] |
|
|
|
|
|
|
|
|
HAS_SNI = True |
|
|
|
|
|
|
|
|
_openssl_versions = { |
|
|
util.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, |
|
|
PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, |
|
|
ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, |
|
|
} |
|
|
|
|
|
if hasattr(ssl, "PROTOCOL_SSLv3") and hasattr(OpenSSL.SSL, "SSLv3_METHOD"): |
|
|
_openssl_versions[ssl.PROTOCOL_SSLv3] = OpenSSL.SSL.SSLv3_METHOD |
|
|
|
|
|
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"): |
|
|
_openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD |
|
|
|
|
|
if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"): |
|
|
_openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD |
|
|
|
|
|
|
|
|
_stdlib_to_openssl_verify = { |
|
|
ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, |
|
|
ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, |
|
|
ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER |
|
|
+ OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, |
|
|
} |
|
|
_openssl_to_stdlib_verify = dict((v, k) for k, v in _stdlib_to_openssl_verify.items()) |
|
|
|
|
|
|
|
|
SSL_WRITE_BLOCKSIZE = 16384 |
|
|
|
|
|
orig_util_HAS_SNI = util.HAS_SNI |
|
|
orig_util_SSLContext = util.ssl_.SSLContext |
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def inject_into_urllib3(): |
|
|
"Monkey-patch urllib3 with PyOpenSSL-backed SSL-support." |
|
|
|
|
|
_validate_dependencies_met() |
|
|
|
|
|
util.SSLContext = PyOpenSSLContext |
|
|
util.ssl_.SSLContext = PyOpenSSLContext |
|
|
util.HAS_SNI = HAS_SNI |
|
|
util.ssl_.HAS_SNI = HAS_SNI |
|
|
util.IS_PYOPENSSL = True |
|
|
util.ssl_.IS_PYOPENSSL = True |
|
|
|
|
|
|
|
|
def extract_from_urllib3(): |
|
|
"Undo monkey-patching by :func:`inject_into_urllib3`." |
|
|
|
|
|
util.SSLContext = orig_util_SSLContext |
|
|
util.ssl_.SSLContext = orig_util_SSLContext |
|
|
util.HAS_SNI = orig_util_HAS_SNI |
|
|
util.ssl_.HAS_SNI = orig_util_HAS_SNI |
|
|
util.IS_PYOPENSSL = False |
|
|
util.ssl_.IS_PYOPENSSL = False |
|
|
|
|
|
|
|
|
def _validate_dependencies_met(): |
|
|
""" |
|
|
Verifies that PyOpenSSL's package-level dependencies have been met. |
|
|
Throws `ImportError` if they are not met. |
|
|
""" |
|
|
|
|
|
from cryptography.x509.extensions import Extensions |
|
|
|
|
|
if getattr(Extensions, "get_extension_for_class", None) is None: |
|
|
raise ImportError( |
|
|
"'cryptography' module missing required functionality. " |
|
|
"Try upgrading to v1.3.4 or newer." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
from OpenSSL.crypto import X509 |
|
|
|
|
|
x509 = X509() |
|
|
if getattr(x509, "_x509", None) is None: |
|
|
raise ImportError( |
|
|
"'pyOpenSSL' module missing required functionality. " |
|
|
"Try upgrading to v0.14 or newer." |
|
|
) |
|
|
|
|
|
|
|
|
def _dnsname_to_stdlib(name): |
|
|
""" |
|
|
Converts a dNSName SubjectAlternativeName field to the form used by the |
|
|
standard library on the given Python version. |
|
|
|
|
|
Cryptography produces a dNSName as a unicode string that was idna-decoded |
|
|
from ASCII bytes. We need to idna-encode that string to get it back, and |
|
|
then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib |
|
|
uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). |
|
|
|
|
|
If the name cannot be idna-encoded then we return None signalling that |
|
|
the name given should be skipped. |
|
|
""" |
|
|
|
|
|
def idna_encode(name): |
|
|
""" |
|
|
Borrowed wholesale from the Python Cryptography Project. It turns out |
|
|
that we can't just safely call `idna.encode`: it can explode for |
|
|
wildcard names. This avoids that problem. |
|
|
""" |
|
|
from pip._vendor import idna |
|
|
|
|
|
try: |
|
|
for prefix in [u"*.", u"."]: |
|
|
if name.startswith(prefix): |
|
|
name = name[len(prefix) :] |
|
|
return prefix.encode("ascii") + idna.encode(name) |
|
|
return idna.encode(name) |
|
|
except idna.core.IDNAError: |
|
|
return None |
|
|
|
|
|
|
|
|
if ":" in name: |
|
|
return name |
|
|
|
|
|
name = idna_encode(name) |
|
|
if name is None: |
|
|
return None |
|
|
elif sys.version_info >= (3, 0): |
|
|
name = name.decode("utf-8") |
|
|
return name |
|
|
|
|
|
|
|
|
def get_subj_alt_name(peer_cert): |
|
|
""" |
|
|
Given an PyOpenSSL certificate, provides all the subject alternative names. |
|
|
""" |
|
|
|
|
|
if hasattr(peer_cert, "to_cryptography"): |
|
|
cert = peer_cert.to_cryptography() |
|
|
else: |
|
|
|
|
|
|
|
|
cert = _Certificate(openssl_backend, peer_cert._x509) |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value |
|
|
except x509.ExtensionNotFound: |
|
|
|
|
|
return [] |
|
|
except ( |
|
|
x509.DuplicateExtension, |
|
|
UnsupportedExtension, |
|
|
x509.UnsupportedGeneralNameType, |
|
|
UnicodeError, |
|
|
) as e: |
|
|
|
|
|
|
|
|
log.warning( |
|
|
"A problem was encountered with the certificate that prevented " |
|
|
"urllib3 from finding the SubjectAlternativeName field. This can " |
|
|
"affect certificate validation. The error was %s", |
|
|
e, |
|
|
) |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
names = [ |
|
|
("DNS", name) |
|
|
for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName)) |
|
|
if name is not None |
|
|
] |
|
|
names.extend( |
|
|
("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress) |
|
|
) |
|
|
|
|
|
return names |
|
|
|
|
|
|
|
|
class WrappedSocket(object): |
|
|
"""API-compatibility wrapper for Python OpenSSL's Connection-class. |
|
|
|
|
|
Note: _makefile_refs, _drop() and _reuse() are needed for the garbage |
|
|
collector of pypy. |
|
|
""" |
|
|
|
|
|
def __init__(self, connection, socket, suppress_ragged_eofs=True): |
|
|
self.connection = connection |
|
|
self.socket = socket |
|
|
self.suppress_ragged_eofs = suppress_ragged_eofs |
|
|
self._makefile_refs = 0 |
|
|
self._closed = False |
|
|
|
|
|
def fileno(self): |
|
|
return self.socket.fileno() |
|
|
|
|
|
|
|
|
def _decref_socketios(self): |
|
|
if self._makefile_refs > 0: |
|
|
self._makefile_refs -= 1 |
|
|
if self._closed: |
|
|
self.close() |
|
|
|
|
|
def recv(self, *args, **kwargs): |
|
|
try: |
|
|
data = self.connection.recv(*args, **kwargs) |
|
|
except OpenSSL.SSL.SysCallError as e: |
|
|
if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): |
|
|
return b"" |
|
|
else: |
|
|
raise SocketError(str(e)) |
|
|
except OpenSSL.SSL.ZeroReturnError: |
|
|
if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: |
|
|
return b"" |
|
|
else: |
|
|
raise |
|
|
except OpenSSL.SSL.WantReadError: |
|
|
if not util.wait_for_read(self.socket, self.socket.gettimeout()): |
|
|
raise timeout("The read operation timed out") |
|
|
else: |
|
|
return self.recv(*args, **kwargs) |
|
|
|
|
|
|
|
|
except OpenSSL.SSL.Error as e: |
|
|
raise ssl.SSLError("read error: %r" % e) |
|
|
else: |
|
|
return data |
|
|
|
|
|
def recv_into(self, *args, **kwargs): |
|
|
try: |
|
|
return self.connection.recv_into(*args, **kwargs) |
|
|
except OpenSSL.SSL.SysCallError as e: |
|
|
if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): |
|
|
return 0 |
|
|
else: |
|
|
raise SocketError(str(e)) |
|
|
except OpenSSL.SSL.ZeroReturnError: |
|
|
if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: |
|
|
return 0 |
|
|
else: |
|
|
raise |
|
|
except OpenSSL.SSL.WantReadError: |
|
|
if not util.wait_for_read(self.socket, self.socket.gettimeout()): |
|
|
raise timeout("The read operation timed out") |
|
|
else: |
|
|
return self.recv_into(*args, **kwargs) |
|
|
|
|
|
|
|
|
except OpenSSL.SSL.Error as e: |
|
|
raise ssl.SSLError("read error: %r" % e) |
|
|
|
|
|
def settimeout(self, timeout): |
|
|
return self.socket.settimeout(timeout) |
|
|
|
|
|
def _send_until_done(self, data): |
|
|
while True: |
|
|
try: |
|
|
return self.connection.send(data) |
|
|
except OpenSSL.SSL.WantWriteError: |
|
|
if not util.wait_for_write(self.socket, self.socket.gettimeout()): |
|
|
raise timeout() |
|
|
continue |
|
|
except OpenSSL.SSL.SysCallError as e: |
|
|
raise SocketError(str(e)) |
|
|
|
|
|
def sendall(self, data): |
|
|
total_sent = 0 |
|
|
while total_sent < len(data): |
|
|
sent = self._send_until_done( |
|
|
data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE] |
|
|
) |
|
|
total_sent += sent |
|
|
|
|
|
def shutdown(self): |
|
|
|
|
|
self.connection.shutdown() |
|
|
|
|
|
def close(self): |
|
|
if self._makefile_refs < 1: |
|
|
try: |
|
|
self._closed = True |
|
|
return self.connection.close() |
|
|
except OpenSSL.SSL.Error: |
|
|
return |
|
|
else: |
|
|
self._makefile_refs -= 1 |
|
|
|
|
|
def getpeercert(self, binary_form=False): |
|
|
x509 = self.connection.get_peer_certificate() |
|
|
|
|
|
if not x509: |
|
|
return x509 |
|
|
|
|
|
if binary_form: |
|
|
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) |
|
|
|
|
|
return { |
|
|
"subject": ((("commonName", x509.get_subject().CN),),), |
|
|
"subjectAltName": get_subj_alt_name(x509), |
|
|
} |
|
|
|
|
|
def version(self): |
|
|
return self.connection.get_protocol_version_name() |
|
|
|
|
|
def _reuse(self): |
|
|
self._makefile_refs += 1 |
|
|
|
|
|
def _drop(self): |
|
|
if self._makefile_refs < 1: |
|
|
self.close() |
|
|
else: |
|
|
self._makefile_refs -= 1 |
|
|
|
|
|
|
|
|
if _fileobject: |
|
|
|
|
|
def makefile(self, mode, bufsize=-1): |
|
|
self._makefile_refs += 1 |
|
|
return _fileobject(self, mode, bufsize, close=True) |
|
|
|
|
|
|
|
|
else: |
|
|
makefile = backport_makefile |
|
|
|
|
|
WrappedSocket.makefile = makefile |
|
|
|
|
|
|
|
|
class PyOpenSSLContext(object): |
|
|
""" |
|
|
I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible |
|
|
for translating the interface of the standard library ``SSLContext`` object |
|
|
to calls into PyOpenSSL. |
|
|
""" |
|
|
|
|
|
def __init__(self, protocol): |
|
|
self.protocol = _openssl_versions[protocol] |
|
|
self._ctx = OpenSSL.SSL.Context(self.protocol) |
|
|
self._options = 0 |
|
|
self.check_hostname = False |
|
|
|
|
|
@property |
|
|
def options(self): |
|
|
return self._options |
|
|
|
|
|
@options.setter |
|
|
def options(self, value): |
|
|
self._options = value |
|
|
self._ctx.set_options(value) |
|
|
|
|
|
@property |
|
|
def verify_mode(self): |
|
|
return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()] |
|
|
|
|
|
@verify_mode.setter |
|
|
def verify_mode(self, value): |
|
|
self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback) |
|
|
|
|
|
def set_default_verify_paths(self): |
|
|
self._ctx.set_default_verify_paths() |
|
|
|
|
|
def set_ciphers(self, ciphers): |
|
|
if isinstance(ciphers, six.text_type): |
|
|
ciphers = ciphers.encode("utf-8") |
|
|
self._ctx.set_cipher_list(ciphers) |
|
|
|
|
|
def load_verify_locations(self, cafile=None, capath=None, cadata=None): |
|
|
if cafile is not None: |
|
|
cafile = cafile.encode("utf-8") |
|
|
if capath is not None: |
|
|
capath = capath.encode("utf-8") |
|
|
try: |
|
|
self._ctx.load_verify_locations(cafile, capath) |
|
|
if cadata is not None: |
|
|
self._ctx.load_verify_locations(BytesIO(cadata)) |
|
|
except OpenSSL.SSL.Error as e: |
|
|
raise ssl.SSLError("unable to load trusted certificates: %r" % e) |
|
|
|
|
|
def load_cert_chain(self, certfile, keyfile=None, password=None): |
|
|
self._ctx.use_certificate_chain_file(certfile) |
|
|
if password is not None: |
|
|
if not isinstance(password, six.binary_type): |
|
|
password = password.encode("utf-8") |
|
|
self._ctx.set_passwd_cb(lambda *_: password) |
|
|
self._ctx.use_privatekey_file(keyfile or certfile) |
|
|
|
|
|
def set_alpn_protocols(self, protocols): |
|
|
protocols = [six.ensure_binary(p) for p in protocols] |
|
|
return self._ctx.set_alpn_protos(protocols) |
|
|
|
|
|
def wrap_socket( |
|
|
self, |
|
|
sock, |
|
|
server_side=False, |
|
|
do_handshake_on_connect=True, |
|
|
suppress_ragged_eofs=True, |
|
|
server_hostname=None, |
|
|
): |
|
|
cnx = OpenSSL.SSL.Connection(self._ctx, sock) |
|
|
|
|
|
if isinstance(server_hostname, six.text_type): |
|
|
server_hostname = server_hostname.encode("utf-8") |
|
|
|
|
|
if server_hostname is not None: |
|
|
cnx.set_tlsext_host_name(server_hostname) |
|
|
|
|
|
cnx.set_connect_state() |
|
|
|
|
|
while True: |
|
|
try: |
|
|
cnx.do_handshake() |
|
|
except OpenSSL.SSL.WantReadError: |
|
|
if not util.wait_for_read(sock, sock.gettimeout()): |
|
|
raise timeout("select timed out") |
|
|
continue |
|
|
except OpenSSL.SSL.Error as e: |
|
|
raise ssl.SSLError("bad handshake: %r" % e) |
|
|
break |
|
|
|
|
|
return WrappedSocket(cnx, sock) |
|
|
|
|
|
|
|
|
def _verify_callback(cnx, x509, err_no, err_depth, return_code): |
|
|
return err_no == 0 |
|
|
|