| from __future__ import annotations | |
| import json, sys | |
| from functools import partialmethod | |
| from aiohttp import StreamReader | |
| from aiohttp.base_protocol import BaseProtocol | |
| from curl_cffi.requests import AsyncSession as BaseSession | |
| from curl_cffi.requests.cookies import Request, Response | |
| class StreamResponse: | |
| def __init__(self, inner: Response, content: StreamReader, request: Request): | |
| self.inner = inner | |
| self.content = content | |
| self.request = request | |
| self.status_code = inner.status_code | |
| self.reason = inner.reason | |
| self.ok = inner.ok | |
| self.headers = inner.headers | |
| self.cookies = inner.cookies | |
| async def text(self) -> str: | |
| content = await self.content.read() | |
| return content.decode() | |
| def raise_for_status(self): | |
| if not self.ok: | |
| raise RuntimeError(f"HTTP Error {self.status_code}: {self.reason}") | |
| async def json(self, **kwargs): | |
| return json.loads(await self.content.read(), **kwargs) | |
| class StreamRequest: | |
| def __init__(self, session: AsyncSession, method: str, url: str, **kwargs): | |
| self.session = session | |
| self.loop = session.loop | |
| self.content = StreamReader( | |
| BaseProtocol(session.loop), | |
| sys.maxsize, | |
| loop=session.loop | |
| ) | |
| self.method = method | |
| self.url = url | |
| self.options = kwargs | |
| def on_content(self, data): | |
| if not self.enter.done(): | |
| self.enter.set_result(None) | |
| self.content.feed_data(data) | |
| def on_done(self, task): | |
| self.content.feed_eof() | |
| self.curl.clean_after_perform() | |
| self.curl.reset() | |
| self.session.push_curl(self.curl) | |
| async def __aenter__(self) -> StreamResponse: | |
| self.curl = await self.session.pop_curl() | |
| self.enter = self.loop.create_future() | |
| request, _, header_buffer = self.session._set_curl_options( | |
| self.curl, | |
| self.method, | |
| self.url, | |
| content_callback=self.on_content, | |
| **self.options | |
| ) | |
| await self.session.acurl.add_handle(self.curl, False) | |
| self.handle = self.session.acurl._curl2future[self.curl] | |
| self.handle.add_done_callback(self.on_done) | |
| await self.enter | |
| return StreamResponse( | |
| self.session._parse_response(self.curl, request, _, header_buffer), | |
| self.content, | |
| request | |
| ) | |
| async def __aexit__(self, exc_type, exc, tb): | |
| pass | |
| class AsyncSession(BaseSession): | |
| def request( | |
| self, | |
| method: str, | |
| url: str, | |
| **kwargs | |
| ) -> StreamRequest: | |
| return StreamRequest(self, method, url, **kwargs) | |
| head = partialmethod(request, "HEAD") | |
| get = partialmethod(request, "GET") | |
| post = partialmethod(request, "POST") | |
| put = partialmethod(request, "PUT") | |
| patch = partialmethod(request, "PATCH") | |
| delete = partialmethod(request, "DELETE") |