| """ |
| The httplib2 algorithms ported for use with requests. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import calendar |
| import logging |
| import re |
| import time |
| import weakref |
| from email.utils import parsedate_tz |
| from typing import TYPE_CHECKING, Collection, Mapping |
|
|
| from pip._vendor.requests.structures import CaseInsensitiveDict |
|
|
| from pip._vendor.cachecontrol.cache import DictCache, SeparateBodyBaseCache |
| from pip._vendor.cachecontrol.serialize import Serializer |
|
|
| if TYPE_CHECKING: |
| from typing import Literal |
|
|
| from pip._vendor.requests import PreparedRequest |
| from pip._vendor.urllib3 import HTTPResponse |
|
|
| from pip._vendor.cachecontrol.cache import BaseCache |
|
|
| logger = logging.getLogger(__name__) |
|
|
| URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?") |
|
|
| PERMANENT_REDIRECT_STATUSES = (301, 308) |
|
|
|
|


def parse_uri(uri: str) -> tuple[str, str, str, str, str]:
    """Parses a URI using the regex given in Appendix B of RFC 3986.

    (scheme, authority, path, query, fragment) = parse_uri(uri)
    """
    match = URI.match(uri)
    assert match is not None
    groups = match.groups()
    return (groups[1], groups[3], groups[4], groups[6], groups[8])
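
# Illustrative only: how parse_uri splits an absolute URI into the five
# RFC 3986 components (example.com is a placeholder URL):
#
#     parse_uri("http://example.com/path?q=1#frag")
#     # -> ('http', 'example.com', '/path', 'q=1', 'frag')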


class CacheController:
    """An interface to see if a request should be cached or not."""

    def __init__(
        self,
        cache: BaseCache | None = None,
        cache_etags: bool = True,
        serializer: Serializer | None = None,
        status_codes: Collection[int] | None = None,
    ):
        self.cache = DictCache() if cache is None else cache
        self.cache_etags = cache_etags
        self.serializer = serializer or Serializer()
        self.cacheable_status_codes = status_codes or (200, 203, 300, 301, 308)

    @classmethod
    def _urlnorm(cls, uri: str) -> str:
        """Normalize the URL to create a safe key for the cache"""
        (scheme, authority, path, query, fragment) = parse_uri(uri)
        if not scheme or not authority:
            raise Exception("Only absolute URIs are allowed. uri = %s" % uri)

        scheme = scheme.lower()
        authority = authority.lower()

        if not path:
            path = "/"

        # Rebuild the request URI and drop the fragment, which is never
        # sent to the server and must not differentiate cache entries.
        request_uri = query and "?".join([path, query]) or path
        defrag_uri = scheme + "://" + authority + request_uri

        return defrag_uri

    @classmethod
    def cache_url(cls, uri: str) -> str:
        return cls._urlnorm(uri)
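
    # Illustrative only: cache_url() lower-cases the scheme and authority,
    # defaults an empty path to "/", and strips the fragment:
    #
    #     CacheController.cache_url("HTTP://Example.COM/Path?q=1#frag")
    #     # -> "http://example.com/Path?q=1"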

    def parse_cache_control(self, headers: Mapping[str, str]) -> dict[str, int | None]:
        # Directive name -> (value type, whether a value is required).
        # See RFC 7234, Section 5.2.
        known_directives = {
            "max-age": (int, True),
            "max-stale": (int, False),
            "min-fresh": (int, True),
            "no-cache": (None, False),
            "no-store": (None, False),
            "no-transform": (None, False),
            "only-if-cached": (None, False),
            "must-revalidate": (None, False),
            "public": (None, False),
            "private": (None, False),
            "proxy-revalidate": (None, False),
            "s-maxage": (int, True),
        }

        cc_headers = headers.get("cache-control", headers.get("Cache-Control", ""))

        retval: dict[str, int | None] = {}

        for cc_directive in cc_headers.split(","):
            if not cc_directive.strip():
                continue

            parts = cc_directive.split("=", 1)
            directive = parts[0].strip()

            try:
                typ, required = known_directives[directive]
            except KeyError:
                logger.debug("Ignoring unknown cache-control directive: %s", directive)
                continue

            if not typ or not required:
                retval[directive] = None
            if typ:
                try:
                    retval[directive] = typ(parts[1].strip())
                except IndexError:
                    if required:
                        logger.debug(
                            "Missing value for cache-control directive: %s",
                            directive,
                        )
                except ValueError:
                    logger.debug(
                        "Invalid value for cache-control directive %s, must be %s",
                        directive,
                        typ.__name__,
                    )

        return retval
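
    # Illustrative only: a typical parse_cache_control() result. Valueless
    # directives map to None, while valued ones are coerced to int:
    #
    #     self.parse_cache_control({"Cache-Control": "max-age=3600, no-store"})
    #     # -> {"max-age": 3600, "no-store": None}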

    def _load_from_cache(self, request: PreparedRequest) -> HTTPResponse | None:
        """
        Load a cached response, or return None if it's not available.
        """
        # Partial content is not supported, so a request with a Range
        # header is never answered from the cache.
        if "Range" in request.headers:
            return None

        cache_url = request.url
        assert cache_url is not None
        cache_data = self.cache.get(cache_url)
        if cache_data is None:
            logger.debug("No cache entry available")
            return None

        if isinstance(self.cache, SeparateBodyBaseCache):
            body_file = self.cache.get_body(cache_url)
        else:
            body_file = None

        result = self.serializer.loads(request, cache_data, body_file)
        if result is None:
            logger.warning("Cache entry deserialization failed, entry ignored")
        return result

    def cached_request(self, request: PreparedRequest) -> HTTPResponse | Literal[False]:
        """
        Return a cached response if it exists in the cache, otherwise
        return False.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Looking up "%s" in the cache', cache_url)
        cc = self.parse_cache_control(request.headers)

        # Bail out if the request insists on fresh data.
        if "no-cache" in cc:
            logger.debug('Request header has "no-cache", cache bypassed')
            return False

        if "max-age" in cc and cc["max-age"] == 0:
            logger.debug('Request header has "max-age" as 0, cache bypassed')
            return False

        # Check whether we can load the response from the cache.
        resp = self._load_from_cache(request)
        if not resp:
            return False

        # If we have a cached permanent redirect, return it immediately.
        # We don't need to check freshness or validators because a
        # permanent redirect is intrinsically cacheable (RFC 7538).
        if int(resp.status) in PERMANENT_REDIRECT_STATUSES:
            msg = (
                "Returning cached permanent redirect response "
                "(ignoring date and etag information)"
            )
            logger.debug(msg)
            return resp

        headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)
        if not headers or "date" not in headers:
            if "etag" not in headers:
                # Without a date or an etag the cached response can never
                # be used again, so delete it outright.
                logger.debug("Purging cached response: no date or etag")
                self.cache.delete(cache_url)
            logger.debug("Ignoring cached response: no date")
            return False

        now = time.time()
        time_tuple = parsedate_tz(headers["date"])
        assert time_tuple is not None
        date = calendar.timegm(time_tuple[:6])
        current_age = max(0, now - date)
        logger.debug("Current age based on date: %i", current_age)

        resp_cc = self.parse_cache_control(headers)

        # Determine the freshness lifetime of the cached response.
        freshness_lifetime = 0

        # Prefer the max-age directive from the response Cache-Control.
        max_age = resp_cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug("Freshness lifetime from max-age: %i", freshness_lifetime)

        # If there is no max-age, fall back to the Expires header.
        elif "expires" in headers:
            expires = parsedate_tz(headers["expires"])
            if expires is not None:
                expire_time = calendar.timegm(expires[:6]) - date
                freshness_lifetime = max(0, expire_time)
                logger.debug("Freshness lifetime from expires: %i", freshness_lifetime)

        # A max-age on the request overrides whatever the response said.
        max_age = cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug(
                "Freshness lifetime from request max-age: %i", freshness_lifetime
            )

        min_fresh = cc.get("min-fresh")
        if min_fresh is not None:
            # Adjust our current age by the requested min-fresh margin.
            current_age += min_fresh
            logger.debug("Adjusted current age from min-fresh: %i", current_age)

        # Return the entry if it is still fresh.
        if freshness_lifetime > current_age:
            logger.debug('The response is "fresh", returning cached response')
            logger.debug("%i > %i", freshness_lifetime, current_age)
            return resp

        # The response is stale. Without an etag it cannot be
        # revalidated, so purge it.
        if "etag" not in headers:
            logger.debug('The cached response is "stale" with no etag, purging')
            self.cache.delete(cache_url)

        # Fall back to the underlying handler.
        return False
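
    # Illustrative only: the freshness check above reduces to simple
    # arithmetic. For a response with "Date: <10 s ago>" and
    # "Cache-Control: max-age=3600", current_age is ~10 and
    # freshness_lifetime is 3600, so 3600 > 10 and the cached response is
    # served. With "max-age=5" the same entry would already be stale.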

    def conditional_headers(self, request: PreparedRequest) -> dict[str, str]:
        resp = self._load_from_cache(request)
        new_headers = {}

        if resp:
            headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)

            if "etag" in headers:
                new_headers["If-None-Match"] = headers["ETag"]

            if "last-modified" in headers:
                new_headers["If-Modified-Since"] = headers["Last-Modified"]

        return new_headers
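
    # Illustrative only: for an entry cached with 'ETag: "abc"' and
    # "Last-Modified: Tue, 01 Jan 2019 00:00:00 GMT", conditional_headers()
    # yields the validators for a revalidation request:
    #
    #     {"If-None-Match": '"abc"',
    #      "If-Modified-Since": "Tue, 01 Jan 2019 00:00:00 GMT"}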

    def _cache_set(
        self,
        cache_url: str,
        request: PreparedRequest,
        response: HTTPResponse,
        body: bytes | None = None,
        expires_time: int | None = None,
    ) -> None:
        """
        Store the data in the cache.
        """
        if isinstance(self.cache, SeparateBodyBaseCache):
            # The body is stored separately, so serialize the metadata
            # with an empty placeholder body.
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, b""),
                expires=expires_time,
            )
            # body can be None when, for example, updating a cached entry
            # from a 304 response that carries no body.
            if body is not None:
                self.cache.set_body(cache_url, body)
        else:
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, body),
                expires=expires_time,
            )
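
    # Illustrative only: with a SeparateBodyBaseCache subclass, one logical
    # entry is written as two records under the same key, e.g.:
    #
    #     cache.set("http://example.com/", b"<serialized metadata>")
    #     cache.set_body("http://example.com/", b"<raw body bytes>")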

    def cache_response(
        self,
        request: PreparedRequest,
        response_or_ref: HTTPResponse | weakref.ReferenceType[HTTPResponse],
        body: bytes | None = None,
        status_codes: Collection[int] | None = None,
    ) -> None:
        """
        Algorithm for caching responses.

        This assumes a urllib3 HTTPResponse object, or a weak reference
        to one.
        """
        if isinstance(response_or_ref, weakref.ReferenceType):
            response = response_or_ref()
            if response is None:
                # The response has already been garbage collected, so
                # there is nothing left to cache.
                return
        else:
            response = response_or_ref

        cacheable_status_codes = status_codes or self.cacheable_status_codes
        if response.status not in cacheable_status_codes:
            logger.debug(
                "Status code %s not in %s", response.status, cacheable_status_codes
            )
            return

        response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
            response.headers
        )

        if "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
        else:
            date = 0

        # If we've been given a body and the response declares a valid
        # Content-Length that doesn't match it, skip caching: the body
        # is incomplete or corrupt.
        if (
            body is not None
            and "content-length" in response_headers
            and response_headers["content-length"].isdigit()
            and int(response_headers["content-length"]) != len(body)
        ):
            return

        cc_req = self.parse_cache_control(request.headers)
        cc = self.parse_cache_control(response_headers)

        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Updating cache with response from "%s"', cache_url)

        # Honor "no-store": never store, and purge any existing entry.
        no_store = False
        if "no-store" in cc:
            no_store = True
            logger.debug('Response header has "no-store"')
        if "no-store" in cc_req:
            no_store = True
            logger.debug('Request header has "no-store"')
        if no_store and self.cache.get(cache_url):
            logger.debug('Purging existing cache entry to honor "no-store"')
            self.cache.delete(cache_url)
        if no_store:
            return

        # A "Vary: *" header always fails to match (RFC 7234,
        # Section 4.1), so such a response can never be served from the
        # cache and is not worth storing.
        if "*" in response_headers.get("vary", ""):
            logger.debug('Response header has "Vary: *"')
            return

        # If we've been given an etag, keep the response for later
        # revalidation.
        if self.cache_etags and "etag" in response_headers:
            expires_time = 0
            if response_headers.get("expires"):
                expires = parsedate_tz(response_headers["expires"])
                if expires is not None:
                    expires_time = calendar.timegm(expires[:6]) - date

            # Keep ETag-validated entries around for at least 14 days.
            expires_time = max(expires_time, 14 * 86400)

            logger.debug("etag object cached for %i seconds", expires_time)
            logger.debug("Caching due to etag")
            self._cache_set(cache_url, request, response, body, expires_time)

        # Cache permanent redirects regardless of the Date headers.
        elif int(response.status) in PERMANENT_REDIRECT_STATUSES:
            logger.debug("Caching permanent redirect")
            self._cache_set(cache_url, request, response, b"")

        # Otherwise cache only if the headers demand it. Without a Date
        # header there is no way to expire the entry.
        elif "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
            # Cache when there is a max-age > 0.
            max_age = cc.get("max-age")
            if max_age is not None and max_age > 0:
                logger.debug("Caching b/c date exists and max-age > 0")
                expires_time = max_age
                self._cache_set(
                    cache_url,
                    request,
                    response,
                    body,
                    expires_time,
                )

        # If the response can expire, cache it in the meantime.
        elif "expires" in response_headers:
            if response_headers["expires"]:
                expires = parsedate_tz(response_headers["expires"])
                if expires is not None:
                    expires_time = calendar.timegm(expires[:6]) - date
                else:
                    expires_time = None

                logger.debug(
                    "Caching b/c of expires header. expires in %s seconds",
                    expires_time,
                )
                self._cache_set(
                    cache_url,
                    request,
                    response,
                    body,
                    expires_time,
                )
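
    # Illustrative only: the ETag branch above enforces a floor on the
    # entry lifetime, so even "Expires: <1 hour from now>" is stored for
    # at least 14 days while the ETag remains usable for revalidation:
    #
    #     expires_time = max(3600, 14 * 86400)  # -> 1209600 seconds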

    def update_cached_response(
        self, request: PreparedRequest, response: HTTPResponse
    ) -> HTTPResponse:
        """On a 304 we will get a new set of headers that we want to
        update our cached value with, assuming we have one.

        This should only ever be called when we've sent an ETag and
        gotten a 304 as the response.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        cached_response = self._load_from_cache(request)

        if not cached_response:
            # We didn't have a cached response; pass the 304 through.
            return response

        # Update the cached headers with the fresh ones from the 304
        # (RFC 7232, Section 4.1). The server shouldn't send headers
        # that would invalidate the cached body, but to be safe we
        # exclude the ones known to cause problems.
        excluded_headers = ["content-length"]

        cached_response.headers.update(
            {
                k: v
                for k, v in response.headers.items()
                if k.lower() not in excluded_headers
            }
        )

        # We want a 200 because we are serving content from the cache.
        cached_response.status = 200

        # Update our cache.
        self._cache_set(cache_url, request, cached_response)

        return cached_response
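
# Illustrative only: after a 304, update_cached_response() folds the fresh
# headers into the stored entry but never its Content-Length, since the
# cached body is kept as-is. A 304 carrying {"Date": "...",
# "Content-Length": "0"} leaves the entry's own Content-Length intact,
# refreshes its Date, and returns the entry with status 200.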