| | """ |
| | requests.auth |
| | ~~~~~~~~~~~~~ |
| | |
| | This module contains the authentication handlers for Requests. |
| | """ |
| |
|
| | import hashlib |
| | import os |
| | import re |
| | import threading |
| | import time |
| | import warnings |
| | from base64 import b64encode |
| |
|
| | from ._internal_utils import to_native_string |
| | from .compat import basestring, str, urlparse |
| | from .cookies import extract_cookies_to_jar |
| | from .utils import parse_dict_header |
| |
|
# MIME type strings for the two common request-body encodings: URL-encoded
# forms and multipart forms. Nothing in the code visible in this module
# reads them.
CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"
| |
|
| |
|
def _basic_auth_str(username, password):
    """Returns a Basic Auth string.

    Non-string credentials are still accepted: each one triggers a
    ``DeprecationWarning`` and is converted with ``str()``. Text values are
    then encoded as latin1, joined as ``username:password``, base64-encoded
    and prefixed with ``"Basic "``.

    :param username: user name; ``str``, ``bytes`` or (deprecated) any object.
    :param password: password; ``str``, ``bytes`` or (deprecated) any object.
    :rtype: str
    """
    if not isinstance(username, basestring):
        username_notice = (
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(username)
        warnings.warn(username_notice, category=DeprecationWarning)
        username = str(username)

    if not isinstance(password, basestring):
        # Only the type is interpolated here, never the password value itself.
        password_notice = (
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(type(password))
        warnings.warn(password_notice, category=DeprecationWarning)
        password = str(password)

    # b64encode needs bytes, so text credentials are encoded first.
    if isinstance(username, str):
        username = username.encode("latin1")
    if isinstance(password, str):
        password = password.encode("latin1")

    raw_pair = b":".join((username, password))
    encoded_pair = b64encode(raw_pair).strip()
    return "Basic " + to_native_string(encoded_pair)
| |
|
| |
|
class AuthBase:
    """Common ancestor of the auth handlers defined in this module.

    Subclasses override ``__call__``; the implementation here only raises.
    """

    def __call__(self, r):
        """Always raise ``NotImplementedError``; subclasses supply the hook."""
        message = "Auth hooks must be callable."
        raise NotImplementedError(message)
| |
|
| |
|
class HTTPBasicAuth(AuthBase):
    """Attaches HTTP Basic Authentication to the given Request object.

    Two instances compare equal when their ``username`` and ``password``
    attributes are equal.
    """

    def __init__(self, username, password):
        self.username, self.password = username, password

    def __eq__(self, other):
        # An object lacking either attribute is compared as if it held None.
        same_username = self.username == getattr(other, "username", None)
        same_password = self.password == getattr(other, "password", None)
        return all((same_username, same_password))

    def __ne__(self, other):
        return not (self == other)

    def __call__(self, r):
        """Set the ``Authorization`` header on ``r`` and return ``r``."""
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Authorization"] = header_value
        return r
| |
|
| |
|
class HTTPProxyAuth(HTTPBasicAuth):
    """Attaches HTTP Proxy Authentication to a given Request object.

    Builds the same Basic credential string as the parent class but stores
    it under the ``Proxy-Authorization`` header.
    """

    def __call__(self, r):
        """Set the ``Proxy-Authorization`` header on ``r`` and return ``r``."""
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Proxy-Authorization"] = header_value
        return r
| |
|
| |
|
class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object.

    The challenge, nonce bookkeeping and retry counter are kept in
    ``threading.local`` storage on the instance.
    """

    # Upper-cased challenge ``algorithm`` -> name of the hashlib constructor
    # used for every digest in the response computation. Algorithms missing
    # from this table are treated as unsupported.
    _HASHLIB_NAMES = {
        "MD5": "md5",
        "MD5-SESS": "md5",
        "SHA": "sha1",
        "SHA-256": "sha256",
        "SHA-512": "sha512",
    }

    # Scheme token removed (first occurrence only) from the
    # ``WWW-Authenticate`` value before its parameters are parsed.
    _DIGEST_TOKEN_RE = re.compile(r"digest ", flags=re.IGNORECASE)

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Mutable authentication state lives in per-thread storage.
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create this thread's state attributes if they do not exist yet."""
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Authorization`` header value for the stored challenge.

        Reads the challenge from the per-thread state and updates the
        per-thread nonce counter (and, on success, the last-seen nonce).

        :param method: HTTP method of the request being authenticated.
        :param url: request URL; only its path and query string are used.
        :returns: the ``Digest ...`` header value, or ``None`` when the
            challenge's algorithm is not supported or its ``qop`` list does
            not offer ``auth``.
        :raises KeyError: if the stored challenge has no ``realm`` or
            ``nonce`` entry.
        :rtype: str
        """

        realm = self._thread_local.chal["realm"]
        nonce = self._thread_local.chal["nonce"]
        qop = self._thread_local.chal.get("qop")
        algorithm = self._thread_local.chal.get("algorithm")
        opaque = self._thread_local.chal.get("opaque")

        # A challenge without an explicit algorithm is treated as MD5.
        _algorithm = "MD5" if algorithm is None else algorithm.upper()
        digest_name = self._HASHLIB_NAMES.get(_algorithm)
        if digest_name is None:
            return None

        def hash_utf8(x):
            """Hex digest of ``x`` (text is UTF-8 encoded first)."""
            if isinstance(x, str):
                x = x.encode("utf-8")
            # Looked up at call time so only the requested algorithm has to
            # be available in hashlib.
            return getattr(hashlib, digest_name)(x).hexdigest()

        def KD(s, d):
            """Digest of ``"<s>:<d>"``."""
            return hash_utf8(f"{s}:{d}")

        # No entity digest is computed, so the ``digest=`` parameter below is
        # never emitted.
        entdig = None
        p_parsed = urlparse(url)
        # The request-uri must not be empty; fall back to "/".
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The nonce count restarts at 1 whenever the server issues a new nonce.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"

        # Client nonce: hash of the count, server nonce, current time and
        # random bytes, truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)

        cnonce = hashlib.sha1(s).hexdigest()[:16]
        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif "auth" in [option.strip() for option in qop.split(",")]:
            # qop is a comma-separated list that may carry whitespace around
            # the commas (e.g. "auth-int, auth"); strip before matching.
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # Only qop=auth is implemented (no auth-int).
            return None

        self._thread_local.last_nonce = nonce

        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a digest challenge and no retry
        has been made yet, the request is copied, signed and re-sent on the
        same connection object; the new response is returned with the
        original appended to its history. Otherwise ``r`` is returned as is.

        :rtype: requests.Response
        """

        # Responses outside the 4xx range are never retried.
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the body to the position recorded in __call__ so it can
            # be sent again.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:

            self._thread_local.num_401_calls += 1
            self._thread_local.chal = parse_dict_header(
                self._DIGEST_TOKEN_RE.sub("", s_auth, count=1)
            )

            # Read the body and close the response before re-sending
            # (presumably so the underlying connection can be reused).
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = self.build_digest_header(
                prep.method, prep.url
            )
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare ``r`` for digest auth and register the response hooks."""
        # Initialize per-thread state, if needed.
        self.init_per_thread_state()
        # A nonce from an earlier challenge lets us sign the request up front.
        if self._thread_local.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # The body has no tell(); clear any position left over from a
            # previous request that reused this auth object.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other
| |
|