Spaces:
Runtime error
Runtime error
| """ | |
| requests.auth | |
| ~~~~~~~~~~~~~ | |
| This module contains the authentication handlers for Requests. | |
| """ | |
| import hashlib | |
| import os | |
| import re | |
| import threading | |
| import time | |
| import warnings | |
| from base64 import b64encode | |
| from ._internal_utils import to_native_string | |
| from .compat import basestring, str, urlparse | |
| from .cookies import extract_cookies_to_jar | |
| from .utils import parse_dict_header | |
| CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded" | |
| CONTENT_TYPE_MULTI_PART = "multipart/form-data" | |
| def _basic_auth_str(username, password): | |
| """Returns a Basic Auth string.""" | |
| # "I want us to put a big-ol' comment on top of it that | |
| # says that this behaviour is dumb but we need to preserve | |
| # it because people are relying on it." | |
| # - Lukasa | |
| # | |
| # These are here solely to maintain backwards compatibility | |
| # for things like ints. This will be removed in 3.0.0. | |
| if not isinstance(username, basestring): | |
| warnings.warn( | |
| "Non-string usernames will no longer be supported in Requests " | |
| "3.0.0. Please convert the object you've passed in ({!r}) to " | |
| "a string or bytes object in the near future to avoid " | |
| "problems.".format(username), | |
| category=DeprecationWarning, | |
| ) | |
| username = str(username) | |
| if not isinstance(password, basestring): | |
| warnings.warn( | |
| "Non-string passwords will no longer be supported in Requests " | |
| "3.0.0. Please convert the object you've passed in ({!r}) to " | |
| "a string or bytes object in the near future to avoid " | |
| "problems.".format(type(password)), | |
| category=DeprecationWarning, | |
| ) | |
| password = str(password) | |
| # -- End Removal -- | |
| if isinstance(username, str): | |
| username = username.encode("latin1") | |
| if isinstance(password, str): | |
| password = password.encode("latin1") | |
| authstr = "Basic " + to_native_string( | |
| b64encode(b":".join((username, password))).strip() | |
| ) | |
| return authstr | |
| class AuthBase: | |
| """Base class that all auth implementations derive from""" | |
| def __call__(self, r): | |
| raise NotImplementedError("Auth hooks must be callable.") | |
| class HTTPBasicAuth(AuthBase): | |
| """Attaches HTTP Basic Authentication to the given Request object.""" | |
| def __init__(self, username, password): | |
| self.username = username | |
| self.password = password | |
| def __eq__(self, other): | |
| return all( | |
| [ | |
| self.username == getattr(other, "username", None), | |
| self.password == getattr(other, "password", None), | |
| ] | |
| ) | |
| def __ne__(self, other): | |
| return not self == other | |
| def __call__(self, r): | |
| r.headers["Authorization"] = _basic_auth_str(self.username, self.password) | |
| return r | |
| class HTTPProxyAuth(HTTPBasicAuth): | |
| """Attaches HTTP Proxy Authentication to a given Request object.""" | |
| def __call__(self, r): | |
| r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password) | |
| return r | |
| class HTTPDigestAuth(AuthBase): | |
| """Attaches HTTP Digest Authentication to the given Request object.""" | |
| def __init__(self, username, password): | |
| self.username = username | |
| self.password = password | |
| # Keep state in per-thread local storage | |
| self._thread_local = threading.local() | |
| def init_per_thread_state(self): | |
| # Ensure state is initialized just once per-thread | |
| if not hasattr(self._thread_local, "init"): | |
| self._thread_local.init = True | |
| self._thread_local.last_nonce = "" | |
| self._thread_local.nonce_count = 0 | |
| self._thread_local.chal = {} | |
| self._thread_local.pos = None | |
| self._thread_local.num_401_calls = None | |
| def build_digest_header(self, method, url): | |
| """ | |
| :rtype: str | |
| """ | |
| realm = self._thread_local.chal["realm"] | |
| nonce = self._thread_local.chal["nonce"] | |
| qop = self._thread_local.chal.get("qop") | |
| algorithm = self._thread_local.chal.get("algorithm") | |
| opaque = self._thread_local.chal.get("opaque") | |
| hash_utf8 = None | |
| if algorithm is None: | |
| _algorithm = "MD5" | |
| else: | |
| _algorithm = algorithm.upper() | |
| # lambdas assume digest modules are imported at the top level | |
| if _algorithm == "MD5" or _algorithm == "MD5-SESS": | |
| def md5_utf8(x): | |
| if isinstance(x, str): | |
| x = x.encode("utf-8") | |
| return hashlib.md5(x).hexdigest() | |
| hash_utf8 = md5_utf8 | |
| elif _algorithm == "SHA": | |
| def sha_utf8(x): | |
| if isinstance(x, str): | |
| x = x.encode("utf-8") | |
| return hashlib.sha1(x).hexdigest() | |
| hash_utf8 = sha_utf8 | |
| elif _algorithm == "SHA-256": | |
| def sha256_utf8(x): | |
| if isinstance(x, str): | |
| x = x.encode("utf-8") | |
| return hashlib.sha256(x).hexdigest() | |
| hash_utf8 = sha256_utf8 | |
| elif _algorithm == "SHA-512": | |
| def sha512_utf8(x): | |
| if isinstance(x, str): | |
| x = x.encode("utf-8") | |
| return hashlib.sha512(x).hexdigest() | |
| hash_utf8 = sha512_utf8 | |
| KD = lambda s, d: hash_utf8(f"{s}:{d}") # noqa:E731 | |
| if hash_utf8 is None: | |
| return None | |
| # XXX not implemented yet | |
| entdig = None | |
| p_parsed = urlparse(url) | |
| #: path is request-uri defined in RFC 2616 which should not be empty | |
| path = p_parsed.path or "/" | |
| if p_parsed.query: | |
| path += f"?{p_parsed.query}" | |
| A1 = f"{self.username}:{realm}:{self.password}" | |
| A2 = f"{method}:{path}" | |
| HA1 = hash_utf8(A1) | |
| HA2 = hash_utf8(A2) | |
| if nonce == self._thread_local.last_nonce: | |
| self._thread_local.nonce_count += 1 | |
| else: | |
| self._thread_local.nonce_count = 1 | |
| ncvalue = f"{self._thread_local.nonce_count:08x}" | |
| s = str(self._thread_local.nonce_count).encode("utf-8") | |
| s += nonce.encode("utf-8") | |
| s += time.ctime().encode("utf-8") | |
| s += os.urandom(8) | |
| cnonce = hashlib.sha1(s).hexdigest()[:16] | |
| if _algorithm == "MD5-SESS": | |
| HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}") | |
| if not qop: | |
| respdig = KD(HA1, f"{nonce}:{HA2}") | |
| elif qop == "auth" or "auth" in qop.split(","): | |
| noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" | |
| respdig = KD(HA1, noncebit) | |
| else: | |
| # XXX handle auth-int. | |
| return None | |
| self._thread_local.last_nonce = nonce | |
| # XXX should the partial digests be encoded too? | |
| base = ( | |
| f'username="{self.username}", realm="{realm}", nonce="{nonce}", ' | |
| f'uri="{path}", response="{respdig}"' | |
| ) | |
| if opaque: | |
| base += f', opaque="{opaque}"' | |
| if algorithm: | |
| base += f', algorithm="{algorithm}"' | |
| if entdig: | |
| base += f', digest="{entdig}"' | |
| if qop: | |
| base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' | |
| return f"Digest {base}" | |
| def handle_redirect(self, r, **kwargs): | |
| """Reset num_401_calls counter on redirects.""" | |
| if r.is_redirect: | |
| self._thread_local.num_401_calls = 1 | |
| def handle_401(self, r, **kwargs): | |
| """ | |
| Takes the given response and tries digest-auth, if needed. | |
| :rtype: requests.Response | |
| """ | |
| # If response is not 4xx, do not auth | |
| # See https://github.com/psf/requests/issues/3772 | |
| if not 400 <= r.status_code < 500: | |
| self._thread_local.num_401_calls = 1 | |
| return r | |
| if self._thread_local.pos is not None: | |
| # Rewind the file position indicator of the body to where | |
| # it was to resend the request. | |
| r.request.body.seek(self._thread_local.pos) | |
| s_auth = r.headers.get("www-authenticate", "") | |
| if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2: | |
| self._thread_local.num_401_calls += 1 | |
| pat = re.compile(r"digest ", flags=re.IGNORECASE) | |
| self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1)) | |
| # Consume content and release the original connection | |
| # to allow our new request to reuse the same one. | |
| r.content | |
| r.close() | |
| prep = r.request.copy() | |
| extract_cookies_to_jar(prep._cookies, r.request, r.raw) | |
| prep.prepare_cookies(prep._cookies) | |
| prep.headers["Authorization"] = self.build_digest_header( | |
| prep.method, prep.url | |
| ) | |
| _r = r.connection.send(prep, **kwargs) | |
| _r.history.append(r) | |
| _r.request = prep | |
| return _r | |
| self._thread_local.num_401_calls = 1 | |
| return r | |
| def __call__(self, r): | |
| # Initialize per-thread state, if needed | |
| self.init_per_thread_state() | |
| # If we have a saved nonce, skip the 401 | |
| if self._thread_local.last_nonce: | |
| r.headers["Authorization"] = self.build_digest_header(r.method, r.url) | |
| try: | |
| self._thread_local.pos = r.body.tell() | |
| except AttributeError: | |
| # In the case of HTTPDigestAuth being reused and the body of | |
| # the previous request was a file-like object, pos has the | |
| # file position of the previous body. Ensure it's set to | |
| # None. | |
| self._thread_local.pos = None | |
| r.register_hook("response", self.handle_401) | |
| r.register_hook("response", self.handle_redirect) | |
| self._thread_local.num_401_calls = 1 | |
| return r | |
| def __eq__(self, other): | |
| return all( | |
| [ | |
| self.username == getattr(other, "username", None), | |
| self.password == getattr(other, "password", None), | |
| ] | |
| ) | |
| def __ne__(self, other): | |
| return not self == other | |