DeepSolanaCoder
/
DeepSeek-Coder-main
/finetune
/venv
/lib
/python3.12
/site-packages
/aiohttp
/web_response.py
| import asyncio | |
| import collections.abc | |
| import datetime | |
| import enum | |
| import json | |
| import math | |
| import time | |
| import warnings | |
| import zlib | |
| from concurrent.futures import Executor | |
| from http import HTTPStatus | |
| from http.cookies import SimpleCookie | |
| from typing import ( | |
| TYPE_CHECKING, | |
| Any, | |
| Dict, | |
| Iterator, | |
| MutableMapping, | |
| Optional, | |
| Union, | |
| cast, | |
| ) | |
| from multidict import CIMultiDict, istr | |
| from . import hdrs, payload | |
| from .abc import AbstractStreamWriter | |
| from .compression_utils import ZLibCompressor | |
| from .helpers import ( | |
| ETAG_ANY, | |
| QUOTED_ETAG_RE, | |
| ETag, | |
| HeadersMixin, | |
| must_be_empty_body, | |
| parse_http_date, | |
| rfc822_formatted_time, | |
| sentinel, | |
| should_remove_content_length, | |
| validate_etag_value, | |
| ) | |
| from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 | |
| from .payload import Payload | |
| from .typedefs import JSONEncoder, LooseHeaders | |
| REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus} | |
| LARGE_BODY_SIZE = 1024**2 | |
| __all__ = ("ContentCoding", "StreamResponse", "Response", "json_response") | |
| if TYPE_CHECKING: | |
| from .web_request import BaseRequest | |
| BaseClass = MutableMapping[str, Any] | |
| else: | |
| BaseClass = collections.abc.MutableMapping | |
| # TODO(py311): Convert to StrEnum for wider use | |
| class ContentCoding(enum.Enum): | |
| # The content codings that we have support for. | |
| # | |
| # Additional registered codings are listed at: | |
| # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding | |
| deflate = "deflate" | |
| gzip = "gzip" | |
| identity = "identity" | |
| CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding} | |
| ############################################################ | |
| # HTTP Response classes | |
| ############################################################ | |
| class StreamResponse(BaseClass, HeadersMixin): | |
| _body: Union[None, bytes, bytearray, Payload] | |
| _length_check = True | |
| _body = None | |
| _keep_alive: Optional[bool] = None | |
| _chunked: bool = False | |
| _compression: bool = False | |
| _compression_strategy: int = zlib.Z_DEFAULT_STRATEGY | |
| _compression_force: Optional[ContentCoding] = None | |
| _req: Optional["BaseRequest"] = None | |
| _payload_writer: Optional[AbstractStreamWriter] = None | |
| _eof_sent: bool = False | |
| _must_be_empty_body: Optional[bool] = None | |
| _body_length = 0 | |
| _cookies: Optional[SimpleCookie] = None | |
| def __init__( | |
| self, | |
| *, | |
| status: int = 200, | |
| reason: Optional[str] = None, | |
| headers: Optional[LooseHeaders] = None, | |
| _real_headers: Optional[CIMultiDict[str]] = None, | |
| ) -> None: | |
| """Initialize a new stream response object. | |
| _real_headers is an internal parameter used to pass a pre-populated | |
| headers object. It is used by the `Response` class to avoid copying | |
| the headers when creating a new response object. It is not intended | |
| to be used by external code. | |
| """ | |
| self._state: Dict[str, Any] = {} | |
| if _real_headers is not None: | |
| self._headers = _real_headers | |
| elif headers is not None: | |
| self._headers: CIMultiDict[str] = CIMultiDict(headers) | |
| else: | |
| self._headers = CIMultiDict() | |
| self._set_status(status, reason) | |
| def prepared(self) -> bool: | |
| return self._eof_sent or self._payload_writer is not None | |
| def task(self) -> "Optional[asyncio.Task[None]]": | |
| if self._req: | |
| return self._req.task | |
| else: | |
| return None | |
| def status(self) -> int: | |
| return self._status | |
| def chunked(self) -> bool: | |
| return self._chunked | |
| def compression(self) -> bool: | |
| return self._compression | |
| def reason(self) -> str: | |
| return self._reason | |
| def set_status( | |
| self, | |
| status: int, | |
| reason: Optional[str] = None, | |
| ) -> None: | |
| assert ( | |
| not self.prepared | |
| ), "Cannot change the response status code after the headers have been sent" | |
| self._set_status(status, reason) | |
| def _set_status(self, status: int, reason: Optional[str]) -> None: | |
| self._status = int(status) | |
| if reason is None: | |
| reason = REASON_PHRASES.get(self._status, "") | |
| elif "\n" in reason: | |
| raise ValueError("Reason cannot contain \\n") | |
| self._reason = reason | |
| def keep_alive(self) -> Optional[bool]: | |
| return self._keep_alive | |
| def force_close(self) -> None: | |
| self._keep_alive = False | |
| def body_length(self) -> int: | |
| return self._body_length | |
| def output_length(self) -> int: | |
| warnings.warn("output_length is deprecated", DeprecationWarning) | |
| assert self._payload_writer | |
| return self._payload_writer.buffer_size | |
| def enable_chunked_encoding(self, chunk_size: Optional[int] = None) -> None: | |
| """Enables automatic chunked transfer encoding.""" | |
| if hdrs.CONTENT_LENGTH in self._headers: | |
| raise RuntimeError( | |
| "You can't enable chunked encoding when a content length is set" | |
| ) | |
| if chunk_size is not None: | |
| warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) | |
| self._chunked = True | |
| def enable_compression( | |
| self, | |
| force: Optional[Union[bool, ContentCoding]] = None, | |
| strategy: int = zlib.Z_DEFAULT_STRATEGY, | |
| ) -> None: | |
| """Enables response compression encoding.""" | |
| # Backwards compatibility for when force was a bool <0.17. | |
| if isinstance(force, bool): | |
| force = ContentCoding.deflate if force else ContentCoding.identity | |
| warnings.warn( | |
| "Using boolean for force is deprecated #3318", DeprecationWarning | |
| ) | |
| elif force is not None: | |
| assert isinstance( | |
| force, ContentCoding | |
| ), "force should one of None, bool or ContentEncoding" | |
| self._compression = True | |
| self._compression_force = force | |
| self._compression_strategy = strategy | |
| def headers(self) -> "CIMultiDict[str]": | |
| return self._headers | |
| def cookies(self) -> SimpleCookie: | |
| if self._cookies is None: | |
| self._cookies = SimpleCookie() | |
| return self._cookies | |
| def set_cookie( | |
| self, | |
| name: str, | |
| value: str, | |
| *, | |
| expires: Optional[str] = None, | |
| domain: Optional[str] = None, | |
| max_age: Optional[Union[int, str]] = None, | |
| path: str = "/", | |
| secure: Optional[bool] = None, | |
| httponly: Optional[bool] = None, | |
| version: Optional[str] = None, | |
| samesite: Optional[str] = None, | |
| ) -> None: | |
| """Set or update response cookie. | |
| Sets new cookie or updates existent with new value. | |
| Also updates only those params which are not None. | |
| """ | |
| if self._cookies is None: | |
| self._cookies = SimpleCookie() | |
| self._cookies[name] = value | |
| c = self._cookies[name] | |
| if expires is not None: | |
| c["expires"] = expires | |
| elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT": | |
| del c["expires"] | |
| if domain is not None: | |
| c["domain"] = domain | |
| if max_age is not None: | |
| c["max-age"] = str(max_age) | |
| elif "max-age" in c: | |
| del c["max-age"] | |
| c["path"] = path | |
| if secure is not None: | |
| c["secure"] = secure | |
| if httponly is not None: | |
| c["httponly"] = httponly | |
| if version is not None: | |
| c["version"] = version | |
| if samesite is not None: | |
| c["samesite"] = samesite | |
| def del_cookie( | |
| self, | |
| name: str, | |
| *, | |
| domain: Optional[str] = None, | |
| path: str = "/", | |
| secure: Optional[bool] = None, | |
| httponly: Optional[bool] = None, | |
| samesite: Optional[str] = None, | |
| ) -> None: | |
| """Delete cookie. | |
| Creates new empty expired cookie. | |
| """ | |
| # TODO: do we need domain/path here? | |
| if self._cookies is not None: | |
| self._cookies.pop(name, None) | |
| self.set_cookie( | |
| name, | |
| "", | |
| max_age=0, | |
| expires="Thu, 01 Jan 1970 00:00:00 GMT", | |
| domain=domain, | |
| path=path, | |
| secure=secure, | |
| httponly=httponly, | |
| samesite=samesite, | |
| ) | |
| def content_length(self) -> Optional[int]: | |
| # Just a placeholder for adding setter | |
| return super().content_length | |
| def content_length(self, value: Optional[int]) -> None: | |
| if value is not None: | |
| value = int(value) | |
| if self._chunked: | |
| raise RuntimeError( | |
| "You can't set content length when chunked encoding is enable" | |
| ) | |
| self._headers[hdrs.CONTENT_LENGTH] = str(value) | |
| else: | |
| self._headers.pop(hdrs.CONTENT_LENGTH, None) | |
| def content_type(self) -> str: | |
| # Just a placeholder for adding setter | |
| return super().content_type | |
| def content_type(self, value: str) -> None: | |
| self.content_type # read header values if needed | |
| self._content_type = str(value) | |
| self._generate_content_type_header() | |
| def charset(self) -> Optional[str]: | |
| # Just a placeholder for adding setter | |
| return super().charset | |
| def charset(self, value: Optional[str]) -> None: | |
| ctype = self.content_type # read header values if needed | |
| if ctype == "application/octet-stream": | |
| raise RuntimeError( | |
| "Setting charset for application/octet-stream " | |
| "doesn't make sense, setup content_type first" | |
| ) | |
| assert self._content_dict is not None | |
| if value is None: | |
| self._content_dict.pop("charset", None) | |
| else: | |
| self._content_dict["charset"] = str(value).lower() | |
| self._generate_content_type_header() | |
| def last_modified(self) -> Optional[datetime.datetime]: | |
| """The value of Last-Modified HTTP header, or None. | |
| This header is represented as a `datetime` object. | |
| """ | |
| return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED)) | |
| def last_modified( | |
| self, value: Optional[Union[int, float, datetime.datetime, str]] | |
| ) -> None: | |
| if value is None: | |
| self._headers.pop(hdrs.LAST_MODIFIED, None) | |
| elif isinstance(value, (int, float)): | |
| self._headers[hdrs.LAST_MODIFIED] = time.strftime( | |
| "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value)) | |
| ) | |
| elif isinstance(value, datetime.datetime): | |
| self._headers[hdrs.LAST_MODIFIED] = time.strftime( | |
| "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple() | |
| ) | |
| elif isinstance(value, str): | |
| self._headers[hdrs.LAST_MODIFIED] = value | |
| def etag(self) -> Optional[ETag]: | |
| quoted_value = self._headers.get(hdrs.ETAG) | |
| if not quoted_value: | |
| return None | |
| elif quoted_value == ETAG_ANY: | |
| return ETag(value=ETAG_ANY) | |
| match = QUOTED_ETAG_RE.fullmatch(quoted_value) | |
| if not match: | |
| return None | |
| is_weak, value = match.group(1, 2) | |
| return ETag( | |
| is_weak=bool(is_weak), | |
| value=value, | |
| ) | |
| def etag(self, value: Optional[Union[ETag, str]]) -> None: | |
| if value is None: | |
| self._headers.pop(hdrs.ETAG, None) | |
| elif (isinstance(value, str) and value == ETAG_ANY) or ( | |
| isinstance(value, ETag) and value.value == ETAG_ANY | |
| ): | |
| self._headers[hdrs.ETAG] = ETAG_ANY | |
| elif isinstance(value, str): | |
| validate_etag_value(value) | |
| self._headers[hdrs.ETAG] = f'"{value}"' | |
| elif isinstance(value, ETag) and isinstance(value.value, str): | |
| validate_etag_value(value.value) | |
| hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"' | |
| self._headers[hdrs.ETAG] = hdr_value | |
| else: | |
| raise ValueError( | |
| f"Unsupported etag type: {type(value)}. " | |
| f"etag must be str, ETag or None" | |
| ) | |
| def _generate_content_type_header( | |
| self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE | |
| ) -> None: | |
| assert self._content_dict is not None | |
| assert self._content_type is not None | |
| params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items()) | |
| if params: | |
| ctype = self._content_type + "; " + params | |
| else: | |
| ctype = self._content_type | |
| self._headers[CONTENT_TYPE] = ctype | |
| async def _do_start_compression(self, coding: ContentCoding) -> None: | |
| if coding is ContentCoding.identity: | |
| return | |
| assert self._payload_writer is not None | |
| self._headers[hdrs.CONTENT_ENCODING] = coding.value | |
| self._payload_writer.enable_compression( | |
| coding.value, self._compression_strategy | |
| ) | |
| # Compressed payload may have different content length, | |
| # remove the header | |
| self._headers.popall(hdrs.CONTENT_LENGTH, None) | |
| async def _start_compression(self, request: "BaseRequest") -> None: | |
| if self._compression_force: | |
| await self._do_start_compression(self._compression_force) | |
| return | |
| # Encoding comparisons should be case-insensitive | |
| # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1 | |
| accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower() | |
| for value, coding in CONTENT_CODINGS.items(): | |
| if value in accept_encoding: | |
| await self._do_start_compression(coding) | |
| return | |
| async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]: | |
| if self._eof_sent: | |
| return None | |
| if self._payload_writer is not None: | |
| return self._payload_writer | |
| self._must_be_empty_body = must_be_empty_body(request.method, self.status) | |
| return await self._start(request) | |
| async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: | |
| self._req = request | |
| writer = self._payload_writer = request._payload_writer | |
| await self._prepare_headers() | |
| await request._prepare_hook(self) | |
| await self._write_headers() | |
| return writer | |
| async def _prepare_headers(self) -> None: | |
| request = self._req | |
| assert request is not None | |
| writer = self._payload_writer | |
| assert writer is not None | |
| keep_alive = self._keep_alive | |
| if keep_alive is None: | |
| keep_alive = request.keep_alive | |
| self._keep_alive = keep_alive | |
| version = request.version | |
| headers = self._headers | |
| if self._cookies: | |
| for cookie in self._cookies.values(): | |
| value = cookie.output(header="")[1:] | |
| headers.add(hdrs.SET_COOKIE, value) | |
| if self._compression: | |
| await self._start_compression(request) | |
| if self._chunked: | |
| if version != HttpVersion11: | |
| raise RuntimeError( | |
| "Using chunked encoding is forbidden " | |
| "for HTTP/{0.major}.{0.minor}".format(request.version) | |
| ) | |
| if not self._must_be_empty_body: | |
| writer.enable_chunking() | |
| headers[hdrs.TRANSFER_ENCODING] = "chunked" | |
| elif self._length_check: # Disabled for WebSockets | |
| writer.length = self.content_length | |
| if writer.length is None: | |
| if version >= HttpVersion11: | |
| if not self._must_be_empty_body: | |
| writer.enable_chunking() | |
| headers[hdrs.TRANSFER_ENCODING] = "chunked" | |
| elif not self._must_be_empty_body: | |
| keep_alive = False | |
| # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2 | |
| # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4 | |
| if self._must_be_empty_body: | |
| if hdrs.CONTENT_LENGTH in headers and should_remove_content_length( | |
| request.method, self.status | |
| ): | |
| del headers[hdrs.CONTENT_LENGTH] | |
| # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10 | |
| # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13 | |
| if hdrs.TRANSFER_ENCODING in headers: | |
| del headers[hdrs.TRANSFER_ENCODING] | |
| elif (writer.length if self._length_check else self.content_length) != 0: | |
| # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5 | |
| headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream") | |
| headers.setdefault(hdrs.DATE, rfc822_formatted_time()) | |
| headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE) | |
| # connection header | |
| if hdrs.CONNECTION not in headers: | |
| if keep_alive: | |
| if version == HttpVersion10: | |
| headers[hdrs.CONNECTION] = "keep-alive" | |
| elif version == HttpVersion11: | |
| headers[hdrs.CONNECTION] = "close" | |
| async def _write_headers(self) -> None: | |
| request = self._req | |
| assert request is not None | |
| writer = self._payload_writer | |
| assert writer is not None | |
| # status line | |
| version = request.version | |
| status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}" | |
| await writer.write_headers(status_line, self._headers) | |
| async def write(self, data: Union[bytes, bytearray, memoryview]) -> None: | |
| assert isinstance( | |
| data, (bytes, bytearray, memoryview) | |
| ), "data argument must be byte-ish (%r)" % type(data) | |
| if self._eof_sent: | |
| raise RuntimeError("Cannot call write() after write_eof()") | |
| if self._payload_writer is None: | |
| raise RuntimeError("Cannot call write() before prepare()") | |
| await self._payload_writer.write(data) | |
| async def drain(self) -> None: | |
| assert not self._eof_sent, "EOF has already been sent" | |
| assert self._payload_writer is not None, "Response has not been started" | |
| warnings.warn( | |
| "drain method is deprecated, use await resp.write()", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| await self._payload_writer.drain() | |
| async def write_eof(self, data: bytes = b"") -> None: | |
| assert isinstance( | |
| data, (bytes, bytearray, memoryview) | |
| ), "data argument must be byte-ish (%r)" % type(data) | |
| if self._eof_sent: | |
| return | |
| assert self._payload_writer is not None, "Response has not been started" | |
| await self._payload_writer.write_eof(data) | |
| self._eof_sent = True | |
| self._req = None | |
| self._body_length = self._payload_writer.output_size | |
| self._payload_writer = None | |
| def __repr__(self) -> str: | |
| if self._eof_sent: | |
| info = "eof" | |
| elif self.prepared: | |
| assert self._req is not None | |
| info = f"{self._req.method} {self._req.path} " | |
| else: | |
| info = "not prepared" | |
| return f"<{self.__class__.__name__} {self.reason} {info}>" | |
| def __getitem__(self, key: str) -> Any: | |
| return self._state[key] | |
| def __setitem__(self, key: str, value: Any) -> None: | |
| self._state[key] = value | |
| def __delitem__(self, key: str) -> None: | |
| del self._state[key] | |
| def __len__(self) -> int: | |
| return len(self._state) | |
| def __iter__(self) -> Iterator[str]: | |
| return iter(self._state) | |
| def __hash__(self) -> int: | |
| return hash(id(self)) | |
| def __eq__(self, other: object) -> bool: | |
| return self is other | |
| class Response(StreamResponse): | |
| _compressed_body: Optional[bytes] = None | |
| def __init__( | |
| self, | |
| *, | |
| body: Any = None, | |
| status: int = 200, | |
| reason: Optional[str] = None, | |
| text: Optional[str] = None, | |
| headers: Optional[LooseHeaders] = None, | |
| content_type: Optional[str] = None, | |
| charset: Optional[str] = None, | |
| zlib_executor_size: Optional[int] = None, | |
| zlib_executor: Optional[Executor] = None, | |
| ) -> None: | |
| if body is not None and text is not None: | |
| raise ValueError("body and text are not allowed together") | |
| if headers is None: | |
| real_headers: CIMultiDict[str] = CIMultiDict() | |
| elif not isinstance(headers, CIMultiDict): | |
| real_headers = CIMultiDict(headers) | |
| else: | |
| real_headers = headers # = cast('CIMultiDict[str]', headers) | |
| if content_type is not None and "charset" in content_type: | |
| raise ValueError("charset must not be in content_type argument") | |
| if text is not None: | |
| if hdrs.CONTENT_TYPE in real_headers: | |
| if content_type or charset: | |
| raise ValueError( | |
| "passing both Content-Type header and " | |
| "content_type or charset params " | |
| "is forbidden" | |
| ) | |
| else: | |
| # fast path for filling headers | |
| if not isinstance(text, str): | |
| raise TypeError("text argument must be str (%r)" % type(text)) | |
| if content_type is None: | |
| content_type = "text/plain" | |
| if charset is None: | |
| charset = "utf-8" | |
| real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset | |
| body = text.encode(charset) | |
| text = None | |
| elif hdrs.CONTENT_TYPE in real_headers: | |
| if content_type is not None or charset is not None: | |
| raise ValueError( | |
| "passing both Content-Type header and " | |
| "content_type or charset params " | |
| "is forbidden" | |
| ) | |
| elif content_type is not None: | |
| if charset is not None: | |
| content_type += "; charset=" + charset | |
| real_headers[hdrs.CONTENT_TYPE] = content_type | |
| super().__init__(status=status, reason=reason, _real_headers=real_headers) | |
| if text is not None: | |
| self.text = text | |
| else: | |
| self.body = body | |
| self._zlib_executor_size = zlib_executor_size | |
| self._zlib_executor = zlib_executor | |
| def body(self) -> Optional[Union[bytes, Payload]]: | |
| return self._body | |
| def body(self, body: Any) -> None: | |
| if body is None: | |
| self._body = None | |
| elif isinstance(body, (bytes, bytearray)): | |
| self._body = body | |
| else: | |
| try: | |
| self._body = body = payload.PAYLOAD_REGISTRY.get(body) | |
| except payload.LookupError: | |
| raise ValueError("Unsupported body type %r" % type(body)) | |
| headers = self._headers | |
| # set content-type | |
| if hdrs.CONTENT_TYPE not in headers: | |
| headers[hdrs.CONTENT_TYPE] = body.content_type | |
| # copy payload headers | |
| if body.headers: | |
| for key, value in body.headers.items(): | |
| if key not in headers: | |
| headers[key] = value | |
| self._compressed_body = None | |
| def text(self) -> Optional[str]: | |
| if self._body is None: | |
| return None | |
| return self._body.decode(self.charset or "utf-8") | |
| def text(self, text: str) -> None: | |
| assert text is None or isinstance( | |
| text, str | |
| ), "text argument must be str (%r)" % type(text) | |
| if self.content_type == "application/octet-stream": | |
| self.content_type = "text/plain" | |
| if self.charset is None: | |
| self.charset = "utf-8" | |
| self._body = text.encode(self.charset) | |
| self._compressed_body = None | |
| def content_length(self) -> Optional[int]: | |
| if self._chunked: | |
| return None | |
| if hdrs.CONTENT_LENGTH in self._headers: | |
| return int(self._headers[hdrs.CONTENT_LENGTH]) | |
| if self._compressed_body is not None: | |
| # Return length of the compressed body | |
| return len(self._compressed_body) | |
| elif isinstance(self._body, Payload): | |
| # A payload without content length, or a compressed payload | |
| return None | |
| elif self._body is not None: | |
| return len(self._body) | |
| else: | |
| return 0 | |
| def content_length(self, value: Optional[int]) -> None: | |
| raise RuntimeError("Content length is set automatically") | |
| async def write_eof(self, data: bytes = b"") -> None: | |
| if self._eof_sent: | |
| return | |
| if self._compressed_body is None: | |
| body: Optional[Union[bytes, Payload]] = self._body | |
| else: | |
| body = self._compressed_body | |
| assert not data, f"data arg is not supported, got {data!r}" | |
| assert self._req is not None | |
| assert self._payload_writer is not None | |
| if body is None or self._must_be_empty_body: | |
| await super().write_eof() | |
| elif isinstance(self._body, Payload): | |
| await self._body.write(self._payload_writer) | |
| await super().write_eof() | |
| else: | |
| await super().write_eof(cast(bytes, body)) | |
| async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: | |
| if hdrs.CONTENT_LENGTH in self._headers: | |
| if should_remove_content_length(request.method, self.status): | |
| del self._headers[hdrs.CONTENT_LENGTH] | |
| elif not self._chunked: | |
| if isinstance(self._body, Payload): | |
| if self._body.size is not None: | |
| self._headers[hdrs.CONTENT_LENGTH] = str(self._body.size) | |
| else: | |
| body_len = len(self._body) if self._body else "0" | |
| # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7 | |
| if body_len != "0" or ( | |
| self.status != 304 and request.method not in hdrs.METH_HEAD_ALL | |
| ): | |
| self._headers[hdrs.CONTENT_LENGTH] = str(body_len) | |
| return await super()._start(request) | |
| async def _do_start_compression(self, coding: ContentCoding) -> None: | |
| if self._chunked or isinstance(self._body, Payload): | |
| return await super()._do_start_compression(coding) | |
| if coding is ContentCoding.identity: | |
| return | |
| # Instead of using _payload_writer.enable_compression, | |
| # compress the whole body | |
| compressor = ZLibCompressor( | |
| encoding=coding.value, | |
| max_sync_chunk_size=self._zlib_executor_size, | |
| executor=self._zlib_executor, | |
| ) | |
| assert self._body is not None | |
| if self._zlib_executor_size is None and len(self._body) > LARGE_BODY_SIZE: | |
| warnings.warn( | |
| "Synchronous compression of large response bodies " | |
| f"({len(self._body)} bytes) might block the async event loop. " | |
| "Consider providing a custom value to zlib_executor_size/" | |
| "zlib_executor response properties or disabling compression on it." | |
| ) | |
| self._compressed_body = ( | |
| await compressor.compress(self._body) + compressor.flush() | |
| ) | |
| self._headers[hdrs.CONTENT_ENCODING] = coding.value | |
| self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body)) | |
| def json_response( | |
| data: Any = sentinel, | |
| *, | |
| text: Optional[str] = None, | |
| body: Optional[bytes] = None, | |
| status: int = 200, | |
| reason: Optional[str] = None, | |
| headers: Optional[LooseHeaders] = None, | |
| content_type: str = "application/json", | |
| dumps: JSONEncoder = json.dumps, | |
| ) -> Response: | |
| if data is not sentinel: | |
| if text or body: | |
| raise ValueError("only one of data, text, or body should be specified") | |
| else: | |
| text = dumps(data) | |
| return Response( | |
| text=text, | |
| body=body, | |
| status=status, | |
| reason=reason, | |
| headers=headers, | |
| content_type=content_type, | |
| ) | |