Buckets:
| # SPDX-FileCopyrightText: 2015 Eric Larson | |
| # | |
| # SPDX-License-Identifier: Apache-2.0 | |
| from __future__ import annotations | |
| import io | |
| from typing import IO, TYPE_CHECKING, Any, Mapping, cast | |
| from pip._vendor import msgpack | |
| from pip._vendor.requests.structures import CaseInsensitiveDict | |
| from pip._vendor.urllib3 import HTTPResponse | |
| if TYPE_CHECKING: | |
| from pip._vendor.requests import PreparedRequest | |
| class Serializer: | |
| serde_version = "4" | |
| def dumps( | |
| self, | |
| request: PreparedRequest, | |
| response: HTTPResponse, | |
| body: bytes | None = None, | |
| ) -> bytes: | |
| response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict( | |
| response.headers | |
| ) | |
| if body is None: | |
| # When a body isn't passed in, we'll read the response. We | |
| # also update the response with a new file handler to be | |
| # sure it acts as though it was never read. | |
| body = response.read(decode_content=False) | |
| response._fp = io.BytesIO(body) # type: ignore[assignment] | |
| response.length_remaining = len(body) | |
| data = { | |
| "response": { | |
| "body": body, # Empty bytestring if body is stored separately | |
| "headers": {str(k): str(v) for k, v in response.headers.items()}, | |
| "status": response.status, | |
| "version": response.version, | |
| "reason": str(response.reason), | |
| "decode_content": response.decode_content, | |
| } | |
| } | |
| # Construct our vary headers | |
| data["vary"] = {} | |
| if "vary" in response_headers: | |
| varied_headers = response_headers["vary"].split(",") | |
| for header in varied_headers: | |
| header = str(header).strip() | |
| header_value = request.headers.get(header, None) | |
| if header_value is not None: | |
| header_value = str(header_value) | |
| data["vary"][header] = header_value | |
| return b",".join([f"cc={self.serde_version}".encode(), self.serialize(data)]) | |
| def serialize(self, data: dict[str, Any]) -> bytes: | |
| return cast(bytes, msgpack.dumps(data, use_bin_type=True)) | |
| def loads( | |
| self, | |
| request: PreparedRequest, | |
| data: bytes, | |
| body_file: IO[bytes] | None = None, | |
| ) -> HTTPResponse | None: | |
| # Short circuit if we've been given an empty set of data | |
| if not data: | |
| return None | |
| # Previous versions of this library supported other serialization | |
| # formats, but these have all been removed. | |
| if not data.startswith(f"cc={self.serde_version},".encode()): | |
| return None | |
| data = data[5:] | |
| return self._loads_v4(request, data, body_file) | |
| def prepare_response( | |
| self, | |
| request: PreparedRequest, | |
| cached: Mapping[str, Any], | |
| body_file: IO[bytes] | None = None, | |
| ) -> HTTPResponse | None: | |
| """Verify our vary headers match and construct a real urllib3 | |
| HTTPResponse object. | |
| """ | |
| # Special case the '*' Vary value as it means we cannot actually | |
| # determine if the cached response is suitable for this request. | |
| # This case is also handled in the controller code when creating | |
| # a cache entry, but is left here for backwards compatibility. | |
| if "*" in cached.get("vary", {}): | |
| return None | |
| # Ensure that the Vary headers for the cached response match our | |
| # request | |
| for header, value in cached.get("vary", {}).items(): | |
| if request.headers.get(header, None) != value: | |
| return None | |
| body_raw = cached["response"].pop("body") | |
| headers: CaseInsensitiveDict[str] = CaseInsensitiveDict( | |
| data=cached["response"]["headers"] | |
| ) | |
| if headers.get("transfer-encoding", "") == "chunked": | |
| headers.pop("transfer-encoding") | |
| cached["response"]["headers"] = headers | |
| try: | |
| body: IO[bytes] | |
| if body_file is None: | |
| body = io.BytesIO(body_raw) | |
| else: | |
| body = body_file | |
| except TypeError: | |
| # This can happen if cachecontrol serialized to v1 format (pickle) | |
| # using Python 2. A Python 2 str(byte string) will be unpickled as | |
| # a Python 3 str (unicode string), which will cause the above to | |
| # fail with: | |
| # | |
| # TypeError: 'str' does not support the buffer interface | |
| body = io.BytesIO(body_raw.encode("utf8")) | |
| # Discard any `strict` parameter serialized by older version of cachecontrol. | |
| cached["response"].pop("strict", None) | |
| return HTTPResponse(body=body, preload_content=False, **cached["response"]) | |
| def _loads_v4( | |
| self, | |
| request: PreparedRequest, | |
| data: bytes, | |
| body_file: IO[bytes] | None = None, | |
| ) -> HTTPResponse | None: | |
| try: | |
| cached = msgpack.loads(data, raw=False) | |
| except ValueError: | |
| return None | |
| return self.prepare_response(request, cached, body_file) | |
Xet Storage Details
- Size:
- 5.16 kB
- Xet hash:
- 03e79697360dd202acae05f362dcacbedc3e07355f29ee8001f9307e541f24cd
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.